Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/09 22:51:58 UTC

[airflow] branch v2-1-test updated (3833fb7 -> 0bd9558)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 3833fb7  Better diagnostics and self-healing of docker-compose (#17484)
 discard 0948366  Install providers from sources in prod image only on main (#17458)
 discard 8bd1a7d  Attempt to reduce flakiness of PythonVirtualeEnv test_airflow_context (#17486)
 discard 562be8c  Disable Helm tests when branch is not main
 discard 2ba71c9  Fixes some static check errors (#17432)
 discard 8fc7697  Switches to "/" convention in ghcr.io images (#17356)
 discard f196690  Fix failing static checks in main (#17424)
 discard 2df7e6e  Improve diagnostics message when users have secret_key misconfigured (#17410)
 discard 7371f98  Add timeout when asking whether to rebuild image (#17412)
 discard 106d9f0  Optimizes structure of the Dockerfiles and use latest tools (#17418)
 discard 8609c82  Improve image building documentation for new users (#17409)
 discard 38c7115  Increases timeout for helm chart builds (#17417)
 discard 70794b0  Do not pull CI image for ownership fixing on first, fresh breeze run (#17419)
 discard 7a43960  Enhancement to bash scripts (#17098)
 discard 3c0f4d9  Updates to FlaskAppBuilder 3.3.2+ (#17208)
 discard 37c935d  Update alias for field_mask in Google Memmcache (#16975)
 discard 42334b0  AIRFLOW-5529 Add Apache Drill provider. (#16884)
 discard 867adda  bump dnspython (#16698)
 discard dfeb73d  Add type annotations to setup.py (#16658)
 discard fe5920e  Bump Jinja2 upper-bound from 2.12.0 to 4.0.0 (#16595)
 discard 68977b9  Remove SQLAlchemy <1.4 constraint (#16630)
 discard ab3634a  Switch back http provider after requests removes LGPL dependency (#16974)
     new 3d2a5cf  Switches to "/" convention in ghcr.io images with optimisations
     new 88e6305  Switch back http provider after requests removes LGPL dependency (#16974)
     new e8058eb  Remove SQLAlchemy <1.4 constraint (#16630)
     new c967d29  Bump Jinja2 upper-bound from 2.12.0 to 4.0.0 (#16595)
     new f3b74d8  Add type annotations to setup.py (#16658)
     new ab46af9  bump dnspython (#16698)
     new 4aef457  AIRFLOW-5529 Add Apache Drill provider. (#16884)
     new f14860d  Update alias for field_mask in Google Memmcache (#16975)
     new ac832a2  Updates to FlaskAppBuilder 3.3.2+ (#17208)
     new 673d78a  fix(smart_sensor): Unbound variable errors (#14774)
     new 012321b  Fail tasks in scheduler when executor reports they failed (#15929)
     new 8e8b58a  Fix CLI 'kubernetes cleanup-pods' which fails on invalid label key (#17298)
     new b558205  Fix calculating duration in tree view (#16695)
     new 4678cf54 Validate type of `priority_weight` during parsing (#16765)
     new cf64187  Add back missing permissions to UserModelView controls. (#17431)
     new eaa21db  Add missing permissions to varimport (#17468)
     new 40f4388  Improve diagnostics message when users have secret_key misconfigured (#17410)
     new 798237a  Handle and log exceptions raised during task callback (#17347)
     new ed76d11  Show serialization exceptions in DAG parsing log (#17277)
     new 4a4959f   Fix running tasks with default_impersonation config (#17229)
     new 24e75e7  Do not seek error file when it is closed (#17187)
     new 1b4af0b  Fix: ``TaskInstance`` does not show ``queued_by_job_id`` & ``external_executor_id`` (#17179)
     new aaa9a4b  Adds more explanatory message when SecretsMasker is not configured (#17101)
     new 66897eb  #16976 Add json.dumps() for templated fields objects: 'dict' and 'list' (#17082)
     new 120cbb9  Core: Enable the use of __init_subclass__ in subclasses of BaseOperator (#17027)
     new 3beb37e  Fix UPDATING.md (#16933)
     new 36f0ecb  Fixed task instance retrieval in XCom view (#16923)
     new 77ce4a2  Fix minor typo in configuration.py (#16832)
     new 3750e5e  Don't check execution_date in refresh_from_db (#16809)
     new 60938f0  fix: instance name env var (#16749)
     new 479a34d  Fix unchecked indexing in _build_metrics (#16744)
     new 12c46c9  BugFix: Correctly handle custom `deps` and `task_group` during DAG Serialization (#16734)
     new 55a4645  Fix slow (cleared) tasks being adopted by Celery worker. (#16718)
     new 7288621  Set Process title for Worker when using ``LocalExecutor`` (#16623)
     new 88fd2db  Fix ``AttributeError``: ``datetime.timezone`` object has no attribute ``name`` (#16599)
     new 4d2db2e  Redact conn secrets in webserver logs (#16579)
     new f479eff  fix: change graph focus to top of view instead of center (#16484)
     new 67b4ff0  committing dagPickle session when the airflow tasks run --ship-dag --interactive command is executed FIXES: 15748 (#15890)
     new 0bd9558  Proper warning message when recorded PID is different from current PID (#17411)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3833fb7)
            \
             N -- N -- N   refs/heads/v2-1-test (0bd9558)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 39 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .dockerignore                                      |   4 +-
 .github/workflows/build-images.yml                 |   7 +-
 .github/workflows/ci.yml                           |  16 +-
 BREEZE.rst                                         |   6 +
 Dockerfile.ci                                      |   4 +-
 README.md                                          | 123 ++++++-------
 UPDATING.md                                        |   4 +-
 airflow/cli/commands/kubernetes_command.py         |  12 +-
 airflow/cli/commands/task_command.py               |   6 +-
 airflow/configuration.py                           |   2 +-
 airflow/executors/local_executor.py                |   4 +
 airflow/hooks/base.py                              |   5 +-
 airflow/jobs/local_task_job.py                     |  16 +-
 airflow/jobs/scheduler_job.py                      |   6 +-
 airflow/models/baseoperator.py                     |  11 +-
 airflow/models/dag.py                              |   4 +-
 airflow/models/dagbag.py                           |   1 +
 airflow/models/dagrun.py                           |  29 +--
 airflow/models/taskinstance.py                     |  21 ++-
 airflow/sensors/smart_sensor.py                    |   1 +
 airflow/serialization/serialized_objects.py        |  16 +-
 airflow/utils/cli.py                               |   3 +-
 airflow/utils/log/file_task_handler.py             |  13 +-
 airflow/utils/log/secrets_masker.py                |   7 +-
 airflow/www/static/js/graph.js                     |   4 +-
 airflow/www/static/js/tree.js                      |   6 +-
 airflow/www/views.py                               | 151 +++++++++-------
 breeze                                             |   3 +
 confirm                                            |   6 +-
 docs/apache-airflow-providers/index.rst            |   5 -
 .../howto/customize-dag-ui-page-instance-name.rst  |   2 +-
 docs/apache-airflow/start/docker-compose.yaml      |  63 +------
 docs/apache-airflow/start/docker.rst               |  31 +---
 docs/docker-stack/build.rst                        |  13 --
 .../extending/add-apt-packages/Dockerfile          |   2 +-
 .../add-build-essential-extend/Dockerfile          |   2 +-
 .../extending/add-providers/Dockerfile             |  20 ---
 .../extending/add-pypi-packages/Dockerfile         |   2 +-
 .../extending/embedding-dags/Dockerfile            |   2 +-
 .../extending/writable-directory/Dockerfile        |   2 +-
 scripts/ci/images/ci_prepare_ci_image_on_ci.sh     |   2 +-
 scripts/ci/images/ci_prepare_prod_image_on_ci.sh   |   2 +-
 .../ci/images/ci_wait_for_and_verify_ci_image.sh   |   6 +-
 .../ci/images/ci_wait_for_and_verify_prod_image.sh |   6 +-
 scripts/ci/libraries/_build_images.sh              |  19 +-
 scripts/ci/libraries/_push_pull_remove_images.sh   |   2 +-
 scripts/in_container/entrypoint_ci.sh              |   2 -
 .../prod/airflow_scheduler_autorestart.sh          |   8 +-
 tests/cli/commands/test_kubernetes_command.py      |   8 +-
 tests/executors/test_celery_executor.py            |   2 +
 tests/jobs/test_local_task_job.py                  |  42 +++++
 tests/jobs/test_scheduler_job.py                   |  16 +-
 tests/models/test_baseoperator.py                  |  31 ++++
 tests/models/test_cleartasks.py                    |  24 +++
 tests/models/test_dag.py                           |  40 +++++
 tests/models/test_dagbag.py                        |   4 +-
 tests/models/test_taskinstance.py                  | 106 +++++++++++
 tests/operators/test_python.py                     |   3 -
 tests/serialization/test_dag_serialization.py      | 101 +++++++++++
 tests/www/views/test_views_custom_user_views.py    | 199 +++++++++++++++++++++
 tests/www/views/test_views_variable.py             |  13 ++
 61 files changed, 860 insertions(+), 411 deletions(-)
 delete mode 100644 docs/docker-stack/docker-examples/extending/add-providers/Dockerfile
 create mode 100644 tests/www/views/test_views_custom_user_views.py

[airflow] 33/39: Fix slow (cleared) tasks being adopted by Celery worker. (#16718)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 55a4645044a46d33dd5934a43856af0e46ba4757
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Sat Jul 3 00:02:01 2021 +0200

    Fix slow (cleared) tasks being adopted by Celery worker. (#16718)
    
    The Celery executor currently adopts anything that has ever run before and has been cleared since then.
    
    **Example of the issue:**
    We have a DAG that runs over 150 sensor tasks and 50 ETL tasks with a concurrency of 3 and max_active_runs of 16. This setup is required because we want to divide the resources and don't want this DAG to take up all of them. As a result, many tasks sit in the scheduled state for a while because the scheduler can't queue them due to the concurrency of 3. However, because of the current implementation, if these tasks have ever run before, they would get adopted by the scheduler's execut [...]
    
    **Contents of the PR**:
    1. Tasks in the scheduled state should never have arrived at an executor, so we remove the scheduled state from those eligible for adoption.
    2. Given that a task instance's `external_executor_id` is quite important in deciding whether it is adopted, we also reset it when we reset the state of the TaskInstance.
    
    (cherry picked from commit 554a23928efb4ff1d87d115ae2664edec3a9408c)
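
A minimal sketch of the two changes described above, assuming only the standard `State` values and task-instance fields; this is an illustration, not the project's code:

```python
# Hedged sketch: SCHEDULED is no longer a state the scheduler offers to the executor for
# adoption/reset, and clearing a task instance now also wipes external_executor_id,
# which executors consult when deciding whether to adopt a task.
from airflow.utils.state import State

RESETTABLE_STATES = [State.QUEUED, State.RUNNING]  # State.SCHEDULED removed by this change


def reset_cleared_ti(ti):
    """Illustrative helper mirroring what clear_task_instances() does after the patch."""
    ti.state = State.NONE
    ti.external_executor_id = None  # newly reset alongside the state
```
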
---
 airflow/jobs/scheduler_job.py           |  2 +-
 airflow/models/taskinstance.py          |  1 +
 tests/executors/test_celery_executor.py |  2 ++
 tests/jobs/test_scheduler_job.py        | 14 +++++++-------
 tests/models/test_cleartasks.py         | 24 ++++++++++++++++++++++++
 5 files changed, 35 insertions(+), 8 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 1758ae1..fe8e0b0 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1859,7 +1859,7 @@ class SchedulerJob(BaseJob):
                         self.log.info("Marked %d SchedulerJob instances as failed", num_failed)
                         Stats.incr(self.__class__.__name__.lower() + '_end', num_failed)
 
-                    resettable_states = [State.SCHEDULED, State.QUEUED, State.RUNNING]
+                    resettable_states = [State.QUEUED, State.RUNNING]
                     query = (
                         session.query(TI)
                         .filter(TI.state.in_(resettable_states))
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index ea5a72e..b99fa34 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -172,6 +172,7 @@ def clear_task_instances(
                 # original max_tries or the last attempted try number.
                 ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
             ti.state = State.NONE
+            ti.external_executor_id = None
             session.merge(ti)
 
         task_id_by_key[ti.dag_id][ti.execution_date][ti.try_number].add(ti.task_id)
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index e24a3dd..88ea95c 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -329,9 +329,11 @@ class TestCeleryExecutor(unittest.TestCase):
         ti1 = TaskInstance(task=task_1, execution_date=exec_date)
         ti1.external_executor_id = '231'
         ti1.queued_dttm = queued_dttm
+        ti1.state = State.QUEUED
         ti2 = TaskInstance(task=task_2, execution_date=exec_date)
         ti2.external_executor_id = '232'
         ti2.queued_dttm = queued_dttm
+        ti2.state = State.QUEUED
 
         tis = [ti1, ti2]
         executor = celery_executor.CeleryExecutor()
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 37ae65b..fe0b257 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2124,9 +2124,9 @@ class TestSchedulerJob(unittest.TestCase):
             session=session,
         )
         ti = dr.get_task_instance(task_id=op1.task_id, session=session)
-        ti.state = State.SCHEDULED
+        ti.state = State.QUEUED
         ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
-        ti2.state = State.SCHEDULED
+        ti2.state = State.QUEUED
         session.commit()
 
         processor = mock.MagicMock()
@@ -2140,7 +2140,7 @@ class TestSchedulerJob(unittest.TestCase):
         assert ti.state == State.NONE
 
         ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
-        assert ti2.state == State.SCHEDULED, "Tasks run by Backfill Jobs should not be reset"
+        assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should not be reset"
 
     @parameterized.expand(
         [
@@ -3654,7 +3654,7 @@ class TestSchedulerJob(unittest.TestCase):
             session=session,
         )
         ti = dr1.get_task_instances(session=session)[0]
-        ti.state = State.SCHEDULED
+        ti.state = State.QUEUED
         session.merge(ti)
         session.merge(dr1)
         session.commit()
@@ -3799,12 +3799,12 @@ class TestSchedulerJob(unittest.TestCase):
 
         ti1, ti2 = dr1.get_task_instances(session=session)
         dr1.state = State.RUNNING
-        ti1.state = State.SCHEDULED
+        ti1.state = State.QUEUED
         ti1.queued_by_job_id = old_job.id
         session.merge(dr1)
         session.merge(ti1)
 
-        ti2.state = State.SCHEDULED
+        ti2.state = State.QUEUED
         ti2.queued_by_job_id = self.scheduler_job.id
         session.merge(ti2)
         session.flush()
@@ -3816,7 +3816,7 @@ class TestSchedulerJob(unittest.TestCase):
         session.refresh(ti1)
         assert ti1.state is None
         session.refresh(ti2)
-        assert State.SCHEDULED == ti2.state
+        assert ti2.state == State.QUEUED
         session.rollback()
         if old_job.processor_agent:
             old_job.processor_agent.end()
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 1c5606e..9b8fbd0 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -56,6 +56,7 @@ class TestClearTasks(unittest.TestCase):
 
         ti0.run()
         ti1.run()
+
         with create_session() as session:
             qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
             clear_task_instances(qry, session, dag=dag)
@@ -68,6 +69,29 @@ class TestClearTasks(unittest.TestCase):
         assert ti1.try_number == 2
         assert ti1.max_tries == 3
 
+    def test_clear_task_instances_external_executor_id(self):
+        dag = DAG(
+            'test_clear_task_instances_external_executor_id',
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+        )
+        task0 = DummyOperator(task_id='task0', owner='test', dag=dag)
+        ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
+        ti0.state = State.SUCCESS
+        ti0.external_executor_id = "some_external_executor_id"
+
+        with create_session() as session:
+            session.add(ti0)
+            session.commit()
+
+            qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
+            clear_task_instances(qry, session, dag=dag)
+
+            ti0.refresh_from_db()
+
+            assert ti0.state is None
+            assert ti0.external_executor_id is None
+
     def test_clear_task_instances_without_task(self):
         dag = DAG(
             'test_clear_task_instances_without_task',

[airflow] 34/39: Set Process title for Worker when using ``LocalExecutor`` (#16623)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 72886212530e75271f92f94e0e5ebde077c393ee
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu Jun 24 09:18:52 2021 +0100

    Set Process title for Worker when using ``LocalExecutor`` (#16623)
    
    This has annoyed me for a long time. When using ``LocalExecutor``, it was difficult to tell which process is a worker, as workers showed up as below -- with the same title as the parent scheduler process. This PR/commit adds a title for idle workers, and while a task is running the worker's title includes the "command" being run, similar to our supervising process.
    
    Before:
    
    ```
    root       124  0.0  0.0   6676  4636 pts/1    Ss   Jun23   0:00  \_ -bash
    root      1449  0.8  2.6 988356 326312 pts/1   Sl+  Jun23   0:16  |   \_ /usr/local/bin/python /usr/local/bin/airflow webserver
    root      1584  0.0  0.4 121068 56864 pts/1    S+   Jun23   0:01  |       \_ gunicorn: master [airflow-webserver]
    root      1587  0.6  2.5 986144 318712 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1588  0.6  2.5 984776 317672 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1589  0.6  2.5 985688 318148 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1590  0.6  2.5 985200 317776 pts/1   Sl+  Jun23   0:11  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root       128  0.0  0.0   6676  4552 pts/2    Ss   Jun23   0:00  \_ -bash
    root     13933 31.0  0.9 466596 117656 pts/2   S+   00:22   0:01      \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13941  0.0  0.7 466340 97988 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13942  3.2  0.8 1392072 100136 pts/2  Sl+  00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13950  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13952  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13955  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13958  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13962  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13966  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13969  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13975  0.0  0.8 466340 98404 pts/2    S+   00:22   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13979  6.5  0.8 466596 99956 pts/2    S    00:22   0:00          \_ airflow scheduler -- DagFileProcessorManager
    ```
    
    After (with no running tasks - idle workers):
    
    ```
    root       124  0.0  0.0   6676  4636 pts/1    Ss   Jun23   0:00  \_ -bash
    root      1449  0.8  2.6 988356 326312 pts/1   Sl+  Jun23   0:16  |   \_ /usr/local/bin/python /usr/local/bin/airflow webserver
    root      1584  0.0  0.4 121068 56864 pts/1    S+   Jun23   0:01  |       \_ gunicorn: master [airflow-webserver]
    root      1587  0.6  2.5 985752 318184 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1588  0.6  2.5 984776 317672 pts/1   Sl+  Jun23   0:11  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1589  0.6  2.5 985688 318148 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1590  0.6  2.5 985200 317776 pts/1   Sl+  Jun23   0:11  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root       128  0.0  0.0   6676  4552 pts/2    Ss   Jun23   0:00  \_ -bash
    root     13237 25.7  0.9 466596 117692 pts/2   S+   00:20   0:02      \_ airflow worker -- LocalExecutor
    root     13245  0.1  0.7 466340 97804 pts/2    S+   00:20   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13246  2.1  0.8 1318340 100104 pts/2  Sl+  00:20   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     13254  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13256  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13259  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13263  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13267  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13271  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13274  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13276  0.0  0.8 466340 98396 pts/2    S+   00:20   0:00          \_ airflow worker -- LocalExecutor
    root     13282  4.1  0.8 466596 99952 pts/2    S    00:20   0:00          \_ airflow scheduler -- DagFileProcessorManager
    ```
    
    After (with running tasks):
    ```
    root@a7c8aa590704:/opt/airflow# ps auxf
    USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
    root      6434  0.0  0.0   6652  4584 pts/3    Ss   00:01   0:00 /bin/bash
    root     19250  0.0  0.0   9556  3064 pts/3    R+   00:39   0:00  \_ ps auxf
    root         1  0.0  0.0   2148   720 ?        Ss   Jun23   0:00 /usr/bin/dumb-init -- /entrypoint
    root         7  0.0  0.0   6656  4400 pts/0    Ss   Jun23   0:00 /bin/bash
    root       121  0.0  0.0   8220  3228 pts/0    S+   Jun23   0:00  \_ tmux
    root       101  0.0  0.0  15856  4272 ?        Ss   Jun23   0:00 /usr/sbin/sshd
    root       123  0.0  0.0  10176  5148 ?        Ss   Jun23   0:00 tmux
    root       124  0.0  0.0   6676  4636 pts/1    Ss   Jun23   0:00  \_ -bash
    root      1449  0.6  2.6 988356 326312 pts/1   Sl+  Jun23   0:20  |   \_ /usr/local/bin/python /usr/local/bin/airflow webserver
    root      1584  0.0  0.4 121068 56864 pts/1    S+   Jun23   0:01  |       \_ gunicorn: master [airflow-webserver]
    root      1587  0.4  2.5 986144 318712 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1588  0.4  2.5 984776 317672 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1589  0.4  2.5 985848 318600 pts/1   Sl+  Jun23   0:13  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root      1590  0.4  2.5 985628 318424 pts/1   Sl+  Jun23   0:12  |           \_ [ready] gunicorn: worker [airflow-webserver]
    root       128  0.0  0.0   6676  4552 pts/2    Ss   Jun23   0:00  \_ -bash
    root     19030 17.9  0.9 467108 118628 pts/2   S+   00:38   0:02      \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     19038  0.0  0.7 466084 97776 pts/2    S+   00:38   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     19039  1.4  0.8 1318084 99804 pts/2   Sl+  00:38   0:00          \_ /usr/local/bin/python /usr/local/bin/airflow scheduler
    root     19047  0.0  0.8 466084 98692 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2021-06-24T00:39:06.539715+00:00', '--local', '
    root     19240 25.3  0.8 470820 104400 pts/2   S+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2021-06-24T00:39:06.539715+00:00', '--local', '--po
    root     19246  0.0  0.8 470956 103980 pts/2   S    00:39   0:00          |       \_ airflow task runner: example_bash_operator runme_2 2021-06-24T00:39:06.539715+00:00 91
    root     19049  0.1  0.8 466084 98696 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2021-06-24T00:39:06.539715+00:00', '--local', '
    root     19241 26.0  0.8 470824 104408 pts/2   S+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2021-06-24T00:39:06.539715+00:00', '--local', '--po
    root     19248  0.0  0.8 470824 103720 pts/2   S    00:39   0:00          |       \_ airflow task runner: example_bash_operator runme_1 2021-06-24T00:39:06.539715+00:00 93
    root     19052  0.1  0.8 466084 98760 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2021-06-24T00:39:06.539715+00:00', '--local', '
    root     19244 26.0  0.8 470824 104404 pts/2   S+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2021-06-24T00:39:06.539715+00:00', '--local', '--po
    root     19245  0.0  0.8 471212 104032 pts/2   S    00:39   0:00          |       \_ airflow task runner: example_bash_operator runme_0 2021-06-24T00:39:06.539715+00:00 90
    root     19056  0.1  0.8 466084 98760 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'this_will_skip', '2021-06-24T00:39:06.539715+00:00', '--lo
    root     19243 24.6  0.8 470824 104400 pts/2   S+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'this_will_skip', '2021-06-24T00:39:06.539715+00:00', '--local'
    root     19247  0.0  0.8 470956 103712 pts/2   S    00:39   0:00          |       \_ airflow task runner: example_bash_operator this_will_skip 2021-06-24T00:39:06.539715+00:00 92
    root     19057  0.1  0.8 466084 98760 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-06-24T00:39:06.539715+00:00', '--loc
    root     19242 26.6  0.8 470824 104404 pts/2   R+   00:39   0:00          |   \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-06-24T00:39:06.539715+00:00', '--local',
    root     19249  0.0  0.8 470824 101976 pts/2   S    00:39   0:00          |       \_ airflow task supervisor: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-06-24T00:39:06.539715+00:00', '--loc
    root     19062  0.0  0.8 466084 98300 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor
    root     19066  0.0  0.8 466084 98300 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor
    root     19069  0.0  0.8 466084 98300 pts/2    S+   00:38   0:00          \_ airflow worker -- LocalExecutor
    root     19075  2.7  0.8 466596 100144 pts/2   S    00:38   0:00          \_ airflow scheduler -- DagFileProcessorManager
    ```
    
    Once the worker is done executing a task, it is renamed back to `airflow worker -- LocalExecutor`.
    
    (cherry picked from commit c8a628abf484f0bd9805f44dd37e284d2b5ee7db)
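
A minimal sketch of the process-title pattern applied here, assuming the `setproctitle` package Airflow already depends on; `execute_work_sketch` and the `subprocess.run` call stand in for the real fork/subprocess logic:

```python
import subprocess

from setproctitle import setproctitle


def execute_work_sketch(command):
    # Show the running command in the title, like the patched execute_work() does.
    setproctitle(f"airflow worker -- LocalExecutor: {command}")
    try:
        subprocess.run(command, check=False)
    finally:
        # Remove the command once the work is done, restoring the idle title.
        setproctitle("airflow worker -- LocalExecutor")
```
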
---
 airflow/executors/local_executor.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index ab9356f..7789f28 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -63,6 +63,7 @@ class LocalWorkerBase(Process, LoggingMixin):
         # We know we've just started a new process, so lets disconnect from the metadata db now
         settings.engine.pool.dispose()
         settings.engine.dispose()
+        setproctitle("airflow worker -- LocalExecutor")
         return super().run()
 
     def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
@@ -76,12 +77,15 @@ class LocalWorkerBase(Process, LoggingMixin):
             return
 
         self.log.info("%s running %s", self.__class__.__name__, command)
+        setproctitle(f"airflow worker -- LocalExecutor: {command}")
         if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
             state = self._execute_work_in_subprocess(command)
         else:
             state = self._execute_work_in_fork(command)
 
         self.result_queue.put((key, state))
+        # Remove the command since the worker is done executing the task
+        setproctitle("airflow worker -- LocalExecutor")
 
     def _execute_work_in_subprocess(self, command: CommandType) -> str:
         try:

[airflow] 38/39: committing dagPickle session when the airflow tasks run --ship-dag --interactive command is executed FIXES: 15748 (#15890)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 67b4ff0fd88e1e287b89f8cd2a60a4048934499f
Author: vikram Jadhav <vi...@gmail.com>
AuthorDate: Thu Jun 24 22:57:20 2021 +0530

    committing dagPickle session when the airflow tasks run --ship-dag --interactive command is executed FIXES: 15748 (#15890)
    
    (cherry picked from commit 86d0a96bf796fd767cf50a7224be060efa402d94)
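
A hedged reading of why the patch below helps (not stated in the commit message): the autogenerated `DagPickle.id` is only assigned once the session flushes/commits, which `create_session()` does when the `with` block exits, so reading it inside the block can yield `None`. A minimal sketch, assuming a throwaway DAG and Airflow's default session settings:

```python
import pendulum

from airflow.models import DAG, DagPickle
from airflow.utils.session import create_session

dag = DAG("pickle_demo", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"))

with create_session() as session:
    pickle = DagPickle(dag)
    session.add(pickle)    # no flush yet -- pickle.id is still None at this point
pickle_id = pickle.id      # the commit on block exit populates the id
print(f"Pickled dag {dag} as pickle_id: {pickle_id}")
```
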
---
 airflow/cli/commands/task_command.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 0302d4d..6dd36db 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -80,9 +80,9 @@ def _run_task_by_executor(args, dag, ti):
             with create_session() as session:
                 pickle = DagPickle(dag)
                 session.add(pickle)
-                pickle_id = pickle.id
-                # TODO: This should be written to a log
-                print(f'Pickled dag {dag} as pickle_id: {pickle_id}')
+            pickle_id = pickle.id
+            # TODO: This should be written to a log
+            print(f'Pickled dag {dag} as pickle_id: {pickle_id}')
         except Exception as e:
             print('Could not pickle the DAG')
             print(e)

[airflow] 09/39: Updates to FlaskAppBuilder 3.3.2+ (#17208)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ac832a2f8446d5a554af2b9833453d8a8bbcb195
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Wed Jul 28 21:37:48 2021 +0200

    Updates to FlaskAppBuilder 3.3.2+ (#17208)
    
    There are some clarifications about using authentication
    via FlaskAppBuilder - the change raises the minimum version of
    FAB to 3.3.2 and clarifies that the dependencies used in the FAB 3
    series are only authlib rather than flask-oauth.
    
    Fixes: #16944 (this is the second, proper fix this time).
    (cherry picked from commit 6d7fa874ff201af7f602be9c58a827998814bdd1)
---
 setup.cfg |  2 +-
 setup.py  | 10 ++++------
 2 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/setup.cfg b/setup.cfg
index fbe58cb..d3c5f57 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -100,7 +100,7 @@ install_requires =
     #      https://github.com/readthedocs/sphinx_rtd_theme/issues/1115
     docutils<0.17
     flask>=1.1.0, <2.0
-    flask-appbuilder~=3.3
+    flask-appbuilder>=3.3.2, <4.0.0
     flask-caching>=1.5.0, <2.0.0
     flask-login>=0.3, <0.5
     flask-wtf>=0.14.3, <0.15
diff --git a/setup.py b/setup.py
index 46ff73d..c74808a 100644
--- a/setup.py
+++ b/setup.py
@@ -270,10 +270,8 @@ exasol = [
 facebook = [
     'facebook-business>=6.0.2',
 ]
-flask_oauth = [
-    'Flask-OAuthlib>=0.9.1,<0.9.6',  # Flask OAuthLib 0.9.6 requires Flask-Login 0.5.0 - breaks FAB
-    'oauthlib!=2.0.3,!=2.0.4,!=2.0.5,<3.0.0,>=1.1.2',
-    'requests-oauthlib<1.2.0',
+flask_appbuilder_authlib = [
+    'authlib',
 ]
 google = [
     'PyOpenSSL',
@@ -622,8 +620,8 @@ CORE_EXTRAS_REQUIREMENTS: Dict[str, List[str]] = {
     'cncf.kubernetes': kubernetes,  # also has provider, but it extends the core with the KubernetesExecutor
     'dask': dask,
     'deprecated_api': deprecated_api,
-    'github_enterprise': flask_oauth,
-    'google_auth': flask_oauth,
+    'github_enterprise': flask_appbuilder_authlib,
+    'google_auth': flask_appbuilder_authlib,
     'kerberos': kerberos,
     'ldap': ldap,
     'leveldb': leveldb,

[airflow] 24/39: #16976 Add json.dumps() for templated fields objects: 'dict' and 'list' (#17082)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 66897eb5a0c497ed552f482fb133ffdb12272594
Author: Vladimir Aranovsky <66...@users.noreply.github.com>
AuthorDate: Mon Jul 19 22:22:42 2021 +0300

    #16976 Add json.dumps() for templated fields objects: 'dict' and 'list' (#17082)
    
    (cherry picked from commit 1a0730a08f2d72cd71447b6d6549ec10d266dd6a)
---
 airflow/www/views.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 740a767..1e578d4 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -972,6 +972,8 @@ class Airflow(AirflowBaseView):
             content = getattr(task, template_field)
             renderer = task.template_fields_renderers.get(template_field, template_field)
             if renderer in renderers:
+                if isinstance(content, (dict, list)):
+                    content = json.dumps(content, sort_keys=True, indent=4)
                 html_dict[template_field] = renderers[renderer](content)
             else:
                 html_dict[template_field] = Markup("<pre><code>{}</pre></code>").format(pformat(content))

[airflow] 04/39: Bump Jinja2 upper-bound from 2.12.0 to 4.0.0 (#16595)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c967d29671b854fdad0d8eba11a807bc3bf3ce3c
Author: Ashwin Madavan <as...@gmail.com>
AuthorDate: Thu Jun 24 15:07:23 2021 -0400

    Bump Jinja2 upper-bound from 2.12.0 to 4.0.0 (#16595)
    
    (cherry picked from commit 5d5268f5e553a7031ebfb08754c31fca5c13bda7)
---
 setup.cfg | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.cfg b/setup.cfg
index 0e4868f..fbe58cb 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -114,7 +114,7 @@ install_requires =
     iso8601>=0.1.12
     # Logging is broken with itsdangerous > 2
     itsdangerous>=1.1.0, <2.0
-    jinja2>=2.10.1, <2.12.0
+    jinja2>=2.10.1,<4
     jsonschema~=3.0
     lazy-object-proxy
     lockfile>=0.12.2

[airflow] 17/39: Improve diagnostics message when users have secret_key misconfigured (#17410)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 40f43886208043aa2b41e443c950ea889e8e03bd
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Wed Aug 4 15:15:38 2021 +0200

    Improve diagnostics message when users have secret_key misconfigured (#17410)
    
    * Improve diagnostics message when users have secret_key misconfigured
    
    The recently fixed log open-access vulnerability has caused
    quite a lot of questions and issues from affected users who
    did not have webserver/secret_key configured for their workers
    (effectively leading to a random value for that key on the workers).
    
    This PR explicitly explains the possible reason for the problem and
    encourages the user to configure the webserver's secret_key
    on both the workers and the webserver.
    
    Related to: #17251 and a number of similar Slack discussions.
    
    (cherry picked from commit 2321020e29511f3741940440739e4cc01c0a7ba2)
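
For operators hitting the 403 described above, a minimal, hedged check: every webserver and worker must resolve the same `[webserver] secret_key` (typically set in `airflow.cfg` or via the `AIRFLOW__WEBSERVER__SECRET_KEY` environment variable). Comparing a fingerprint avoids printing the secret itself:

```python
import hashlib

from airflow.configuration import conf

# Run this on the webserver and on each worker; the printed fingerprint must match
# everywhere, otherwise the webserver cannot fetch logs from the workers.
key = conf.get("webserver", "secret_key")
print(hashlib.sha256(key.encode()).hexdigest()[:12])
```
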
---
 airflow/utils/log/file_task_handler.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 2dc9beb..56b9d23 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -186,6 +186,11 @@ class FileTaskHandler(logging.Handler):
                 )
                 response.encoding = "utf-8"
 
+                if response.status_code == 403:
+                    log += "*** !!!! Please make sure that all your webservers and workers have" \
+                           " the same 'secret_key' configured in 'webserver' section !!!!!\n***"
+                    log += "*** See more at https://airflow.apache.org/docs/apache-airflow/" \
+                           "stable/configurations-ref.html#secret-key\n***"
                 # Check if the resource was properly fetched
                 response.raise_for_status()
 

[airflow] 37/39: fix: change graph focus to top of view instead of center (#16484)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f479effded98911f4a5cf53788cd540583813688
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Jun 16 13:20:19 2021 -0500

    fix: change graph focus to top of view instead of center (#16484)
    
    (cherry picked from commit f1675853a5ed9b779ee2fc13bb9aa97185472bc7)
---
 airflow/www/static/js/graph.js | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index 6482d91..5d8ab8e 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -553,7 +553,6 @@ function focusedGroupKey() {
 function focusGroup(nodeId) {
   if (nodeId != null && zoom != null) {
     const { x } = g.node(nodeId);
-    const { y } = g.node(nodeId);
     // This is the total canvas size.
     const { width, height } = svg.node().getBoundingClientRect();
 
@@ -572,7 +571,8 @@ function focusGroup(nodeId) {
       1.5, // cap zoom level to 1.5 so nodes are not too large
     ) * 0.9;
 
-    const [deltaX, deltaY] = [width / 2 - x * scale, height / 2 - y * scale];
+    // deltaY of 5 keeps the zoom at the top of the view but with a slight margin
+    const [deltaX, deltaY] = [width / 2 - x * scale, 5];
     zoom.translate([deltaX, deltaY]);
     zoom.scale(scale);
     zoom.event(innerSvg.transition().duration(duration));

[airflow] 27/39: Fixed task instance retrieval in XCom view (#16923)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 36f0ecbb4c907b31b506e9750a615a53eabddf01
Author: Pietro <54...@users.noreply.github.com>
AuthorDate: Sun Jul 11 16:54:34 2021 +0200

    Fixed task instance retrieval in XCom view (#16923)
    
    The SQL logical "and" was not expressed correctly (Python's `and` was used instead of SQLAlchemy's `and_()`), so the filter was returning wrong results.
    
    (cherry picked from commit 9a1fc19a627662122dd8dacee2ef61c3edf97b85)
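
A hedged, generic illustration of the pitfall (the table and columns below are made up): Python's `and` evaluates the truthiness of the first expression and returns one of the two operands, so only a single comparison ever reaches `.filter()`; SQLAlchemy's `and_()` (or passing both conditions to `filter()`) builds the intended SQL conjunction:

```python
from sqlalchemy import Column, Integer, String, and_
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class TIDemo(Base):
    __tablename__ = "ti_demo"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)
    task_id = Column(String)


broken = (TIDemo.dag_id == "d1") and (TIDemo.task_id == "t1")  # plain Python `and`
correct = and_(TIDemo.dag_id == "d1", TIDemo.task_id == "t1")  # SQL AND

print(broken)   # only one of the two comparisons survives
print(correct)  # ti_demo.dag_id = :dag_id_1 AND ti_demo.task_id = :task_id_1
```
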
---
 airflow/www/views.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 1e578d4..09d27e0 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1328,7 +1328,7 @@ class Airflow(AirflowBaseView):
         dm_db = models.DagModel
         ti_db = models.TaskInstance
         dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()
-        ti = session.query(ti_db).filter(ti_db.dag_id == dag_id and ti_db.task_id == task_id).first()
+        ti = session.query(ti_db).filter(and_(ti_db.dag_id == dag_id, ti_db.task_id == task_id)).first()
 
         if not ti:
             flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error")

[airflow] 29/39: Don't check execution_date in refresh_from_db (#16809)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3750e5edb36523b62a0993cf803e261ddcd5f056
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Jul 7 19:04:15 2021 +0800

    Don't check execution_date in refresh_from_db (#16809)
    
    The native sqlalchemy DateTime type does not compare well when timezones
    don't match. This can happen if the current execution_date on a DagRun
    instance is not in UTC (the db entry is always in UTC).
    
    Since DagRun has a unique constraint on (dag_id, run_id), those two
    columns already identify a single row, and the execution_date
    column should not be needed anyway. Let's just remove that filter to
    prevent all the datetime comparison trouble.
    
    (cherry picked from commit faffaec73385db3c6910d31ccea9fc4f9f3f9d42)
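
A plain-Python illustration of the comparison trouble mentioned above (this mimics the effect of a naive DateTime cast, not the actual SQL-level code):

```python
import datetime

utc = datetime.timezone.utc
cest = datetime.timezone(datetime.timedelta(hours=2))

t_utc = datetime.datetime(2021, 7, 7, 10, 0, tzinfo=utc)  # how the DB stores it
t_cest = t_utc.astimezone(cest)                           # same instant, non-UTC tz

print(t_utc == t_cest)                                    # True: aware comparison is fine
print(t_utc.replace(tzinfo=None) == t_cest.replace(tzinfo=None))  # False once tz info is dropped
```
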
---
 airflow/models/dagrun.py | 29 ++---------------------------
 1 file changed, 2 insertions(+), 27 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 3bd86ee..5b8ac0c 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -18,19 +18,7 @@
 from datetime import datetime
 from typing import TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union
 
-from sqlalchemy import (
-    Boolean,
-    Column,
-    DateTime,
-    Index,
-    Integer,
-    PickleType,
-    String,
-    UniqueConstraint,
-    and_,
-    func,
-    or_,
-)
+from sqlalchemy import Boolean, Column, Index, Integer, PickleType, String, UniqueConstraint, and_, func, or_
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.declarative import declared_attr
 from sqlalchemy.orm import backref, relationship, synonym
@@ -165,20 +153,7 @@ class DagRun(Base, LoggingMixin):
         :param session: database session
         :type session: Session
         """
-        DR = DagRun
-
-        exec_date = func.cast(self.execution_date, DateTime)
-
-        dr = (
-            session.query(DR)
-            .filter(
-                DR.dag_id == self.dag_id,
-                func.cast(DR.execution_date, DateTime) == exec_date,
-                DR.run_id == self.run_id,
-            )
-            .one()
-        )
-
+        dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
         self.id = dr.id
         self.state = dr.state
 

[airflow] 14/39: Validate type of `priority_weight` during parsing (#16765)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4678cf54b5cd651e0232a42746f9be80db43a609
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jul 2 01:52:50 2021 +0100

    Validate type of `priority_weight` during parsing (#16765)
    
    closes https://github.com/apache/airflow/issues/16762
    
    Without this, the scheduler crashes because validation does not happen at DAG parsing time.
    
    (cherry picked from commit 9d170279a60d9d4ed513bae1c35999926f042662)
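
A hedged usage sketch of the new parse-time check (the operator choice and task id are arbitrary; import paths as in Airflow 2.1):

```python
from airflow.exceptions import AirflowException
from airflow.operators.dummy import DummyOperator

try:
    # A non-integer priority_weight now fails loudly while the DAG file is parsed,
    # instead of crashing the scheduler later.
    DummyOperator(task_id="demo_task", priority_weight="2")
except AirflowException as err:
    print(err)  # `priority_weight` for task 'demo_task' only accepts integers, ...
```
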
---
 airflow/models/baseoperator.py    | 7 ++++++-
 tests/models/test_baseoperator.py | 5 +++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 10e8bfd..1fec8cf 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -586,10 +586,15 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
             if isinstance(max_retry_delay, timedelta):
                 self.max_retry_delay = max_retry_delay
             else:
-                self.log.debug("Max_retry_delay isn't timedelta object, assuming secs")
+                self.log.debug("max_retry_delay isn't a timedelta object, assuming secs")
                 self.max_retry_delay = timedelta(seconds=max_retry_delay)
 
         self.params = params or {}  # Available in templates!
+        if priority_weight is not None and not isinstance(priority_weight, int):
+            raise AirflowException(
+                f"`priority_weight` for task '{self.task_id}' only accepts integers, "
+                f"received '{type(priority_weight)}'."
+            )
         self.priority_weight = priority_weight
         if not WeightRule.is_valid(weight_rule):
             raise AirflowException(
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index fa02b4e..04d3f54 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -109,6 +109,11 @@ class TestBaseOperator(unittest.TestCase):
         with pytest.raises(AirflowException, match='Argument.*test_param.*required'):
             DummyClass(default_args=default_args)
 
+    def test_incorrect_priority_weight(self):
+        error_msg = "`priority_weight` for task 'test_op' only accepts integers, received '<class 'str'>'."
+        with pytest.raises(AirflowException, match=error_msg):
+            DummyOperator(task_id="test_op", priority_weight="2")
+
     @parameterized.expand(
         [
             ("{{ foo }}", {"foo": "bar"}, "bar"),

[airflow] 10/39: fix(smart_sensor): Unbound variable errors (#14774)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 673d78a6cc78038121d3f5e99caa6ded488d654a
Author: Shivansh Saini <sh...@gmail.com>
AuthorDate: Thu Jun 24 03:52:27 2021 +0530

    fix(smart_sensor): Unbound variable errors (#14774)
    
    Signed-off-by: Shivansh Saini <sh...@gmail.com>
    
    Closes #14770
    
    (cherry picked from commit 4aec25a80e3803238cf658c416c8e6d3975a30f6)
---
 airflow/sensors/smart_sensor.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py
index c8c5ba7..8755eb5 100644
--- a/airflow/sensors/smart_sensor.py
+++ b/airflow/sensors/smart_sensor.py
@@ -435,6 +435,7 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
         TI = TaskInstance
 
         count_marked = 0
+        query_result = []
         try:
             query_result = (
                 session.query(TI, SI)

[airflow] 35/39: Fix ``AttributeError``: ``datetime.timezone`` object has no attribute ``name`` (#16599)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 88fd2db56175c770d2d76cc889d9cadd9022b120
Author: Ruben Laguna <ru...@gmail.com>
AuthorDate: Wed Jun 23 16:31:45 2021 +0200

    Fix ``AttributeError``: ``datetime.timezone`` object has no attribute ``name`` (#16599)
    
    closes: #16551
    
    The previous implementation tried to coerce the provided timezone (from the DAG's `start_date`) into a `pendulum.tz.timezone.*`. That only worked if the provided timezone was already a pendulum timezone, and it specifically failed when `datetime.timezone.utc` was used.
    
    (cherry picked from commit 86c20910aed48f7d5b2ebaa91fa40d47c52d7db3)
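
A hedged illustration of the attribute mismatch behind the error (not the fixed code itself):

```python
import datetime

import pendulum

stdlib_utc = datetime.timezone.utc
print(hasattr(stdlib_utc, "name"))     # False -> the old pendulum.timezone(self.timezone.name) raised
print(stdlib_utc.tzname(None))         # 'UTC' is only reachable via tzname(dt)
print(pendulum.timezone("UTC").name)   # 'UTC' -- only pendulum timezones expose .name
```
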
---
 airflow/models/dag.py                         |  4 +--
 tests/models/test_dag.py                      | 40 +++++++++++++++++++++++++++
 tests/serialization/test_dag_serialization.py |  2 ++
 3 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1861eda..13d69c2 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -485,7 +485,7 @@ class DAG(LoggingMixin):
             else:
                 # absolute (e.g. 3 AM)
                 naive = cron.get_next(datetime)
-                tz = pendulum.timezone(self.timezone.name)
+                tz = self.timezone
                 following = timezone.make_aware(naive, tz)
             return timezone.convert_to_utc(following)
         elif self.normalized_schedule_interval is not None:
@@ -513,7 +513,7 @@ class DAG(LoggingMixin):
             else:
                 # absolute (e.g. 3 AM)
                 naive = cron.get_prev(datetime)
-                tz = pendulum.timezone(self.timezone.name)
+                tz = self.timezone
                 previous = timezone.make_aware(naive, tz)
             return timezone.convert_to_utc(previous)
         elif self.normalized_schedule_interval is not None:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 939baf4..34d761d 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -621,6 +621,46 @@ class TestDag(unittest.TestCase):
         _next = dag.following_schedule(_next)
         assert _next.isoformat() == "2015-01-02T02:00:00+00:00"
 
+    def test_previous_schedule_datetime_timezone(self):
+        # Check that we don't get an AttributeError 'name' for self.timezone
+
+        start = datetime.datetime(2018, 3, 25, 2, tzinfo=datetime.timezone.utc)
+        dag = DAG('tz_dag', start_date=start, schedule_interval='@hourly')
+        when = dag.previous_schedule(start)
+        assert when.isoformat() == "2018-03-25T01:00:00+00:00"
+
+    def test_following_schedule_datetime_timezone(self):
+        # Check that we don't get an AttributeError 'name' for self.timezone
+
+        start = datetime.datetime(2018, 3, 25, 2, tzinfo=datetime.timezone.utc)
+        dag = DAG('tz_dag', start_date=start, schedule_interval='@hourly')
+        when = dag.following_schedule(start)
+        assert when.isoformat() == "2018-03-25T03:00:00+00:00"
+
+    def test_following_schedule_datetime_timezone_utc0530(self):
+        # Check that we don't get an AttributeError 'name' for self.timezone
+        class UTC0530(datetime.tzinfo):
+            """tzinfo derived concrete class named "+0530" with offset of 19800"""
+
+            # can be configured here
+            _offset = datetime.timedelta(seconds=19800)
+            _dst = datetime.timedelta(0)
+            _name = "+0530"
+
+            def utcoffset(self, dt):
+                return self.__class__._offset
+
+            def dst(self, dt):
+                return self.__class__._dst
+
+            def tzname(self, dt):
+                return self.__class__._name
+
+        start = datetime.datetime(2018, 3, 25, 10, tzinfo=UTC0530())
+        dag = DAG('tz_dag', start_date=start, schedule_interval='@hourly')
+        when = dag.following_schedule(start)
+        assert when.isoformat() == "2018-03-25T05:30:00+00:00"
+
     def test_dagtag_repr(self):
         clear_db_dags()
         dag = DAG('dag-test-dagtag', start_date=DEFAULT_DATE, tags=['tag-1', 'tag-2'])
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 05edfda..7b0b476 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -27,6 +27,7 @@ from datetime import datetime, timedelta, timezone
 from glob import glob
 from unittest import mock
 
+import pendulum
 import pytest
 from dateutil.relativedelta import FR, relativedelta
 from kubernetes.client import models as k8s
@@ -444,6 +445,7 @@ class TestStringifiedDAGs(unittest.TestCase):
                 datetime(2019, 7, 30, tzinfo=timezone.utc),
                 datetime(2019, 8, 1, tzinfo=timezone.utc),
             ),
+            (pendulum.datetime(2019, 8, 1, tz='UTC'), None, pendulum.datetime(2019, 8, 1, tz='UTC')),
         ]
     )
     def test_deserialization_start_date(self, dag_start_date, task_start_date, expected_task_start_date):

[airflow] 25/39: Core: Enable the use of __init_subclass__ in subclasses of BaseOperator (#17027)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 120cbb9072da993e3f51350097a255ac7e15b2c4
Author: Henry Zhang <me...@henry.dev>
AuthorDate: Thu Jul 22 15:23:51 2021 -0700

    Core: Enable the use of __init_subclass__ in subclasses of BaseOperator (#17027)
    
    This fixes a regression in 2.1 where subclasses of BaseOperator could no
    longer use `__init_subclass__` to allow customization at class-creation
    time.
    
    Related BPO: https://bugs.python.org/issue29581
    Fixes: https://github.com/apache/airflow/issues/17014
    
    (cherry picked from commit 901513203f287d4f8152f028e9070a2dec73ad74)
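
A standalone, hedged reproduction of the underlying Python behaviour (see BPO-29581); the class names are made up:

```python
# Keyword arguments in a class statement are forwarded to the metaclass, so a metaclass
# __new__ that does not accept **kwargs breaks __init_subclass__-based customization.
class Meta(type):
    def __new__(cls, name, bases, namespace, **kwargs):  # **kwargs is the essential part
        return super().__new__(cls, name, bases, namespace, **kwargs)


class Base(metaclass=Meta):
    def __init_subclass__(cls, class_arg=None, **kwargs):
        cls._class_arg = class_arg
        super().__init_subclass__(**kwargs)


class Child(Base, class_arg="foo"):  # would raise TypeError without **kwargs above
    pass


print(Child._class_arg)  # foo
```
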
---
 airflow/models/baseoperator.py    |  4 ++--
 tests/models/test_baseoperator.py | 26 ++++++++++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 1fec8cf..360f264 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -186,8 +186,8 @@ class BaseOperatorMeta(abc.ABCMeta):
 
         return cast(T, apply_defaults)
 
-    def __new__(cls, name, bases, namespace):
-        new_cls = super().__new__(cls, name, bases, namespace)
+    def __new__(cls, name, bases, namespace, **kwargs):
+        new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
         new_cls.__init__ = cls._apply_defaults(new_cls.__init__)
         return new_cls
 
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index 04d3f54..87a9247 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -516,3 +516,29 @@ class TestXComArgsRelationsAreResolved:
         with pytest.raises(AirflowException):
             op1 = DummyOperator(task_id="op1")
             CustomOp(task_id="op2", field=op1.output)
+
+
+class InitSubclassOp(DummyOperator):
+    def __init_subclass__(cls, class_arg=None, **kwargs) -> None:
+        cls._class_arg = class_arg
+        super().__init_subclass__(**kwargs)
+
+    def execute(self, context):
+        self.context_arg = context
+
+
+class TestInitSubclassOperator:
+    def test_init_subclass_args(self):
+        class_arg = "foo"
+        context = {"key": "value"}
+
+        class ConcreteSubclassOp(InitSubclassOp, class_arg=class_arg):
+            pass
+
+        task = ConcreteSubclassOp(task_id="op1")
+        task_copy = task.prepare_for_execution()
+
+        task_copy.execute(context)
+
+        assert task_copy._class_arg == class_arg
+        assert task_copy.context_arg == context

[airflow] 23/39: Adds more explanatory message when SecretsMasker is not configured (#17101)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit aaa9a4b3ebece41c0c665d7200ce451d4b642833
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Tue Jul 20 10:51:59 2021 +0200

    Adds more explanatory message when SecretsMasker is not configured (#17101)
    
    The secrets masker added in 2.1.0 introduced a requirement that
    at least one SecretsMasker must be configured per task.
    
    However, this caused problems for several users who migrated
    from Airflow 1.10 and had written their custom logging configuration
    without first copying the base Airflow configuration.
    
    The message about the missing SecretsMasker was pretty cryptic for
    those users. This PR makes the message much more descriptive and
    points the user to the right place in the documentation explaining
    how advanced logging configuration should be done.
    
    (cherry picked from commit 3a2e162387b5d73f1badda8fcf027fbc2caa0f28)
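
A hedged sketch of the advanced configuration the new message points to: start
from Airflow's shipped defaults so the SecretsMasker filter stays configured
(the tweak shown is only an example):

    # airflow_local_settings.py
    from copy import deepcopy

    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

    # Deep-copy the base config instead of writing one from scratch, then adjust it.
    LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
    LOGGING_CONFIG["handlers"]["console"]["level"] = "DEBUG"

The [logging] logging_config_class option then points at this object
(e.g. airflow_local_settings.LOGGING_CONFIG), keeping the task logger's
SecretsMasker filter intact.
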
---
 airflow/utils/log/secrets_masker.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index 1e8ad16..0ce0424 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -102,7 +102,12 @@ def _secrets_masker() -> "SecretsMasker":
     for flt in logging.getLogger('airflow.task').filters:
         if isinstance(flt, SecretsMasker):
             return flt
-    raise RuntimeError("No SecretsMasker found!")
+    raise RuntimeError(
+        "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make "
+        "sure you configure it taking airflow configuration as a base as explained at "
+        "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html"
+        "#advanced-configuration"
+    )
 
 
 class SecretsMasker(logging.Filter):

[airflow] 22/39: Fix: ``TaskInstance`` does not show ``queued_by_job_id`` & ``external_executor_id`` (#17179)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1b4af0baf752b54497caeff964633421d69cf426
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Fri Jul 23 00:25:08 2021 +0200

    Fix: ``TaskInstance`` does not show ``queued_by_job_id`` & ``external_executor_id`` (#17179)
    
    **Problem discovery:**
    I was debugging a bug with the `external_executor_id` in Airflow when this UI bug caught my eye and I got annoyed by it. I figured I would fix this one first so my other testing could go a bit more smoothly :)
    
    **Description of the problem:**
    Currently there is a bug inside the Task Instance details (/task) view.
    It loads the TaskInstance by calling `TI(task, execution_date)` and then uses `refresh_from_db()` to refresh the many fields that are not filled in yet.
    However, the assumption made there is that it refreshes all values, which it does not.
    `external_executor_id` and `queued_by_job_id` are not updated at all, and `executor_config` is only instantiated by the original `TI(task, execution_date)` call but likewise not updated in `refresh_from_db()`.
    This also shows in the UI, where these values always display None even though the TaskInstance list view shows they are not None.
    
    **The changes in the PR:**
    1. Changes to the `refresh_from_db()` method to include the missing three values.
    2. A new test that checks we are really updating ALL values in `refresh_from_db()`.
    3. Removal of an incorrect comment, as we do need the `execution_date` for that view.
    
    (cherry picked from commit 759c76d7a5d23cc6f6ef4f724a1a322d2445bbd2)
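
A minimal sketch of the symptom the /task view hit (dag and task ids are
placeholders and an initialized metadata database is assumed):

    from airflow.models import DagBag, TaskInstance
    from airflow.utils import timezone

    task = DagBag().get_dag("example_dag").get_task("example_task")
    ti = TaskInstance(task, execution_date=timezone.utcnow())
    ti.refresh_from_db()
    # Before this fix, the three attributes below kept their constructor defaults
    # even when the matching database row held real values.
    print(ti.queued_by_job_id, ti.external_executor_id, ti.executor_config)
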
---
 airflow/models/taskinstance.py    |  3 +++
 airflow/www/views.py              |  2 --
 tests/models/test_taskinstance.py | 54 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 46ea7f9..ea5a72e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -644,7 +644,10 @@ class TaskInstance(Base, LoggingMixin):
             self.priority_weight = ti.priority_weight
             self.operator = ti.operator
             self.queued_dttm = ti.queued_dttm
+            self.queued_by_job_id = ti.queued_by_job_id
             self.pid = ti.pid
+            self.executor_config = ti.executor_config
+            self.external_executor_id = ti.external_executor_id
         else:
             self.state = None
 
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 590de7d..740a767 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1229,8 +1229,6 @@ class Airflow(AirflowBaseView):
         """Retrieve task."""
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
-        # Carrying execution_date through, even though it's irrelevant for
-        # this context
         execution_date = request.args.get('execution_date')
         dttm = timezone.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 4b84d10..021809b 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2078,6 +2078,60 @@ class TestTaskInstance(unittest.TestCase):
         assert ti.start_date < ti.end_date
         assert ti.duration > 0
 
+    def test_refresh_from_db(self):
+        run_date = timezone.utcnow()
+
+        expected_values = {
+            "task_id": "test_refresh_from_db_task",
+            "dag_id": "test_refresh_from_db_dag",
+            "execution_date": run_date,
+            "start_date": run_date + datetime.timedelta(days=1),
+            "end_date": run_date + datetime.timedelta(days=1, seconds=1, milliseconds=234),
+            "duration": 1.234,
+            "state": State.SUCCESS,
+            "_try_number": 1,
+            "max_tries": 1,
+            "hostname": "some_unique_hostname",
+            "unixname": "some_unique_unixname",
+            "job_id": 1234,
+            "pool": "some_fake_pool_id",
+            "pool_slots": 25,
+            "queue": "some_queue_id",
+            "priority_weight": 123,
+            "operator": "some_custom_operator",
+            "queued_dttm": run_date + datetime.timedelta(hours=1),
+            "queued_by_job_id": 321,
+            "pid": 123,
+            "executor_config": {"Some": {"extra": "information"}},
+            "external_executor_id": "some_executor_id",
+        }
+        # Make sure we aren't missing any new value in our expected_values list.
+        expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values.keys()}
+        assert {str(c) for c in TI.__table__.columns} == expected_keys, (
+            "Please add all non-foreign values of TaskInstance to this list. "
+            "This prevents refresh_from_db() from missing a field."
+        )
+
+        operator = DummyOperator(task_id=expected_values['task_id'])
+        ti = TI(task=operator, execution_date=expected_values['execution_date'])
+        for key, expected_value in expected_values.items():
+            setattr(ti, key, expected_value)
+        with create_session() as session:
+            session.merge(ti)
+            session.commit()
+
+        mock_task = mock.MagicMock()
+        mock_task.task_id = expected_values["task_id"]
+        mock_task.dag_id = expected_values["dag_id"]
+
+        ti = TI(task=mock_task, execution_date=run_date)
+        ti.refresh_from_db()
+        for key, expected_value in expected_values.items():
+            assert hasattr(ti, key), f"Key {key} is missing in the TaskInstance."
+            assert (
+                getattr(ti, key) == expected_value
+            ), f"Key: {key} had different values. Make sure it loads it in the refresh refresh_from_db()"
+
 
 @pytest.mark.parametrize("pool_override", [None, "test_pool2"])
 def test_refresh_from_task(pool_override):

[airflow] 13/39: Fix calculating duration in tree view (#16695)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b558205e817f168df7e5c37c9f006a6af31d576e
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Mon Jun 28 11:23:19 2021 -0400

    Fix calculating duration in tree view (#16695)
    
    Make sure moment doesn't default the end_date to now and show the wrong duration
    
    (cherry picked from commit f0b3345ddc489627d73d190a1401804e7b0d9c4e)
---
 airflow/www/static/js/tree.js | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/www/static/js/tree.js b/airflow/www/static/js/tree.js
index 07daf0e..4bf366a 100644
--- a/airflow/www/static/js/tree.js
+++ b/airflow/www/static/js/tree.js
@@ -305,7 +305,11 @@ document.addEventListener('DOMContentLoaded', () => {
       .style('stroke-opacity', (d) => (d.external_trigger ? '0' : '1'))
       .on('mouseover', function (d) {
         // Calculate duration if it doesn't exist
-        const tt = tiTooltip({ ...d, duration: d.duration || moment(d.end_date).diff(d.start_date, 'seconds') });
+        const tt = tiTooltip({
+          ...d,
+          // if end_date is undefined then moment will default to now instead of null
+          duration: d.duration || d.end_date ? moment(d.end_date).diff(d.start_date, 'seconds') : null,
+        });
         taskTip.direction('n');
         taskTip.show(tt, this);
         d3.select(this).transition().duration(duration)

[airflow] 28/39: Fix minor typo in configuration.py (#16832)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 77ce4a2a6f49bac433c71c92e7a5caa64aaea6dc
Author: sanjayp <sa...@gmail.com>
AuthorDate: Thu Jul 8 19:45:20 2021 +0530

    Fix minor typo in configuration.py (#16832)
    
    (cherry picked from commit 2b10680a2f5188400459089713876c1ce49f1caf)
---
 airflow/configuration.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 23cfb23..452f127 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -454,7 +454,7 @@ class AirflowConfigParser(ConfigParser):
         """
         Reads options, imports the full qualified name, and returns the object.
 
-        In case of failure, it throws an exception a clear message with the key aad the section names
+        In case of failure, it throws an exception with the key and section names
 
         :return: The object or None, if the option is empty
         """

[airflow] 02/39: Switch back http provider after requests removes LGPL dependency (#16974)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 88e6305c92a3e96134589f3bfb29b3ff9803fafd
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Tue Jul 13 22:13:30 2021 +0200

    Switch back http provider after requests removes LGPL dependency (#16974)
    
    Following the merge of https://github.com/psf/requests/pull/5797
    and the requests 2.26.0 release without the LGPL chardet dependency,
    we can now bring back http as a pre-installed provider, as it no
    longer pulls in chardet automatically.
    
    (cherry picked from commit c46e841519ef2df7dc40ff2596dd49c010514d87)
---
 docs/apache-airflow/extra-packages-ref.rst |  2 +-
 setup.py                                   | 13 ++++++-------
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index b4b4bb4..b1dff07 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -258,7 +258,7 @@ Those are extras that provide support for integration with external systems via
 +---------------------+-----------------------------------------------------+--------------------------------------+--------------+
 | grpc                | ``pip install 'apache-airflow[grpc]'``              | Grpc hooks and operators             |              |
 +---------------------+-----------------------------------------------------+--------------------------------------+--------------+
-| http                | ``pip install 'apache-airflow[http]'``              | HTTP hooks, operators and sensors    |              |
+| http                | ``pip install 'apache-airflow[http]'``              | HTTP hooks, operators and sensors    |      *       |
 +---------------------+-----------------------------------------------------+--------------------------------------+--------------+
 | imap                | ``pip install 'apache-airflow[imap]'``              | IMAP hooks and sensors               |      *       |
 +---------------------+-----------------------------------------------------+--------------------------------------+--------------+
diff --git a/setup.py b/setup.py
index 5d6f752..21097e6 100644
--- a/setup.py
+++ b/setup.py
@@ -231,13 +231,13 @@ dask = [
     'distributed>=2.11.1, <2.20',
 ]
 databricks = [
-    'requests>=2.20.0, <3',
+    'requests>=2.26.0, <3',
 ]
 datadog = [
     'datadog>=0.14.0',
 ]
 deprecated_api = [
-    'requests>=2.20.0',
+    'requests>=2.26.0',
 ]
 doc = [
     # Sphinx is limited to < 3.5.0 because of https://github.com/sphinx-doc/sphinx/issues/8880
@@ -330,7 +330,9 @@ hive = [
     'thrift>=0.9.2',
 ]
 http = [
-    'requests>=2.20.0',
+    # The 2.26.0 release of requests got rid of the chardet LGPL mandatory dependency, allowing us to
+    # release it as a requirement for airflow
+    'requests>=2.26.0',
 ]
 http_provider = [
     # NOTE ! The HTTP provider is NOT preinstalled by default when Airflow is installed - because it
@@ -810,12 +812,9 @@ EXTRAS_REQUIREMENTS = sort_extras_requirements()
 # Those providers are pre-installed always when airflow is installed.
 # Those providers do not have dependency on airflow2.0 because that would lead to circular dependencies.
 # This is not a problem for PIP but some tools (pipdeptree) show those as a warning.
-# NOTE ! The HTTP provider is NOT preinstalled by default when Airflow is installed - because it
-#        depends on `requests` library and until `chardet` is mandatory dependency of `requests`
-#        we cannot make it mandatory dependency. See https://github.com/psf/requests/pull/5797
 PREINSTALLED_PROVIDERS = [
     'ftp',
-    # 'http',
+    'http',
     'imap',
     'sqlite',
 ]

[airflow] 05/39: Add type annotations to setup.py (#16658)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f3b74d802a1bd61060a302b51b894d178b543e03
Author: Ali Muhammad <al...@gmail.com>
AuthorDate: Fri Jun 25 22:25:52 2021 +0500

    Add type annotations to setup.py (#16658)
    
    (cherry picked from commit 402274641168f412f44c545c34f3e7edf5cf1476)
---
 setup.py | 40 ++++++++++++++++++++++++----------------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/setup.py b/setup.py
index 21097e6..e6c1d46 100644
--- a/setup.py
+++ b/setup.py
@@ -62,14 +62,14 @@ class CleanCommand(Command):
     description = "Tidy up the project root"
     user_options: List[str] = []
 
-    def initialize_options(self):
+    def initialize_options(self) -> None:
         """Set default values for options."""
 
-    def finalize_options(self):
+    def finalize_options(self) -> None:
         """Set final values for options."""
 
     @staticmethod
-    def rm_all_files(files: List[str]):
+    def rm_all_files(files: List[str]) -> None:
         """Remove all files from the list"""
         for file in files:
             try:
@@ -77,7 +77,7 @@ class CleanCommand(Command):
             except Exception as e:
                 logger.warning("Error when removing %s: %s", file, e)
 
-    def run(self):
+    def run(self) -> None:
         """Remove temporary files and directories."""
         os.chdir(my_dir)
         self.rm_all_files(glob.glob('./build/*'))
@@ -98,10 +98,10 @@ class CompileAssets(Command):
     description = "Compile and build the frontend assets"
     user_options: List[str] = []
 
-    def initialize_options(self):
+    def initialize_options(self) -> None:
         """Set default values for options."""
 
-    def finalize_options(self):
+    def finalize_options(self) -> None:
         """Set final values for options."""
 
     def run(self) -> None:
@@ -118,10 +118,10 @@ class ListExtras(Command):
     description = "List available extras"
     user_options: List[str] = []
 
-    def initialize_options(self):
+    def initialize_options(self) -> None:
         """Set default values for options."""
 
-    def finalize_options(self):
+    def finalize_options(self) -> None:
         """Set final values for options."""
 
     def run(self) -> None:
@@ -165,7 +165,7 @@ def git_version(version_: str) -> str:
     return 'no_git_version'
 
 
-def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version"])):
+def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version"])) -> None:
     """
     Write the Semver version + git hash to file, e.g. ".dev0+2f635dc265e78db6708f59f68e8009abb92c1e65".
 
@@ -766,7 +766,7 @@ PACKAGES_EXCLUDED_FOR_ALL.extend(
 )
 
 
-def is_package_excluded(package: str, exclusion_list: List[str]):
+def is_package_excluded(package: str, exclusion_list: List[str]) -> bool:
     """
     Checks if package should be excluded.
 
@@ -820,7 +820,7 @@ PREINSTALLED_PROVIDERS = [
 ]
 
 
-def get_provider_package_from_package_id(package_id: str):
+def get_provider_package_from_package_id(package_id: str) -> str:
     """
     Builds the name of provider package out of the package id provided/
 
@@ -831,16 +831,18 @@ def get_provider_package_from_package_id(package_id: str):
     return f"apache-airflow-providers-{package_suffix}"
 
 
-def get_excluded_providers():
+def get_excluded_providers() -> List[str]:
     """
     Returns packages excluded for the current python version.
+
     Currently the only excluded provider is apache hive for Python 3.9.
     Until https://github.com/dropbox/PyHive/issues/380 is fixed.
+
     """
     return ['apache.hive'] if PY39 else []
 
 
-def get_all_provider_packages():
+def get_all_provider_packages() -> str:
     """Returns all provider packages configured in setup.py"""
     excluded_providers = get_excluded_providers()
     return " ".join(
@@ -851,7 +853,13 @@ def get_all_provider_packages():
 
 
 class AirflowDistribution(Distribution):
-    """The setuptools.Distribution subclass with Airflow specific behaviour"""
+    """
+    The setuptools.Distribution subclass with Airflow specific behaviour
+
+    The reason for pylint: disable=signature-differs of parse_config_files is explained here:
+    https://github.com/PyCQA/pylint/issues/3737
+
+    """
 
     def parse_config_files(self, *args, **kwargs) -> None:
         """
@@ -949,7 +957,7 @@ def add_all_provider_packages() -> None:
 class Develop(develop_orig):
     """Forces removal of providers in editable mode."""
 
-    def run(self):
+    def run(self) -> None:
         self.announce('Installing in editable mode. Uninstalling provider packages!', level=log.INFO)
         # We need to run "python3 -m pip" because it might be that older PIP binary is in the path
         # And it results with an error when running pip directly (cannot import pip module)
@@ -973,7 +981,7 @@ class Develop(develop_orig):
 class Install(install_orig):
     """Forces installation of providers from sources in editable mode."""
 
-    def run(self):
+    def run(self) -> None:
         self.announce('Standard installation. Providers are installed from packages', level=log.INFO)
         super().run()
 

[airflow] 20/39: Fix running tasks with default_impersonation config (#17229)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4a4959f1a0d5f76ff3d2920176ac2f3b56224521
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jul 28 16:00:25 2021 +0100

    Fix running tasks with default_impersonation config (#17229)
    
    When default_impersonation is set in the configuration, Airflow fails
    to run tasks due to a PID mismatch between the recorded PID and the current PID.
    
    This change fixes it by checking whether task_runner.run_as_user is set and,
    if so, comparing the PIDs the same way we do when ti.run_as_user is set.
    
    (cherry picked from commit 40419dd371c7be53e6c8017b0c4d1bc7f75d0fb6)
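
For reference, a sketch of the configuration that exercises this code path; the
username is a placeholder:

    [core]
    # Tasks that do not set run_as_user themselves are impersonated as this OS user,
    # which makes the task runner fork and changes which PID the heartbeat must check.
    default_impersonation = airflow_worker
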
---
 airflow/jobs/local_task_job.py    |  5 ++++-
 tests/jobs/test_local_task_job.py | 42 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 06a2f57..af7b317 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -187,9 +187,12 @@ class LocalTaskJob(BaseJob):
                 )
                 raise AirflowException("Hostname of job runner does not match")
             current_pid = self.task_runner.process.pid
+
             same_process = ti.pid == current_pid
-            if ti.run_as_user:
+
+            if ti.run_as_user or self.task_runner.run_as_user:
                 same_process = psutil.Process(ti.pid).ppid() == current_pid
+
             if ti.pid is not None and not same_process:
                 self.log.warning("Recorded pid %s does not match " "the current pid %s", ti.pid, current_pid)
                 raise AirflowException("PID of job runner does not match")
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 2a50d35..ed43198 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -189,6 +189,48 @@ class TestLocalTaskJob(unittest.TestCase):
         with pytest.raises(AirflowException, match='PID of job runner does not match'):
             job1.heartbeat_callback()
 
+    @conf_vars({('core', 'default_impersonation'): 'testuser'})
+    @mock.patch('airflow.jobs.local_task_job.psutil')
+    def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, dag_maker):
+        session = settings.Session()
+        with dag_maker('test_localtaskjob_heartbeat'):
+            op1 = DummyOperator(task_id='op1')
+        dr = dag_maker.dag_run
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        ti.pid = 2
+        ti.hostname = get_hostname()
+        session.commit()
+
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        ti.task = op1
+        ti.refresh_from_task(op1)
+        job1.task_runner = StandardTaskRunner(job1)
+        job1.task_runner.process = mock.Mock()
+        job1.task_runner.process.pid = 2
+        # Here, ti.pid is 2, the parent process of ti.pid is a mock(different).
+        # And task_runner process is 2. Should fail
+        with pytest.raises(AirflowException, match='PID of job runner does not match'):
+            job1.heartbeat_callback()
+
+        job1.task_runner.process.pid = 1
+        # We make the parent process of ti.pid to equal the task_runner process id
+        psutil_mock.Process.return_value.ppid.return_value = 1
+        ti.state = State.RUNNING
+        ti.pid = 2
+        # The task_runner process id is 1, same as the parent process of ti.pid
+        # as seen above
+        assert job1.task_runner.run_as_user == 'testuser'
+        session.merge(ti)
+        session.commit()
+        job1.heartbeat_callback(session=None)
+
+        # Here the task_runner process id is changed to 2
+        # while parent process of ti.pid is kept at 1, which is different
+        job1.task_runner.process.pid = 2
+        with pytest.raises(AirflowException, match='PID of job runner does not match'):
+            job1.heartbeat_callback()
+
     def test_heartbeat_failed_fast(self):
         """
         Test that task heartbeat will sleep when it fails fast

[airflow] 26/39: Fix UPDATING.md (#16933)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3beb37eb6682a915619a0fc2e2faddb5b27e2216
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Mon Jul 12 01:21:01 2021 +0100

    Fix UPDATING.md (#16933)
    
    (cherry picked from commit 543c31f76066592d356ef1a283888f3507fd968d)
---
 UPDATING.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 67ec6b8..2361b6c 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -1607,9 +1607,9 @@ https://cloud.google.com/compute/docs/disks/performance
 
 Hence, the default value for `master_disk_size` in `DataprocCreateClusterOperator` has been changed from 500GB to 1TB.
 
-#### `<airflow class="providers google c"></airflow>loud.operators.bigquery.BigQueryGetDatasetTablesOperator`
+#### `airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator`
 
-We changed signature of BigQueryGetDatasetTablesOperator.
+We changed signature of `BigQueryGetDatasetTablesOperator`.
 
 Before:
 

[airflow] 08/39: Update alias for field_mask in Google Memmcache (#16975)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f14860d6a7aefe8f98bf6eeba648c487b9cee2cf
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Tue Jul 13 20:54:38 2021 +0200

    Update alias for field_mask in Google Memmcache (#16975)
    
    The July 12, 2021 release of the google-cloud-memcache library removed
    the field_mask alias from the library, which broke our type checking
    and made the Google provider unimportable. This PR fixes the code to
    use the actual FieldMask import from protobuf.
    
    (cherry picked from commit a3f5c93806258b5ad396a638ba0169eca7f9d065)
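
For anyone pinned to an older provider and hitting this, the replacement type
lives in protobuf itself; a short sketch:

    # FieldMask now comes from protobuf rather than the removed
    # cloud_memcache.field_mask alias.
    from google.protobuf.field_mask_pb2 import FieldMask

    update_mask = FieldMask(paths=["displayName"])
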
---
 .../providers/google/cloud/hooks/cloud_memorystore.py    | 16 ++++++++--------
 setup.py                                                 |  4 +++-
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
index caf1cd6..8f4165b 100644
--- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py
+++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py
@@ -487,8 +487,8 @@ class CloudMemorystoreHook(GoogleBaseHook):
             -  ``redisConfig``
 
             If a dict is provided, it must be of the same form as the protobuf message
-            :class:`~google.cloud.redis_v1.types.FieldMask`
-        :type update_mask: Union[Dict, google.cloud.redis_v1.types.FieldMask]
+            :class:`~google.protobuf.field_mask_pb2.FieldMask`
+        :type update_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
         :param instance: Required. Update description. Only fields specified in ``update_mask`` are updated.
 
             If a dict is provided, it must be of the same form as the protobuf message
@@ -871,7 +871,7 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
     @GoogleBaseHook.fallback_to_default_project_id
     def update_instance(
         self,
-        update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+        update_mask: Union[Dict, FieldMask],
         instance: Union[Dict, cloud_memcache.Instance],
         project_id: str,
         location: Optional[str] = None,
@@ -889,9 +889,9 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
             -  ``displayName``
 
             If a dict is provided, it must be of the same form as the protobuf message
-            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+            :class:`~google.protobuf.field_mask_pb2.FieldMask`)
         :type update_mask:
-            Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+            Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
         :param instance: Required. Update description. Only fields specified in ``update_mask`` are updated.
 
             If a dict is provided, it must be of the same form as the protobuf message
@@ -935,7 +935,7 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
     @GoogleBaseHook.fallback_to_default_project_id
     def update_parameters(
         self,
-        update_mask: Union[Dict, cloud_memcache.field_mask.FieldMask],
+        update_mask: Union[Dict, FieldMask],
         parameters: Union[Dict, cloud_memcache.MemcacheParameters],
         project_id: str,
         location: str,
@@ -951,9 +951,9 @@ class CloudMemorystoreMemcachedHook(GoogleBaseHook):
 
         :param update_mask: Required. Mask of fields to update.
             If a dict is provided, it must be of the same form as the protobuf message
-            :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask`
+            :class:`~google.protobuf.field_mask_pb2.FieldMask`
         :type update_mask:
-            Union[Dict, google.cloud.memcache_v1beta2.types.cloud_memcache.field_mask.FieldMask]
+            Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
         :param parameters: The parameters to apply to the instance.
             If a dict is provided, it must be of the same form as the protobuf message
             :class:`~google.cloud.memcache_v1beta2.types.cloud_memcache.MemcacheParameters`
diff --git a/setup.py b/setup.py
index 9dde824..46ff73d 100644
--- a/setup.py
+++ b/setup.py
@@ -292,7 +292,9 @@ google = [
     'google-cloud-kms>=2.0.0,<3.0.0',
     'google-cloud-language>=1.1.1,<2.0.0',
     'google-cloud-logging>=2.1.1,<3.0.0',
-    'google-cloud-memcache>=0.2.0',
+    # 1.1.0 removed field_mask and broke import for released providers
+    # We can remove the <1.1.0 limitation after we release new Google Provider
+    'google-cloud-memcache>=0.2.0,<1.1.0',
     'google-cloud-monitoring>=2.0.0,<3.0.0',
     'google-cloud-os-login>=2.0.0,<3.0.0',
     'google-cloud-pubsub>=2.0.0,<3.0.0',

[airflow] 15/39: Add back missing permissions to UserModelView controls. (#17431)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cf641873ab6058954d1bfdee975f462afd6c09f3
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Mon Aug 9 07:46:05 2021 -0700

    Add back missing permissions to UserModelView controls. (#17431)
    
    Currently, on user model views other than UserDBModelView, view controls don't show up on /users/list. This fixes that issue by adding the missing views back to all user model views.
    
    Additionally, the Edit User action on the various Show User views redirects to the logged-in user's profile view. This fixes that issue by removing the Edit User action from the Show User view.
    
    closes: #16202
    (cherry picked from commit c1e2af4dd2bf868307caae9f2fa825562319a4f8)
---
 airflow/www/views.py                            | 143 +++++++++--------
 tests/www/views/test_views_custom_user_views.py | 199 ++++++++++++++++++++++++
 2 files changed, 277 insertions(+), 65 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index e21b823..fa89cb1 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -55,6 +55,7 @@ from flask_appbuilder import BaseView, ModelView, expose
 from flask_appbuilder.actions import action
 from flask_appbuilder.fieldwidgets import Select2Widget
 from flask_appbuilder.models.sqla.filters import BaseFilter
+from flask_appbuilder.security.decorators import has_access
 from flask_appbuilder.security.views import (
     PermissionModelView,
     PermissionViewModelView,
@@ -4112,7 +4113,72 @@ class CustomViewMenuModelView(ViewMenuModelView):
     ]
 
 
-class CustomUserDBModelView(UserDBModelView):
+class CustomUserInfoEditView(UserInfoEditView):
+    """Customize permission names for FAB's builtin UserInfoEditView."""
+
+    class_permission_name = permissions.RESOURCE_MY_PROFILE
+    route_base = "/userinfoeditview"
+    method_permission_name = {
+        'this_form_get': 'edit',
+        'this_form_post': 'edit',
+    }
+    base_permissions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+
+
+class CustomUserStatsChartView(UserStatsChartView):
+    """Customize permission names for FAB's builtin UserStatsChartView."""
+
+    class_permission_name = permissions.RESOURCE_USER_STATS_CHART
+    route_base = "/userstatschartview"
+    method_permission_name = {
+        'chart': 'read',
+        'list': 'read',
+    }
+    base_permissions = [permissions.ACTION_CAN_READ]
+
+
+class MultiResourceUserMixin:
+    """Remaps UserModelView permissions to new resources and actions."""
+
+    _class_permission_name = permissions.RESOURCE_USER
+
+    class_permission_name_mapping = {
+        'userinfoedit': permissions.RESOURCE_MY_PROFILE,
+        'userinfo': permissions.RESOURCE_MY_PROFILE,
+    }
+
+    method_permission_name = {
+        'userinfo': 'read',
+        'download': 'read',
+        'show': 'read',
+        'list': 'read',
+        'edit': 'edit',
+        'userinfoedit': 'edit',
+        'delete': 'delete',
+    }
+
+    base_permissions = [
+        permissions.ACTION_CAN_READ,
+        permissions.ACTION_CAN_EDIT,
+        permissions.ACTION_CAN_DELETE,
+    ]
+
+    @expose("/show/<pk>", methods=["GET"])
+    @has_access
+    def show(self, pk):
+        pk = self._deserialize_pk_if_composite(pk)
+        widgets = self._show(pk)
+        widgets['show'].template_args['actions'].pop('userinfoedit')
+        return self.render_template(
+            self.show_template,
+            pk=pk,
+            title=self.show_title,
+            widgets=widgets,
+            related_views=self._related_views,
+        )
+
+
+class CustomUserDBModelView(MultiResourceUserMixin, UserDBModelView):
     """Customize permission names for FAB's builtin UserDBModelView."""
 
     _class_permission_name = permissions.RESOURCE_USER
@@ -4126,15 +4192,15 @@ class CustomUserDBModelView(UserDBModelView):
 
     method_permission_name = {
         'add': 'create',
-        'userinfo': 'read',
         'download': 'read',
         'show': 'read',
         'list': 'read',
         'edit': 'edit',
+        'delete': 'delete',
         'resetmypassword': 'read',
         'resetpasswords': 'read',
-        'userinfoedit': 'edit',
-        'delete': 'delete',
+        'userinfo': 'read',
+        'userinfoedit': 'read',
     }
 
     base_permissions = [
@@ -4154,7 +4220,6 @@ class CustomUserDBModelView(UserDBModelView):
                 return self.class_permission_name_mapping.get(action_name, self._class_permission_name)
             if method_name:
                 return self.class_permission_name_mapping.get(method_name, self._class_permission_name)
-
         return self._class_permission_name
 
     @class_permission_name.setter
@@ -4162,77 +4227,25 @@ class CustomUserDBModelView(UserDBModelView):
         self._class_permission_name = name
 
 
-class CustomUserInfoEditView(UserInfoEditView):
-    """Customize permission names for FAB's builtin UserInfoEditView."""
-
-    class_permission_name = permissions.RESOURCE_MY_PROFILE
-    route_base = "/userinfoeditview"
-    method_permission_name = {
-        'this_form_get': 'read',
-        'this_form_post': 'edit',
-    }
-    base_permissions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
-
-
-class CustomUserStatsChartView(UserStatsChartView):
-    """Customize permission names for FAB's builtin UserStatsChartView."""
-
-    class_permission_name = permissions.RESOURCE_USER_STATS_CHART
-    route_base = "/userstatschartview"
-    method_permission_name = {
-        'chart': 'read',
-        'list': 'read',
-    }
-    base_permissions = [permissions.ACTION_CAN_READ]
-
-
-class CustomUserLDAPModelView(UserLDAPModelView):
+class CustomUserLDAPModelView(MultiResourceUserMixin, UserLDAPModelView):
     """Customize permission names for FAB's builtin UserLDAPModelView."""
 
-    class_permission_name = permissions.RESOURCE_MY_PROFILE
-    method_permission_name = {
-        'userinfo': 'read',
-        'list': 'read',
-    }
-    base_permissions = [
-        permissions.ACTION_CAN_READ,
-    ]
+    pass
 
 
-class CustomUserOAuthModelView(UserOAuthModelView):
+class CustomUserOAuthModelView(MultiResourceUserMixin, UserOAuthModelView):
     """Customize permission names for FAB's builtin UserOAuthModelView."""
 
-    class_permission_name = permissions.RESOURCE_MY_PROFILE
-    method_permission_name = {
-        'userinfo': 'read',
-        'list': 'read',
-    }
-    base_permissions = [
-        permissions.ACTION_CAN_READ,
-    ]
+    pass
 
 
-class CustomUserOIDModelView(UserOIDModelView):
+class CustomUserOIDModelView(MultiResourceUserMixin, UserOIDModelView):
     """Customize permission names for FAB's builtin UserOIDModelView."""
 
-    class_permission_name = permissions.RESOURCE_MY_PROFILE
-    method_permission_name = {
-        'userinfo': 'read',
-        'list': 'read',
-    }
-    base_permissions = [
-        permissions.ACTION_CAN_READ,
-    ]
+    pass
 
 
-class CustomUserRemoteUserModelView(UserRemoteUserModelView):
+class CustomUserRemoteUserModelView(MultiResourceUserMixin, UserRemoteUserModelView):
     """Customize permission names for FAB's builtin UserRemoteUserModelView."""
 
-    class_permission_name = permissions.RESOURCE_MY_PROFILE
-    method_permission_name = {
-        'userinfo': 'read',
-        'list': 'read',
-    }
-    base_permissions = [
-        permissions.ACTION_CAN_READ,
-    ]
+    pass
diff --git a/tests/www/views/test_views_custom_user_views.py b/tests/www/views/test_views_custom_user_views.py
new file mode 100644
index 0000000..a031472
--- /dev/null
+++ b/tests/www/views/test_views_custom_user_views.py
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from flask_appbuilder import SQLA
+from parameterized import parameterized
+
+from airflow import settings
+from airflow.security import permissions
+from airflow.www import app as application
+from airflow.www.views import CustomUserDBModelView
+from tests.test_utils.api_connexion_utils import create_user, delete_role
+from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login
+
+
+class TestSecurity(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = application.create_app(testing=True)
+        cls.appbuilder = cls.app.appbuilder
+        cls.app.config['WTF_CSRF_ENABLED'] = False
+        cls.security_manager = cls.appbuilder.sm
+
+        cls.delete_roles()
+
+    def setUp(self):
+        self.db = SQLA(self.app)
+        self.appbuilder.add_view(CustomUserDBModelView, "CustomUserDBModelView", category="ModelViews")
+        self.client = self.app.test_client()  # type:ignore
+
+    @classmethod
+    def delete_roles(cls):
+        for role_name in ['role_edit_one_dag']:
+            delete_role(cls.app, role_name)
+
+    @parameterized.expand(
+        [
+            (
+                "/resetpassword/form?pk={user.id}",
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_PASSWORD),
+                'Reset Password Form',
+            ),
+            (
+                "/resetmypassword/form",
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PASSWORD),
+                'Reset Password Form',
+            ),
+            (
+                "/users/userinfo/",
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PROFILE),
+                'Your user information',
+            ),
+            (
+                "/userinfoeditview/form",
+                (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PROFILE),
+                'Edit User',
+            ),
+            ("/users/add", (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_USER), 'Add User'),
+            ("/users/list/", (permissions.ACTION_CAN_READ, permissions.RESOURCE_USER), 'List Users'),
+            ("/users/show/{user.id}", (permissions.ACTION_CAN_READ, permissions.RESOURCE_USER), 'Show User'),
+            ("/users/edit/{user.id}", (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_USER), 'Edit User'),
+        ]
+    )
+    def test_user_model_view_with_access(self, url, permission, expected_text):
+        user_without_access = create_user(
+            self.app,
+            username="no_access",
+            role_name="role_no_access",
+            permissions=[
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+            ],
+        )
+        client = client_with_login(
+            self.app,
+            username="no_access",
+            password="no_access",
+        )
+        response = client.get(url.replace("{user.id}", str(user_without_access.id)), follow_redirects=True)
+        check_content_not_in_response(expected_text, response)
+
+    @parameterized.expand(
+        [
+            (
+                "/resetpassword/form?pk={user.id}",
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_PASSWORD),
+                'Reset Password Form',
+            ),
+            (
+                "/resetmypassword/form",
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PASSWORD),
+                'Reset Password Form',
+            ),
+            (
+                "/users/userinfo/",
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PROFILE),
+                'Your user information',
+            ),
+            (
+                "/userinfoeditview/form",
+                (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PROFILE),
+                'Edit User',
+            ),
+            ("/users/add", (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_USER), 'Add User'),
+            ("/users/list/", (permissions.ACTION_CAN_READ, permissions.RESOURCE_USER), 'List Users'),
+            ("/users/show/{user.id}", (permissions.ACTION_CAN_READ, permissions.RESOURCE_USER), 'Show User'),
+            ("/users/edit/{user.id}", (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_USER), 'Edit User'),
+        ]
+    )
+    def test_user_model_view_without_access(self, url, permission, expected_text):
+
+        user_with_access = create_user(
+            self.app,
+            username="has_access",
+            role_name="role_has_access",
+            permissions=[(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), permission],
+        )
+
+        client = client_with_login(
+            self.app,
+            username="has_access",
+            password="has_access",
+        )
+        response = client.get(url.replace("{user.id}", str(user_with_access.id)), follow_redirects=True)
+        check_content_in_response(expected_text, response)
+
+    def test_user_model_view_without_delete_access(self):
+
+        user_to_delete = create_user(
+            self.app,
+            username="user_to_delete",
+            role_name="user_to_delete",
+        )
+
+        create_user(
+            self.app,
+            username="no_access",
+            role_name="role_no_access",
+            permissions=[
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+            ],
+        )
+
+        client = client_with_login(
+            self.app,
+            username="no_access",
+            password="no_access",
+        )
+
+        response = client.post(f"/users/delete/{user_to_delete.id}", follow_redirects=True)
+
+        check_content_not_in_response("Deleted Row", response)
+        assert bool(self.security_manager.get_user_by_id(user_to_delete.id)) is True
+
+    def test_user_model_view_with_delete_access(self):
+
+        user_to_delete = create_user(
+            self.app,
+            username="user_to_delete",
+            role_name="user_to_delete",
+        )
+
+        create_user(
+            self.app,
+            username="has_access",
+            role_name="role_has_access",
+            permissions=[
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+                (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_USER),
+            ],
+        )
+
+        client = client_with_login(
+            self.app,
+            username="has_access",
+            password="has_access",
+        )
+
+        response = client.post(f"/users/delete/{user_to_delete.id}", follow_redirects=True)
+        check_content_in_response("Deleted Row", response)
+        check_content_not_in_response(user_to_delete.username, response)
+        assert bool(self.security_manager.get_user_by_id(user_to_delete.id)) is False

[airflow] 18/39: Handle and log exceptions raised during task callback (#17347)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 798237ad13614c8196dc8346db7ff9933bc6dd56
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Fri Aug 6 03:35:13 2021 -0700

    Handle and log exceptions raised during task callback (#17347)
    
    Add missing exception handling in success/retry/failure callbacks
    
    (cherry picked from commit faf9f731fa8810e05f868ffec989ea042381ada4)
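
A minimal sketch of the callbacks this protects; the DAG, task, and callback
names are illustrative:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    def notify_failure(context):
        # Anything raised here used to escape the task's finish path;
        # it is now caught and logged via log.exception() instead.
        raise RuntimeError("notification backend unreachable")

    with DAG("example_callback_dag", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
        DummyOperator(
            task_id="example_task",
            on_failure_callback=notify_failure,
            on_retry_callback=notify_failure,
        )
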
---
 airflow/models/taskinstance.py    | 15 ++++++++++++---
 tests/models/test_taskinstance.py | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 76cb079..58bb680 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1384,18 +1384,27 @@ class TaskInstance(Base, LoggingMixin):
             if task.on_failure_callback is not None:
                 context = self.get_template_context()
                 context["exception"] = error
-                task.on_failure_callback(context)
+                try:
+                    task.on_failure_callback(context)
+                except Exception:
+                    self.log.exception("Error when executing on_failure_callback")
         elif self.state == State.SUCCESS:
             task = self.task
             if task.on_success_callback is not None:
                 context = self.get_template_context()
-                task.on_success_callback(context)
+                try:
+                    task.on_success_callback(context)
+                except Exception:
+                    self.log.exception("Error when executing on_success_callback")
         elif self.state == State.UP_FOR_RETRY:
             task = self.task
             if task.on_retry_callback is not None:
                 context = self.get_template_context()
                 context["exception"] = error
-                task.on_retry_callback(context)
+                try:
+                    task.on_retry_callback(context)
+                except Exception:
+                    self.log.exception("Error when executing on_retry_callback")
 
     @provide_session
     def run(
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index b6cd8b8..11fe0cd 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1644,6 +1644,45 @@ class TestTaskInstance(unittest.TestCase):
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
+    @parameterized.expand(
+        [
+            (State.SUCCESS, "Error when executing on_success_callback"),
+            (State.UP_FOR_RETRY, "Error when executing on_retry_callback"),
+            (State.FAILED, "Error when executing on_failure_callback"),
+        ]
+    )
+    def test_finished_callbacks_handle_and_log_exception(self, finished_state, expected_message):
+        called = completed = False
+
+        def on_finish_callable(context):
+            nonlocal called, completed
+            called = True
+            raise KeyError
+            completed = True
+
+        dag = DAG(
+            'test_success_callback_handles_exception',
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+        )
+        task = DummyOperator(
+            task_id='op',
+            email='test@test.test',
+            on_success_callback=on_finish_callable,
+            on_retry_callback=on_finish_callable,
+            on_failure_callback=on_finish_callable,
+            dag=dag,
+        )
+
+        ti = TI(task=task, execution_date=datetime.datetime.now())
+        ti._log = mock.Mock()
+        ti.state = finished_state
+        ti._run_finished_callback()
+
+        assert called
+        assert not completed
+        ti.log.exception.assert_called_once_with(expected_message)
+
     def test_handle_failure(self):
         start_date = timezone.datetime(2016, 6, 1)
         dag = models.DAG(dag_id="test_handle_failure", schedule_interval=None, start_date=start_date)

[airflow] 03/39: Remove SQLAlchemy <1.4 constraint (#16630)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e8058ebdc6df5d7a168dcccf4d8042ec09faac80
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Thu Jun 24 21:10:15 2021 +0800

    Remove SQLAlchemy <1.4 constraint (#16630)
    
    This was added due to flask-sqlalchemy and sqlalchemy-utils not declaring
    the upper bounds. They have since released sqlalchemy 1.4-compatible
    versions, so we can remove that hack.
    
    Note that this does *not* actually make us run on sqlalchemy 1.4 since
    flask-appbuilder still has a <1.4 pin. But that's for flask-appbuilder
    to worry about -- code in Airflow is compatible, so we can remove the
    constraint now, and get sqlalchemy 1.4 as soon as flask-appbuilder
    allows us to.
    
    (cherry picked from commit d181604739c048c6969d8997dbaf8b159607904b)
---
 setup.cfg | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/setup.cfg b/setup.cfg
index 8b03296..0e4868f 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -150,8 +150,7 @@ install_requires =
     pyyaml>=5.1
     rich>=9.2.0
     setproctitle>=1.1.8, <2
-    # SQLAlchemy 1.4 breaks sqlalchemy-utils https://github.com/kvesteri/sqlalchemy-utils/issues/505
-    sqlalchemy>=1.3.18, <1.4
+    sqlalchemy>=1.3.18
     sqlalchemy_jsonfield~=1.0
     # Required by vendored-in connexion
     swagger-ui-bundle>=0.0.2

[airflow] 21/39: Do not seek error file when it is closed (#17187)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 24e75e72c674b47244fda8780b9d5368abbb05f6
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Sun Jul 25 16:19:18 2021 +0100

    Do not seek error file when it is closed (#17187)
    
    We do not check whether the error file is closed before we seek it, which causes exceptions.
    Sometimes this error file does not exist, e.g. when the task state is changed externally.
    
    This change fixes it by returning None when the file is closed, so that custom text can be used for the error.
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    (cherry picked from commit 01a0aca249eeaf71d182bf537b9d04121257ac09)
---
 airflow/models/taskinstance.py    |  2 ++
 tests/models/test_taskinstance.py | 13 +++++++++++++
 2 files changed, 15 insertions(+)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 58bb680..46ea7f9 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -110,6 +110,8 @@ def set_current_context(context: Context):
 
 def load_error_file(fd: IO[bytes]) -> Optional[Union[str, Exception]]:
     """Load and return error from error file"""
+    if fd.closed:
+        return None
     fd.seek(0, os.SEEK_SET)
     data = fd.read()
     if not data:
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 11fe0cd..4b84d10 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -21,6 +21,7 @@ import os
 import time
 import unittest
 import urllib
+from tempfile import NamedTemporaryFile
 from typing import List, Optional, Union, cast
 from unittest import mock
 from unittest.mock import call, mock_open, patch
@@ -44,6 +45,7 @@ from airflow.models import (
     TaskReschedule,
     Variable,
 )
+from airflow.models.taskinstance import load_error_file, set_error_file
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import PythonOperator
@@ -113,6 +115,17 @@ class TestTaskInstance(unittest.TestCase):
     def tearDown(self):
         self.clean_db()
 
+    def test_load_error_file_returns_None_for_closed_file(self):
+        error_fd = NamedTemporaryFile()
+        error_fd.close()
+        assert load_error_file(error_fd) is None
+
+    def test_load_error_file_loads_correctly(self):
+        error_message = "some random error message"
+        with NamedTemporaryFile() as error_fd:
+            set_error_file(error_fd.name, error=error_message)
+            assert load_error_file(error_fd) == error_message
+
     def test_set_task_dates(self):
         """
         Test that tasks properly take start/end dates from DAGs

[airflow] 11/39: Fail tasks in scheduler when executor reports they failed (#15929)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 012321b1325c8d810ae60ad7006ab9f22dfaf95e
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu May 20 11:22:01 2021 +0100

    Fail tasks in scheduler when executor reports they failed (#15929)
    
    When a task fails in the executor while still queued in the scheduler, the executor
    reports this failure but the scheduler doesn't change the task state, leaving the
    task queued until the scheduler is restarted. This commit fixes it by ensuring
    that when a task is reported to have failed in the executor, the task is also
    failed in the scheduler.
    
    (cherry picked from commit deececcabc080844ca89272a2e4ab1183cd51e3f)
---
 airflow/jobs/scheduler_job.py    | 4 +++-
 tests/jobs/test_scheduler_job.py | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b99f4b2..1758ae1 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1252,12 +1252,14 @@ class SchedulerJob(BaseJob):
                     "task says its %s. (Info: %s) Was the task killed externally?"
                 )
                 self.log.error(msg, ti, state, ti.state, info)
+
                 request = TaskCallbackRequest(
                     full_filepath=ti.dag_model.fileloc,
                     simple_task_instance=SimpleTaskInstance(ti),
                     msg=msg % (ti, state, ti.state, info),
                 )
-
+                self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
+                ti.set_state(state)
                 self.processor_agent.send_callback_to_execute(request)
 
         return len(event_buffer)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 0d1f530..37ae65b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -907,7 +907,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.QUEUED
+        assert ti1.state == State.FAILED
         mock_task_callback.assert_called_once_with(
             full_filepath='/test_path1/',
             simple_task_instance=mock.ANY,

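A self-contained toy sketch of the behaviour introduced above; these are not Airflow's real classes (the real logic lives in SchedulerJob._process_executor_events), they only mirror the shape of the diff:

    # Toy model: when the executor reports a terminal state for a task the
    # scheduler still considers QUEUED, the scheduler now adopts that state
    # instead of leaving the task queued forever.
    class ToyTaskInstance:
        def __init__(self, task_id, state="queued"):
            self.task_id = task_id
            self.state = state

        def set_state(self, state):
            self.state = state

    def process_executor_events(event_buffer, task_instances):
        for task_id, (state, info) in event_buffer.items():
            ti = task_instances[task_id]
            if ti.state == "queued" and state in ("failed", "success"):
                print(f"Setting task instance {task_id} state to {state} as reported by executor")
                ti.set_state(state)

    tis = {"extract": ToyTaskInstance("extract")}
    process_executor_events({"extract": ("failed", "worker OOM")}, tis)
    assert tis["extract"].state == "failed"
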
[airflow] 32/39: BugFix: Correctly handle custom `deps` and `task_group` during DAG Serialization (#16734)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 12c46c972dc84be0dc673c6ff1ced731afcf5630
Author: luoyuliuyin <lu...@gmail.com>
AuthorDate: Tue Jul 6 20:44:32 2021 +0800

    BugFix: Correctly handle custom `deps` and `task_group` during DAG Serialization (#16734)
    
    We check whether the dag has changed via dag_hash, so deps and task_group need to be handled deterministically during DAG serialization to ensure that the generated dag_hash is stable.
    
    closes https://github.com/apache/airflow/issues/16690
    
    (cherry picked from commit 0632ecf6f56214c78deea2a4b54ea0daebb4e95d)
---
 airflow/serialization/serialized_objects.py   | 16 +++--
 tests/serialization/test_dag_serialization.py | 99 +++++++++++++++++++++++++++
 2 files changed, 110 insertions(+), 5 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index d2f456d..bdbaea8 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -424,7 +424,10 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
                     )
 
                 deps.append(f'{module_name}.{klass.__name__}')
-            serialize_op['deps'] = deps
+            # deps needs to be sorted here because op.deps is a set, whose iteration order is
+            # unstable, so repeated calls could otherwise produce different results.
+            # Without sorting, json.dumps(self.data, sort_keys=True) would yield an unstable dag_hash
+            serialize_op['deps'] = sorted(deps)
 
         # Store all template_fields as they are if there are JSON Serializable
         # If not, store them as strings
@@ -796,6 +799,9 @@ class SerializedTaskGroup(TaskGroup, BaseSerialization):
         if not task_group:
             return None
 
+        # task_group.xxx_ids needs to be sorted here because each xxx_ids attribute is a set,
+        # and converting a set to a list produces an unpredictable order.
+        # Without sorting, json.dumps(self.data, sort_keys=True) would yield an unstable dag_hash
         serialize_group = {
             "_group_id": task_group._group_id,
             "prefix_group_id": task_group.prefix_group_id,
@@ -808,10 +814,10 @@ class SerializedTaskGroup(TaskGroup, BaseSerialization):
                 else (DAT.TASK_GROUP, SerializedTaskGroup.serialize_task_group(child))
                 for label, child in task_group.children.items()
             },
-            "upstream_group_ids": cls._serialize(list(task_group.upstream_group_ids)),
-            "downstream_group_ids": cls._serialize(list(task_group.downstream_group_ids)),
-            "upstream_task_ids": cls._serialize(list(task_group.upstream_task_ids)),
-            "downstream_task_ids": cls._serialize(list(task_group.downstream_task_ids)),
+            "upstream_group_ids": cls._serialize(sorted(task_group.upstream_group_ids)),
+            "downstream_group_ids": cls._serialize(sorted(task_group.downstream_group_ids)),
+            "upstream_task_ids": cls._serialize(sorted(task_group.upstream_task_ids)),
+            "downstream_task_ids": cls._serialize(sorted(task_group.downstream_task_ids)),
         }
 
         return serialize_group
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index f24e862..05edfda 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -963,6 +963,105 @@ class TestStringifiedDAGs(unittest.TestCase):
 
         check_task_group(serialized_dag.task_group)
 
+    def test_deps_sorted(self):
+        """
+        Tests serialize_operator, make sure the deps is in order
+        """
+        from airflow.operators.dummy import DummyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test_deps_sorted", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = DummyOperator(task_id="task2")
+            task1 >> task2
+
+        serialize_op = SerializedBaseOperator.serialize_operator(dag.task_dict["task1"])
+        deps = serialize_op["deps"]
+        assert deps == [
+            'airflow.ti_deps.deps.not_in_retry_period_dep.NotInRetryPeriodDep',
+            'airflow.ti_deps.deps.not_previously_skipped_dep.NotPreviouslySkippedDep',
+            'airflow.ti_deps.deps.prev_dagrun_dep.PrevDagrunDep',
+            'airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep',
+            'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
+        ]
+
+    def test_task_group_sorted(self):
+        """
+        Tests serialize_task_group, make sure the list is in order
+        """
+        from airflow.operators.dummy import DummyOperator
+        from airflow.serialization.serialized_objects import SerializedTaskGroup
+        from airflow.utils.task_group import TaskGroup
+
+        """
+                    start
+                    ╱  ╲
+                  ╱      ╲
+        task_group_up1  task_group_up2
+            (task_up1)  (task_up2)
+                 ╲       ╱
+              task_group_middle
+                (task_middle)
+                  ╱      ╲
+        task_group_down1 task_group_down2
+           (task_down1) (task_down2)
+                 ╲        ╱
+                   ╲    ╱
+                    end
+        """
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test_task_group_sorted", start_date=execution_date) as dag:
+            start = DummyOperator(task_id="start")
+
+            with TaskGroup("task_group_up1") as task_group_up1:
+                _ = DummyOperator(task_id="task_up1")
+
+            with TaskGroup("task_group_up2") as task_group_up2:
+                _ = DummyOperator(task_id="task_up2")
+
+            with TaskGroup("task_group_middle") as task_group_middle:
+                _ = DummyOperator(task_id="task_middle")
+
+            with TaskGroup("task_group_down1") as task_group_down1:
+                _ = DummyOperator(task_id="task_down1")
+
+            with TaskGroup("task_group_down2") as task_group_down2:
+                _ = DummyOperator(task_id="task_down2")
+
+            end = DummyOperator(task_id='end')
+
+            start >> task_group_up1
+            start >> task_group_up2
+            task_group_up1 >> task_group_middle
+            task_group_up2 >> task_group_middle
+            task_group_middle >> task_group_down1
+            task_group_middle >> task_group_down2
+            task_group_down1 >> end
+            task_group_down2 >> end
+
+        task_group_middle_dict = SerializedTaskGroup.serialize_task_group(
+            dag.task_group.children["task_group_middle"]
+        )
+        upstream_group_ids = task_group_middle_dict["upstream_group_ids"]
+        assert upstream_group_ids == ['task_group_up1', 'task_group_up2']
+
+        upstream_task_ids = task_group_middle_dict["upstream_task_ids"]
+        assert upstream_task_ids == ['task_group_up1.task_up1', 'task_group_up2.task_up2']
+
+        downstream_group_ids = task_group_middle_dict["downstream_group_ids"]
+        assert downstream_group_ids == ['task_group_down1', 'task_group_down2']
+
+        task_group_down1_dict = SerializedTaskGroup.serialize_task_group(
+            dag.task_group.children["task_group_down1"]
+        )
+        downstream_task_ids = task_group_down1_dict["downstream_task_ids"]
+        assert downstream_task_ids == ['end']
+
     def test_edge_info_serialization(self):
         """
         Tests edge_info serialization/deserialization.

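The motivation for the sorted() calls above can be shown with plain Python: converting a set to a list yields an order that can differ between interpreter runs (string hashing is randomized), so a digest computed over json.dumps output only becomes stable once the lists are sorted. A small illustrative sketch, independent of Airflow:

    import hashlib
    import json

    upstream_task_ids = {"task_group_up1.task_up1", "task_group_up2.task_up2"}

    # list(set) order depends on string hashing, which is randomized per interpreter
    # run (PYTHONHASHSEED), so this digest can change between DAG-parsing processes:
    unstable = hashlib.md5(json.dumps(list(upstream_task_ids), sort_keys=True).encode()).hexdigest()

    # Sorting the ids first makes the serialized form, and therefore the digest, deterministic:
    stable = hashlib.md5(json.dumps(sorted(upstream_task_ids), sort_keys=True).encode()).hexdigest()

    print(unstable, stable)
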
[airflow] 30/39: fix: instance name env var (#16749)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 60938f04e308a5f81ac7fdae1f0499d1e3ab26df
Author: Rikhil Raithatha <23...@users.noreply.github.com>
AuthorDate: Thu Jul 1 11:03:35 2021 +0100

    fix: instance name env var (#16749)
    
    (cherry picked from commit fa811057a6ae0fc6c5e4bff1e18971c262a42a4c)
---
 docs/apache-airflow/howto/customize-dag-ui-page-instance-name.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/howto/customize-dag-ui-page-instance-name.rst b/docs/apache-airflow/howto/customize-dag-ui-page-instance-name.rst
index e54a066..f67c193 100644
--- a/docs/apache-airflow/howto/customize-dag-ui-page-instance-name.rst
+++ b/docs/apache-airflow/howto/customize-dag-ui-page-instance-name.rst
@@ -38,7 +38,7 @@ To make this change, simply:
 
 .. code-block::
 
-  AIRFLOW__WEBSERVER__SITE_TITLE = "DevEnv"
+  AIRFLOW__WEBSERVER__INSTANCE_NAME = "DevEnv"
 
 
 Screenshots

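For reference, Airflow maps environment variables of the form AIRFLOW__<SECTION>__<KEY> onto configuration options, so the corrected variable above overrides [webserver] instance_name. A small illustrative snippet (the value is an example only):

    import os

    # Equivalent of setting [webserver] instance_name = DevEnv in airflow.cfg;
    # it must be set before the webserver process loads the Airflow config.
    os.environ["AIRFLOW__WEBSERVER__INSTANCE_NAME"] = "DevEnv"
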
[airflow] 31/39: Fix unchecked indexing in _build_metrics (#16744)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 479a34d5f97aa4a92fbb324caa7ea0abc4fa98da
Author: Emil Ejbyfeldt <ee...@liveintent.com>
AuthorDate: Thu Jul 1 09:24:16 2021 +0200

    Fix unchecked indexing in _build_metrics (#16744)
    
    I am not sure if this can happen in regular use, but when running test
    cases `sys.argv` can be the args passed to pytest. When this is the
    case it is definitely possible for argv to contain only a single
    element.
    
    (cherry picked from commit a2d6aa07c9de83278eef14a228eb4a02ad43205c)
---
 airflow/utils/cli.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 2b5eb59..acdb291 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -115,7 +115,8 @@ def _build_metrics(func_name, namespace):
     sub_commands_to_check = {'users', 'connections'}
     sensitive_fields = {'-p', '--password', '--conn-password'}
     full_command = list(sys.argv)
-    if full_command[1] in sub_commands_to_check:
+    sub_command = full_command[1] if len(full_command) > 1 else None
+    if sub_command in sub_commands_to_check:
         for idx, command in enumerate(full_command):
             if command in sensitive_fields:
                 # For cases when password is passed as "--password xyz" (with space between key and value)

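The fix is the usual length-guarded indexing pattern; a minimal standalone illustration (the command names below are just examples, not Airflow's full CLI):

    import sys

    # Guard against argv containing only the program name (e.g. when invoked
    # from a test runner) instead of assuming sys.argv[1] always exists.
    sub_commands_to_check = {"users", "connections"}
    sub_command = sys.argv[1] if len(sys.argv) > 1 else None
    if sub_command in sub_commands_to_check:
        print(f"checking sensitive fields for {sub_command!r}")
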
[airflow] 19/39: Show serialization exceptions in DAG parsing log (#17277)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ed76d111a7a903f9b283979aac770cd0ad436d00
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Wed Jul 28 10:51:21 2021 -0600

    Show serialization exceptions in DAG parsing log (#17277)
    
    Make sure that any exceptions that happen when writing serialized DAGs
    to the db get written to the DAG parsing log, instead of only being added
    to `import_errors` for consumption via the UI.
    
    (cherry picked from commit 9cd5a97654fa82f1d4d8f599e8eb81957b3f7286)
---
 airflow/models/dagbag.py    | 1 +
 tests/models/test_dagbag.py | 4 +++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 5d84ba2..b37482a 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -595,6 +595,7 @@ class DagBag(LoggingMixin):
             except OperationalError:
                 raise
             except Exception:
+                self.log.exception("Failed to write serialized DAG: %s", dag.full_filepath)
                 return [(dag.fileloc, traceback.format_exc(limit=-self.dagbag_import_error_traceback_depth))]
 
         # Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index b4edc0c..a065318 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -703,7 +703,9 @@ class TestDagBag(unittest.TestCase):
             )
             assert dagbag.import_errors == {}
 
-            dagbag.sync_to_db(session=session)
+            with self.assertLogs(level="ERROR") as cm:
+                dagbag.sync_to_db(session=session)
+            self.assertIn("SerializationError", "\n".join(cm.output))
 
             assert path in dagbag.import_errors
             err = dagbag.import_errors[path]

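The test change relies on the standard unittest assertLogs context manager together with logging.exception; a self-contained sketch of the same pattern, outside of Airflow:

    import logging
    import unittest


    class ExceptionLoggingExample(unittest.TestCase):
        def test_exception_is_written_to_log(self):
            log = logging.getLogger("example.dagbag")
            with self.assertLogs(level="ERROR") as cm:
                try:
                    raise ValueError("cannot serialize DAG")
                except ValueError:
                    # log.exception records the message at ERROR level plus the traceback
                    log.exception("Failed to write serialized DAG: %s", "example_dag.py")
            self.assertIn("ValueError", "\n".join(cm.output))


    if __name__ == "__main__":
        unittest.main()
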
[airflow] 06/39: bump dnspython (#16698)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ab46af97bd50ad0f024892eb1c65d7bbb724ae89
Author: kurtqq <47...@users.noreply.github.com>
AuthorDate: Tue Jun 29 00:21:24 2021 +0300

    bump dnspython (#16698)
    
    (cherry picked from commit 57dcac22137bc958c1ed9f12fa54484e13411a6f)
---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index e6c1d46..9e8c28d 100644
--- a/setup.py
+++ b/setup.py
@@ -370,7 +370,7 @@ ldap = [
 ]
 leveldb = ['plyvel']
 mongo = [
-    'dnspython>=1.13.0,<2.0.0',
+    'dnspython>=1.13.0,<3.0.0',
     'pymongo>=3.6.0',
 ]
 mssql = [

[airflow] 16/39: Add missing permissions to varimport (#17468)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit eaa21db41f4e272f759cd0fdc0e4294a7ca9a6f4
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Fri Aug 6 16:22:50 2021 +0100

    Add missing permissions to varimport (#17468)
    
    (cherry picked from commit eb6af07f5bc8958efd06818e84a5273a079304e1)
---
 airflow/www/views.py                   |  2 +-
 tests/www/views/test_views_variable.py | 13 +++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index fa89cb1..590de7d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3276,7 +3276,6 @@ class VariableModelView(AirflowModelView):
         'delete': 'delete',
         'action_muldelete': 'delete',
         'action_varexport': 'read',
-        'varimport': 'create',
     }
     base_permissions = [
         permissions.ACTION_CAN_CREATE,
@@ -3339,6 +3338,7 @@ class VariableModelView(AirflowModelView):
         return response
 
     @expose('/varimport', methods=["POST"])
+    @auth.has_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_VARIABLE)])
     @action_logging
     def varimport(self):
         """Import variables"""
diff --git a/tests/www/views/test_views_variable.py b/tests/www/views/test_views_variable.py
index f9b3744..292e971 100644
--- a/tests/www/views/test_views_variable.py
+++ b/tests/www/views/test_views_variable.py
@@ -96,6 +96,19 @@ def test_import_variables_success(session, admin_client):
     check_content_in_response('4 variable(s) successfully updated.', resp)
 
 
+def test_import_variables_anon(session, app):
+    assert session.query(Variable).count() == 0
+
+    content = '{"str_key": "str_value}'
+    bytes_content = io.BytesIO(bytes(content, encoding='utf-8'))
+
+    resp = app.test_client().post(
+        '/variable/varimport', data={'file': (bytes_content, 'test.json')}, follow_redirects=True
+    )
+    check_content_not_in_response('variable(s) successfully updated.', resp)
+    check_content_in_response('Sign In', resp)
+
+
 def test_description_retrieval(session, admin_client):
     # create valid variable
     admin_client.post('/variable/add', data=VARIABLE, follow_redirects=True)

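The regression test above boils down to "an unauthenticated POST must never reach the handler". A hypothetical, framework-level Flask sketch of that idea (this is not Airflow's view code; the decorator and header check are made up stand-ins for a real permission check such as auth.has_access):

    from functools import wraps

    from flask import Flask, redirect, request

    app = Flask(__name__)


    def requires_create_permission(func):
        # Hypothetical stand-in for a real permission/authentication check
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not request.headers.get("X-Authenticated-User"):
                return redirect("/login")  # anonymous callers never reach the handler
            return func(*args, **kwargs)

        return wrapper


    @app.route("/variable/varimport", methods=["POST"])
    @requires_create_permission
    def varimport():
        return "variable(s) successfully updated."


    if __name__ == "__main__":
        client = app.test_client()
        anon = client.post("/variable/varimport", data={})
        assert anon.status_code == 302  # redirected away instead of importing
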
[airflow] 07/39: AIRFLOW-5529 Add Apache Drill provider. (#16884)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4aef457774c646940caef7a02e0c2c0d76915160
Author: dzamo <91...@users.noreply.github.com>
AuthorDate: Mon Jul 12 19:59:35 2021 +0200

    AIRFLOW-5529 Add Apache Drill provider. (#16884)
    
    (cherry picked from commit 8808b641942e1b81c21db054fd6d36e2031cfab8)
---
 CONTRIBUTING.rst                                   |  22 ++---
 INSTALL                                            |  24 +++---
 airflow/providers/apache/drill/CHANGELOG.rst       |  25 ++++++
 airflow/providers/apache/drill/__init__.py         |  17 ++++
 .../apache/drill/example_dags/example_drill_dag.py |  46 +++++++++++
 airflow/providers/apache/drill/hooks/__init__.py   |  17 ++++
 airflow/providers/apache/drill/hooks/drill.py      |  89 +++++++++++++++++++++
 .../providers/apache/drill/operators/__init__.py   |  17 ++++
 airflow/providers/apache/drill/operators/drill.py  |  71 ++++++++++++++++
 airflow/providers/apache/drill/provider.yaml       |  49 ++++++++++++
 airflow/ui/src/views/Docs.tsx                      |   1 +
 airflow/utils/db.py                                |  10 +++
 .../commits.rst                                    |  23 ++++++
 .../connections/drill.rst                          |  44 ++++++++++
 .../index.rst                                      |  50 ++++++++++++
 .../operators.rst                                  |  51 ++++++++++++
 docs/apache-airflow/extra-packages-ref.rst         |   2 +
 docs/conf.py                                       |   1 +
 docs/integration-logos/apache/drill.png            | Bin 0 -> 40173 bytes
 docs/spelling_wordlist.txt                         |   1 +
 setup.py                                           |   3 +
 tests/providers/apache/drill/__init__.py           |  17 ++++
 tests/providers/apache/drill/hooks/__init__.py     |  17 ++++
 tests/providers/apache/drill/hooks/test_drill.py   |  84 +++++++++++++++++++
 tests/providers/apache/drill/operators/__init__.py |  17 ++++
 .../providers/apache/drill/operators/test_drill.py |  63 +++++++++++++++
 26 files changed, 738 insertions(+), 23 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 98cdf93..be807f4 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -573,17 +573,17 @@ This is the full list of those extras:
 
   .. START EXTRAS HERE
 
-airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
-apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
-apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
-cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, devel_all, devel_ci,
-devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp,
-gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc,
-jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql,
-microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill,
-password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba,
-segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau,
-telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
+airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.drill,
+apache.druid, apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot,
+apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery,
+cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel,
+devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol,
+facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive,
+http, imap, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
+microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle,
+pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3,
+salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite,
+ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
 
   .. END EXTRAS HERE
 
diff --git a/INSTALL b/INSTALL
index 111b51f..554af5c 100644
--- a/INSTALL
+++ b/INSTALL
@@ -1,6 +1,6 @@
 # INSTALL / BUILD instructions for Apache Airflow
 
-This ia a generic installation method that requires a number of dependencies to be installed.
+This is a generic installation method that requires a number of dependencies to be installed.
 
 Depending on your system you might need different prerequisites, but the following
 systems/prerequisites are known to work:
@@ -89,17 +89,17 @@ The list of available extras:
 
 # START EXTRAS HERE
 
-airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
-apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
-apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
-cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel, devel_all, devel_ci,
-devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp,
-gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc,
-jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql,
-microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill,
-password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba,
-segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau,
-telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
+airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.drill,
+apache.druid, apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot,
+apache.spark, apache.sqoop, apache.webhdfs, asana, async, atlas, aws, azure, cassandra, celery,
+cgroups, cloudant, cncf.kubernetes, crypto, dask, databricks, datadog, deprecated_api, devel,
+devel_all, devel_ci, devel_hadoop, dingding, discord, doc, docker, druid, elasticsearch, exasol,
+facebook, ftp, gcp, gcp_api, github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive,
+http, imap, jdbc, jenkins, jira, kerberos, kubernetes, ldap, leveldb, microsoft.azure,
+microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, opsgenie, oracle,
+pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole, rabbitmq, redis, s3,
+salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, snowflake, spark, sqlite,
+ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
 
 # END EXTRAS HERE
 
diff --git a/airflow/providers/apache/drill/CHANGELOG.rst b/airflow/providers/apache/drill/CHANGELOG.rst
new file mode 100644
index 0000000..cef7dda
--- /dev/null
+++ b/airflow/providers/apache/drill/CHANGELOG.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/airflow/providers/apache/drill/__init__.py b/airflow/providers/apache/drill/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/apache/drill/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/drill/example_dags/example_drill_dag.py b/airflow/providers/apache/drill/example_dags/example_drill_dag.py
new file mode 100644
index 0000000..60a35ee
--- /dev/null
+++ b/airflow/providers/apache/drill/example_dags/example_drill_dag.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that runs SQL statements on Apache Drill using
+`DrillOperator`.
+"""
+from airflow.models import DAG
+from airflow.providers.apache.drill.operators.drill import DrillOperator
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'Airflow',
+}
+
+with DAG(
+    dag_id='example_drill_dag',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(2),
+    tags=['example'],
+) as dag:
+    # [START howto_operator_drill]
+    sql_task = DrillOperator(
+        task_id='json_to_parquet_table',
+        sql='''
+        drop table if exists dfs.tmp.employee;
+        create table dfs.tmp.employee as select * from cp.`employee.json`;
+        ''',
+    )
+    # [END howto_operator_drill]
diff --git a/airflow/providers/apache/drill/hooks/__init__.py b/airflow/providers/apache/drill/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/apache/drill/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/drill/hooks/drill.py b/airflow/providers/apache/drill/hooks/drill.py
new file mode 100644
index 0000000..470be8c
--- /dev/null
+++ b/airflow/providers/apache/drill/hooks/drill.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Iterable, Optional, Tuple
+
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Connection
+
+from airflow.hooks.dbapi import DbApiHook
+
+
+class DrillHook(DbApiHook):
+    """
+    Interact with Apache Drill via sqlalchemy-drill.
+
+    You can specify the SQLAlchemy dialect and driver that sqlalchemy-drill
+    will employ to communicate with Drill in the extras field of your
+    connection, e.g. ``{"dialect_driver": "drill+sadrill"}`` for communication
+    over Drill's REST API.  See the sqlalchemy-drill documentation for
+    descriptions of the supported dialects and drivers.
+
+    You can specify the default storage_plugin for the sqlalchemy-drill
+    connection using the extras field e.g. ``{"storage_plugin": "dfs"}``.
+    """
+
+    conn_name_attr = 'drill_conn_id'
+    default_conn_name = 'drill_default'
+    conn_type = 'drill'
+    hook_name = 'Drill'
+    supports_autocommit = False
+
+    def get_conn(self) -> Connection:
+        """Establish a connection to Drillbit."""
+        conn_md = self.get_connection(getattr(self, self.conn_name_attr))
+        creds = f'{conn_md.login}:{conn_md.password}@' if conn_md.login else ''
+        engine = create_engine(
+            f'{conn_md.extra_dejson.get("dialect_driver", "drill+sadrill")}://{creds}'
+            f'{conn_md.host}:{conn_md.port}/'
+            f'{conn_md.extra_dejson.get("storage_plugin", "dfs")}'
+        )
+
+        self.log.info(
+            'Connected to the Drillbit at %s:%s as user %s', conn_md.host, conn_md.port, conn_md.login
+        )
+        return engine.raw_connection()
+
+    def get_uri(self) -> str:
+        """
+        Returns the connection URI
+
+        e.g: ``drill://localhost:8047/dfs``
+        """
+        conn_md = self.get_connection(getattr(self, self.conn_name_attr))
+        host = conn_md.host
+        if conn_md.port is not None:
+            host += f':{conn_md.port}'
+        conn_type = 'drill' if not conn_md.conn_type else conn_md.conn_type
+        dialect_driver = conn_md.extra_dejson.get('dialect_driver', 'drill+sadrill')
+        storage_plugin = conn_md.extra_dejson.get('storage_plugin', 'dfs')
+        return f'{conn_type}://{host}/{storage_plugin}' f'?dialect_driver={dialect_driver}'
+
+    def set_autocommit(self, conn: Connection, autocommit: bool) -> NotImplemented:
+        raise NotImplementedError("There are no transactions in Drill.")
+
+    def insert_rows(
+        self,
+        table: str,
+        rows: Iterable[Tuple[str]],
+        target_fields: Optional[Iterable[str]] = None,
+        commit_every: int = 1000,
+        replace: bool = False,
+        **kwargs: Any,
+    ) -> NotImplemented:
+        raise NotImplementedError("There is no INSERT statement in Drill.")
diff --git a/airflow/providers/apache/drill/operators/__init__.py b/airflow/providers/apache/drill/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/apache/drill/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/drill/operators/drill.py b/airflow/providers/apache/drill/operators/drill.py
new file mode 100644
index 0000000..459c623
--- /dev/null
+++ b/airflow/providers/apache/drill/operators/drill.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Iterable, Mapping, Optional, Union
+
+import sqlparse
+
+from airflow.models import BaseOperator
+from airflow.providers.apache.drill.hooks.drill import DrillHook
+from airflow.utils.decorators import apply_defaults
+
+
+class DrillOperator(BaseOperator):
+    """
+    Executes the provided SQL in the identified Drill environment.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:DrillOperator`
+
+    :param sql: the SQL code to be executed. (templated)
+    :type sql: Can receive a str representing a sql statement,
+        a list of str (sql statements), or a reference to a template file.
+        Template references are recognized by str ending in '.sql'
+    :param drill_conn_id: id of the connection config for the target Drill
+        environment
+    :type drill_conn_id: str
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :type parameters: dict or iterable
+    """
+
+    template_fields = ('sql',)
+    template_fields_renderers = {'sql': 'sql'}
+    template_ext = ('.sql',)
+    ui_color = '#ededed'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        sql: str,
+        drill_conn_id: str = 'drill_default',
+        parameters: Optional[Union[Mapping, Iterable]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.sql = sql
+        self.drill_conn_id = drill_conn_id
+        self.parameters = parameters
+        self.hook = None
+
+    def execute(self, context):
+        self.log.info('Executing: %s on %s', self.sql, self.drill_conn_id)
+        self.hook = DrillHook(drill_conn_id=self.drill_conn_id)
+        sql = sqlparse.split(sqlparse.format(self.sql, strip_comments=True))
+        no_term_sql = [s[:-1] for s in sql if s[-1] == ';']
+        self.hook.run(no_term_sql, parameters=self.parameters)
diff --git a/airflow/providers/apache/drill/provider.yaml b/airflow/providers/apache/drill/provider.yaml
new file mode 100644
index 0000000..88021e6
--- /dev/null
+++ b/airflow/providers/apache/drill/provider.yaml
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-apache-drill
+name: Apache Drill
+description: |
+    `Apache Drill <https://drill.apache.org/>`__.
+
+versions:
+  - 1.0.0
+
+additional-dependencies:
+  - apache-airflow>=2.1.0
+
+integrations:
+  - integration-name: Apache Drill
+    external-doc-url: https://drill.apache.org/
+    how-to-guide:
+      - /docs/apache-airflow-providers-apache-drill/operators.rst
+    logo: /integration-logos/apache/drill.png
+    tags: [apache]
+
+operators:
+  - integration-name: Apache Drill
+    python-modules:
+      - airflow.providers.apache.drill.operators.drill
+
+hooks:
+  - integration-name: Apache Drill
+    python-modules:
+      - airflow.providers.apache.drill.hooks.drill
+
+hook-class-names:
+  - airflow.providers.apache.drill.hooks.drill.DrillHook
diff --git a/airflow/ui/src/views/Docs.tsx b/airflow/ui/src/views/Docs.tsx
index 754b803..ea42ca6 100644
--- a/airflow/ui/src/views/Docs.tsx
+++ b/airflow/ui/src/views/Docs.tsx
@@ -41,6 +41,7 @@ const Docs: React.FC = () => {
     { path: 'amazon', name: 'Amazon' },
     { path: 'apache-beam', name: 'Apache Beam' },
     { path: 'apache-cassandra', name: 'Apache Cassandra' },
+    { path: 'apache-drill', name: 'Apache Drill' },
     { path: 'apache-druid', name: 'Apache Druid' },
     { path: 'apache-hdfs', name: 'Apache HDFS' },
     { path: 'apache-hive', name: 'Apache Hive' },
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index ae8dc0e..69e2e00 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -168,6 +168,16 @@ def create_default_connections(session=None):
     )
     merge_conn(
         Connection(
+            conn_id="drill_default",
+            conn_type="drill",
+            host="localhost",
+            port=8047,
+            extra='{"dialect_driver": "drill+sadrill", "storage_plugin": "dfs"}',
+        ),
+        session,
+    )
+    merge_conn(
+        Connection(
             conn_id="druid_broker_default",
             conn_type="druid",
             host="druid-broker",
diff --git a/docs/apache-airflow-providers-apache-drill/commits.rst b/docs/apache-airflow-providers-apache-drill/commits.rst
new file mode 100644
index 0000000..deb31b3
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-drill/commits.rst
@@ -0,0 +1,23 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Package apache-airflow-providers-apache-drill
+------------------------------------------------------
+
+`Apache Drill <https://drill.apache.org/>`__.
diff --git a/docs/apache-airflow-providers-apache-drill/connections/drill.rst b/docs/apache-airflow-providers-apache-drill/connections/drill.rst
new file mode 100644
index 0000000..05e00a2
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-drill/connections/drill.rst
@@ -0,0 +1,44 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+.. _howto/connection:drill:
+
+Apache Drill Connection
+=======================
+
+The Apache Drill connection type configures a connection to Apache Drill via the sqlalchemy-drill Python package.
+
+Default Connection IDs
+----------------------
+
+Drill hooks and operators use ``drill_default`` by default.
+
+Configuring the Connection
+--------------------------
+Host (required)
+    The host of the Drillbit to connect to (HTTP, JDBC) or the DSN of the Drill ODBC connection.
+
+Port (optional)
+    The port of the Drillbit to connect to.
+
+Extra (optional)
+     A JSON dictionary specifying the extra parameters that can be used in sqlalchemy-drill connection.
+
+    * ``dialect_driver`` - The dialect and driver as understood by sqlalchemy-drill.  Defaults to ``drill+sadrill`` (HTTP).
+    * ``storage_plugin`` - The default Drill storage plugin for this connection.  Defaults to ``dfs``.
diff --git a/docs/apache-airflow-providers-apache-drill/index.rst b/docs/apache-airflow-providers-apache-drill/index.rst
new file mode 100644
index 0000000..9ef0f2e
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-drill/index.rst
@@ -0,0 +1,50 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+``apache-airflow-providers-apache-drill``
+=========================================
+
+Content
+-------
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Guides
+
+    Connection types <connections/drill>
+    Operators <operators>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: References
+
+    Python API <_api/airflow/providers/apache/drill/index>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Resources
+
+    Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/apache/drill/example_dags>
+    PyPI Repository <https://pypi.org/project/apache-airflow-providers-apache-drill/>
+
+.. THE REMINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME!
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Commits
+
+    Detailed list of commits <commits>
diff --git a/docs/apache-airflow-providers-apache-drill/operators.rst b/docs/apache-airflow-providers-apache-drill/operators.rst
new file mode 100644
index 0000000..f2d5cf5
--- /dev/null
+++ b/docs/apache-airflow-providers-apache-drill/operators.rst
@@ -0,0 +1,51 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Apache Drill Operators
+======================
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite
+------------
+
+To use ``DrillOperator``, you must configure a :doc:`Drill Connection <connections/drill>`.
+
+
+.. _howto/operator:DrillOperator:
+
+DrillOperator
+-------------
+
+Executes one or more SQL queries on an Apache Drill server.  The ``sql`` parameter can be templated and be an external ``.sql`` file.
+
+Using the operator
+""""""""""""""""""
+
+.. exampleinclude:: /../../airflow/providers/apache/drill/example_dags/example_drill_dag.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_drill]
+    :end-before: [END howto_operator_drill]
+
+Reference
+"""""""""
+
+For further information, see `the Drill documentation on querying data <http://apache.github.io/drill/docs/query-data/>`_.
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index b1dff07..2d4769e 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -116,6 +116,8 @@ custom bash/python providers).
 +---------------------+-----------------------------------------------------+------------------------------------------------+
 | apache.cassandra    | ``pip install 'apache-airflow[apache.cassandra]'``  | Cassandra related operators & hooks            |
 +---------------------+-----------------------------------------------------+------------------------------------------------+
+| apache.drill        | ``pip install 'apache-airflow[apache.drill]'``      | Drill related operators & hooks                |
++---------------------+-----------------------------------------------------+------------------------------------------------+
 | apache.druid        | ``pip install 'apache-airflow[apache.druid]'``      | Druid related operators & hooks                |
 +---------------------+-----------------------------------------------------+------------------------------------------------+
 | apache.hdfs         | ``pip install 'apache-airflow[apache.hdfs]'``       | HDFS hooks and operators                       |
diff --git a/docs/conf.py b/docs/conf.py
index 3046303..da3b8e2 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -512,6 +512,7 @@ autodoc_mock_imports = [
     'slack_sdk',
     'smbclient',
     'snowflake',
+    'sqlalchemy-drill',
     'sshtunnel',
     'telegram',
     'tenacity',
diff --git a/docs/integration-logos/apache/drill.png b/docs/integration-logos/apache/drill.png
new file mode 100644
index 0000000..9f76b61
Binary files /dev/null and b/docs/integration-logos/apache/drill.png differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index c476512..7631c6c 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -120,6 +120,7 @@ Docstring
 Docstrings
 Dont
 Driesprong
+Drillbit
 Drivy
 Dsn
 Dynamodb
diff --git a/setup.py b/setup.py
index 9e8c28d..9dde824 100644
--- a/setup.py
+++ b/setup.py
@@ -255,6 +255,7 @@ doc = [
 docker = [
     'docker',
 ]
+drill = ['sqlalchemy-drill>=1.1.0', 'sqlparse>=0.4.1']
 druid = [
     'pydruid>=0.4.1',
 ]
@@ -534,6 +535,7 @@ PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = {
     'amazon': amazon,
     'apache.beam': apache_beam,
     'apache.cassandra': cassandra,
+    'apache.drill': drill,
     'apache.druid': druid,
     'apache.hdfs': hdfs,
     'apache.hive': hive,
@@ -724,6 +726,7 @@ ALL_PROVIDERS = list(PROVIDERS_REQUIREMENTS.keys())
 
 ALL_DB_PROVIDERS = [
     'apache.cassandra',
+    'apache.drill',
     'apache.druid',
     'apache.hdfs',
     'apache.hive',
diff --git a/tests/providers/apache/drill/__init__.py b/tests/providers/apache/drill/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/drill/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/drill/hooks/__init__.py b/tests/providers/apache/drill/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/drill/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/drill/hooks/test_drill.py b/tests/providers/apache/drill/hooks/test_drill.py
new file mode 100644
index 0000000..97ed71f
--- /dev/null
+++ b/tests/providers/apache/drill/hooks/test_drill.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from unittest.mock import MagicMock
+
+from airflow.providers.apache.drill.hooks.drill import DrillHook
+
+
+class TestDrillHook(unittest.TestCase):
+    def setUp(self):
+        self.cur = MagicMock(rowcount=0)
+        self.conn = conn = MagicMock()
+        self.conn.login = 'drill_user'
+        self.conn.password = 'secret'
+        self.conn.host = 'host'
+        self.conn.port = '8047'
+        self.conn.conn_type = 'drill'
+        self.conn.extra_dejson = {'dialect_driver': 'drill+sadrill', 'storage_plugin': 'dfs'}
+        self.conn.cursor.return_value = self.cur
+
+        class TestDrillHook(DrillHook):
+            def get_conn(self):
+                return conn
+
+            def get_connection(self, conn_id):
+                return conn
+
+        self.db_hook = TestDrillHook
+
+    def test_get_uri(self):
+        db_hook = self.db_hook()
+        assert 'drill://host:8047/dfs?dialect_driver=drill+sadrill' == db_hook.get_uri()
+
+    def test_get_first_record(self):
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchone.return_value = result_sets[0]
+
+        assert result_sets[0] == self.db_hook().get_first(statement)
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        self.cur.execute.assert_called_once_with(statement)
+
+    def test_get_records(self):
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchall.return_value = result_sets
+
+        assert result_sets == self.db_hook().get_records(statement)
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        self.cur.execute.assert_called_once_with(statement)
+
+    def test_get_pandas_df(self):
+        statement = 'SQL'
+        column = 'col'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.description = [(column,)]
+        self.cur.fetchall.return_value = result_sets
+        df = self.db_hook().get_pandas_df(statement)
+
+        assert column == df.columns[0]
+        for i in range(len(result_sets)):  # pylint: disable=consider-using-enumerate
+            assert result_sets[i][0] == df.values.tolist()[i][0]
+        assert self.conn.close.call_count == 1
+        assert self.cur.close.call_count == 1
+        self.cur.execute.assert_called_once_with(statement)
diff --git a/tests/providers/apache/drill/operators/__init__.py b/tests/providers/apache/drill/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/tests/providers/apache/drill/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/drill/operators/test_drill.py b/tests/providers/apache/drill/operators/test_drill.py
new file mode 100644
index 0000000..3572d85
--- /dev/null
+++ b/tests/providers/apache/drill/operators/test_drill.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.providers.apache.drill.operators.drill import DrillOperator
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+TEST_DAG_ID = 'unit_test_dag'
+
+
+@pytest.mark.backend("drill")
+class TestDrillOperator(unittest.TestCase):
+    def setUp(self):
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    def tearDown(self):
+        tables_to_drop = ['dfs.tmp.test_airflow']
+        from airflow.providers.apache.drill.hooks.drill import DrillHook
+
+        with DrillHook().get_conn() as conn:
+            with conn.cursor() as cur:
+                for table in tables_to_drop:
+                    cur.execute(f"DROP TABLE IF EXISTS {table}")
+
+    def test_drill_operator_single(self):
+        sql = """
+        create table dfs.tmp.test_airflow as
+        select * from cp.`employee.json` limit 10
+        """
+        op = DrillOperator(task_id='drill_operator_test_single', sql=sql, dag=self.dag)
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_drill_operator_multi(self):
+        sql = [
+            "create table dfs.tmp.test_airflow as" "select * from cp.`employee.json` limit 10",
+            "select sum(employee_id), any_value(full_name)" "from dfs.tmp.test_airflow",
+        ]
+        op = DrillOperator(task_id='drill_operator_test_multi', sql=sql, dag=self.dag)
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

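For illustration, a minimal usage sketch of the new Drill provider (not part of the patch). It assumes
an Airflow Connection of type "drill" is configured for the provider's default connection id, shaped as
in the hook test above (host, port 8047, "dfs" storage plugin, "drill+sadrill" dialect driver); the
dag_id and SQL are examples only.

    # Hypothetical example DAG using the DrillOperator added by this commit.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.apache.drill.operators.drill import DrillOperator

    with DAG(
        dag_id="example_drill_dag",      # illustrative dag id
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        # Single statement run through the provider's default Drill connection.
        create_table = DrillOperator(
            task_id="create_employee_sample",
            sql="create table dfs.tmp.employee_sample as select * from cp.`employee.json` limit 10",
        )
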
[airflow] 12/39: Fix CLI 'kubernetes cleanup-pods' which fails on invalid label key (#17298)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8e8b58aa0a96164c7142d795a9c919d50e2a9aa1
Author: Damir Lampa <60...@users.noreply.github.com>
AuthorDate: Thu Jul 29 14:17:51 2021 -0600

    Fix CLI 'kubernetes cleanup-pods' which fails on invalid label key (#17298)
    
    Fix for #16013 - CLI 'kubernetes cleanup-pods' fails on invalid label key
    
    (cherry picked from commit 36bdfe8d0ef7e5fc428434f8313cf390ee9acc8f)
---
 airflow/cli/commands/kubernetes_command.py    | 12 ++----------
 tests/cli/commands/test_kubernetes_command.py |  8 ++------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py
index 3c3c8e6..2660dae 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -96,16 +96,8 @@ def cleanup_pods(args):
         'try_number',
         'airflow_version',
     ]
-    list_kwargs = {
-        "namespace": namespace,
-        "limit": 500,
-        "label_selector": client.V1LabelSelector(
-            match_expressions=[
-                client.V1LabelSelectorRequirement(key=label, operator="Exists")
-                for label in airflow_pod_labels
-            ]
-        ),
-    }
+    list_kwargs = {"namespace": namespace, "limit": 500, "label_selector": ','.join(airflow_pod_labels)}
+
     while True:
         pod_list = kube_client.list_namespaced_pod(**list_kwargs)
         for pod in pod_list.items:
diff --git a/tests/cli/commands/test_kubernetes_command.py b/tests/cli/commands/test_kubernetes_command.py
index f2a8605..490c7fa 100644
--- a/tests/cli/commands/test_kubernetes_command.py
+++ b/tests/cli/commands/test_kubernetes_command.py
@@ -55,12 +55,8 @@ class TestGenerateDagYamlCommand(unittest.TestCase):
 
 
 class TestCleanUpPodsCommand(unittest.TestCase):
-    label_selector = kubernetes.client.V1LabelSelector(
-        match_expressions=[
-            kubernetes.client.V1LabelSelectorRequirement(key=label, operator="Exists")
-            for label in ['dag_id', 'task_id', 'execution_date', 'try_number', 'airflow_version']
-        ]
-    )
+
+    label_selector = ','.join(['dag_id', 'task_id', 'execution_date', 'try_number', 'airflow_version'])
 
     @classmethod
     def setUpClass(cls):

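For context, a short sketch (not part of the patch) of what the comma-joined selector above means to the
Kubernetes Python client: it asks for pods that carry every one of the Airflow-managed label keys (an
"exists" match per key), expressed in the plain string form that list_namespaced_pod expects rather than
the previous V1LabelSelector object. The namespace value is illustrative.

    # Sketch of listing Airflow-managed pods with a plain string label selector.
    from kubernetes import client, config

    config.load_kube_config()
    kube_client = client.CoreV1Api()

    airflow_pod_labels = ['dag_id', 'task_id', 'execution_date', 'try_number', 'airflow_version']
    pod_list = kube_client.list_namespaced_pod(
        namespace="airflow",  # illustrative namespace
        limit=500,
        label_selector=','.join(airflow_pod_labels),
    )
    for pod in pod_list.items:
        print(pod.metadata.name)
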
[airflow] 01/39: Switches to "/" convention in ghcr.io images with optimisations

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3d2a5cf7f6ed98cef2c398e46628242bb0abb270
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Jul 31 11:48:15 2021 +0200

    Switches to "/" convention in ghcr.io images with optimisations
    
    We are using ghcr.io as image cache for our CI builds and Breeze
    and it seems ghcr.io is being "rebuilt" while running.
    
    We had been using the "airflow-<branch>.." image convention before,
    because multiple nesting levels of images were not supported,
    however we experienced errors recently with pushing 2.1 images
    (https://issues.apache.org/jira/browse/INFRA-22124) and during
    investigation it turned out that it is now possible to use "/"
    in the name of the image. While it still does not introduce
    multiple nesting levels and folder structure, the UI of GitHub
    treats it like that, and if you have an image whose name starts with
    "airflow/", the airflow prefix is stripped out and you can also
    have even more "/" in the name to introduce further hierarchy.
    
    Since we have to change the image naming convention due to a (still
    unresolved) bug with no permission to push the v2-1-test image,
    we've decided to change the naming convention for all our cache
    images to follow this - now available - "/" convention to make
    it better structured and easier to manage/understand.
    
    Some more optimisations are implemented - Python, prod-build and
    ci-manifest images are only pushed when the "latest" image is prepared.
    They are not needed for the COMMIT builds because we only need
    final images for those builds. This simplified the code quite
    a bit.
    
    The CONTINUE_ON_PIP_CHECK_FAILURE variable has been removed in favour
    of ignoring the pip check error when installing dependencies from the branch
    tip. Such an error might happen for a short while when
    new changes have been merged but constraints were not yet
    regenerated and we have conflicting dependencies.
    
    The .dockerignore was reviewed and builds were optimized for
    people who locally built provider packages and documentation,
    by excluding unnecessary files. Some instructions which ran after
    the COPY . command but did not need the sources were moved before
    the COPY command. Those optimisations save 30-40 seconds of
    overhead when building the image (especially when you build
    images incrementally rather than rebuilding from scratch).
    
    PIP and HELM versions have been updated to latest available.
    
    Backwards-compatibility was implemented to allow PRs that have
    not been upgraded to continue building after this one is merged;
    a workaround has also been implemented to make this change
    work even if it is not yet merged to main.

    This "legacy" mode will be removed in about a week, when everybody rebases
    on top of main.
    
    Documentation is updated reflecting those changes.
    
    (cherry picked from commit e04c2e3872aa30ed042d3f9bf66d8020cf9c2acb)
---
 .dockerignore                                      |   4 +-
 .github/workflows/build-images.yml                 |  19 ++-
 .github/workflows/ci.yml                           | 161 +++++++--------------
 BREEZE.rst                                         |   4 +-
 CI.rst                                             |  51 ++++---
 Dockerfile                                         |  27 ++--
 Dockerfile.ci                                      |  54 +++----
 IMAGES.rst                                         |  30 ++--
 README.md                                          |   4 +-
 breeze                                             |  23 +--
 breeze-complete                                    |   2 +-
 dev/retag_docker_images.py                         |   9 +-
 docs/docker-stack/build-arg-ref.rst                |   6 -
 scripts/ci/images/ci_prepare_ci_image_on_ci.sh     |  19 +--
 scripts/ci/images/ci_prepare_prod_image_on_ci.sh   |  29 +---
 .../ci_push_legacy_ci_images.sh}                   |  25 +---
 .../ci_push_legacy_prod_images.sh}                 |  25 +---
 .../images/ci_wait_for_and_verify_all_ci_images.sh |   2 +
 .../ci_wait_for_and_verify_all_prod_images.sh      |   2 +
 .../ci/images/ci_wait_for_and_verify_ci_image.sh   |  27 ++--
 .../ci/images/ci_wait_for_and_verify_prod_image.sh |  32 ++--
 scripts/ci/libraries/_build_images.sh              | 112 ++++++++------
 scripts/ci/libraries/_initialization.sh            |  26 +---
 scripts/ci/libraries/_kind.sh                      |  16 +-
 scripts/ci/libraries/_parallel.sh                  |   7 +-
 scripts/ci/libraries/_push_pull_remove_images.sh   | 117 +++++++++------
 scripts/ci/libraries/_script_init.sh               |   2 +-
 scripts/ci/selective_ci_checks.sh                  |  10 +-
 scripts/ci/tools/fix_ownership.sh                  |  14 +-
 scripts/docker/install_additional_dependencies.sh  |   5 +-
 scripts/docker/install_airflow.sh                  |   4 +-
 ...nstall_airflow_dependencies_from_branch_tip.sh} |  11 +-
 .../docker/install_from_docker_context_files.sh    |   2 +-
 33 files changed, 421 insertions(+), 460 deletions(-)

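To make the renaming concrete before the per-file diffs, here is a small illustrative mapping (not part
of the patch) from the old flat cache-image names to the new "/" based layout, in the same template
style as dev/retag_docker_images.py; the branch and Python version values are examples only.

    # Illustrative only: old flat ghcr.io cache-image names vs. the new "/" based layout.
    OLD_TO_NEW = {
        "ghcr.io/apache/airflow-{branch}-python{py}-ci-v2:latest":
            "ghcr.io/apache/airflow/{branch}/ci/python{py}:latest",
        "ghcr.io/apache/airflow-{branch}-python{py}-ci-v2-manifest:latest":
            "ghcr.io/apache/airflow/{branch}/ci-manifest/python{py}:latest",
        "ghcr.io/apache/airflow-{branch}-python{py}-build-v2:latest":
            "ghcr.io/apache/airflow/{branch}/prod-build/python{py}:latest",
        "ghcr.io/apache/airflow-{branch}-python{py}-v2:latest":
            "ghcr.io/apache/airflow/{branch}/prod/python{py}:latest",
    }

    for old, new in OLD_TO_NEW.items():
        print(old.format(branch="main", py="3.6"), "->", new.format(branch="main", py="3.6"))
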
diff --git a/.dockerignore b/.dockerignore
index d10cfbc..f6113e2 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -40,9 +40,6 @@
 !scripts/in_container
 !scripts/docker
 
-# Add provider packages to the context
-!provider_packages
-
 # Add tests and kubernetes_tests to context.
 !tests
 !kubernetes_tests
@@ -129,3 +126,4 @@ airflow/www/static/docs
 # Exclude docs generated files
 docs/_build/
 docs/_api/
+docs/_doctrees/
diff --git a/.github/workflows/build-images.yml b/.github/workflows/build-images.yml
index f29e199..d2a7b97 100644
--- a/.github/workflows/build-images.yml
+++ b/.github/workflows/build-images.yml
@@ -148,7 +148,6 @@ jobs:
       BACKEND: postgres
       PYTHON_MAJOR_MINOR_VERSION: ${{ matrix.python-version }}
       UPGRADE_TO_NEWER_DEPENDENCIES: ${{ needs.build-info.outputs.upgradeToNewerDependencies }}
-      CONTINUE_ON_PIP_CHECK_FAILURE: "true"
       DOCKER_CACHE: ${{ needs.build-info.outputs.cacheDirective }}
       CHECK_IF_BASE_PYTHON_IMAGE_UPDATED: >
         ${{ github.event_name == 'pull_request_target' && 'false' || 'true' }}
@@ -204,6 +203,10 @@ jobs:
         run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
       - name: "Push CI images ${{ matrix.python-version }}:${{ env.TARGET_COMMIT_SHA }}"
         run: ./scripts/ci/images/ci_push_ci_images.sh
+      # Remove me ~ 7 August 2021
+      - name: "Push Legacy CI images ${{ matrix.python-version }}:${{ env.TARGET_COMMIT_SHA }}"
+        run: ./scripts/ci/images/ci_push_legacy_ci_images.sh
+        if: github.event_name == 'pull_request_target'
 
   build-prod-images:
     permissions:
@@ -230,8 +233,11 @@ jobs:
       VERSION_SUFFIX_FOR_PYPI: ".dev0"
     steps:
       - name: Set envs
+        # Set pull image tag for CI image build, in order to pull the image pushed
+        # just a moment ago by the build-ci-images job
         run: |
           echo "GITHUB_REGISTRY_PUSH_IMAGE_TAG=${TARGET_COMMIT_SHA}" >> "$GITHUB_ENV"
+          echo "GITHUB_REGISTRY_PULL_IMAGE_TAG=${TARGET_COMMIT_SHA}" >> "$GITHUB_ENV"
       - uses: actions/checkout@v2
         with:
           ref: ${{ env.TARGET_COMMIT_SHA }}
@@ -279,10 +285,21 @@ jobs:
         # Pull images built in the previous step
         env:
           GITHUB_REGISTRY_WAIT_FOR_IMAGE: "true"
+          # Here we are using PULL_IMAGE_TAG set in the environment variables above
       - name: "Build PROD images ${{ matrix.python-version }}:${{ env.TARGET_COMMIT_SHA }}"
         run: ./scripts/ci/images/ci_prepare_prod_image_on_ci.sh
+        env:
+          # GITHUB_REGISTRY_PULL_IMAGE_TAG is overridden to latest in order to build PROD image using "latest"
+          GITHUB_REGISTRY_PULL_IMAGE_TAG: "latest"
       - name: "Push PROD images ${{ matrix.python-version }}:${{ env.TARGET_COMMIT_SHA }}"
         run: ./scripts/ci/images/ci_push_production_images.sh
+        env:
+          # GITHUB_REGISTRY_PULL_IMAGE_TAG is overridden to latest in order to build PROD image using "latest"
+          GITHUB_REGISTRY_PULL_IMAGE_TAG: "latest"
+      # Remove me ~ 7 August 2021
+      - name: "Push Legacy PROD images ${{ matrix.python-version }}:${{ env.TARGET_COMMIT_SHA }}"
+        run: ./scripts/ci/images/ci_push_legacy_prod_images.sh
+        if: github.event_name == 'pull_request_target'
 
   cancel-on-ci-build:
     permissions:
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 8c31784..7228f37 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -561,7 +561,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           PACKAGE_FORMAT: "sdist"
 
   tests-helm:
-    timeout-minutes: 20
+    timeout-minutes: 40
     name: "Python unit tests for helm chart"
     runs-on: ${{ fromJson(needs.build-info.outputs.runsOn) }}
     needs: [build-info, ci-images]
@@ -1045,108 +1045,6 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
           path: /tmp/kind_logs_*
           retention-days: 7
 
-  push-prod-images-to-github-registry:
-    permissions:
-      packages: write
-    timeout-minutes: 10
-    name: "Push PROD images as cache to GitHub Registry"
-    runs-on: ${{ fromJson(needs.build-info.outputs.runsOn) }}
-    needs:
-      - build-info
-      - static-checks
-      - tests-sqlite
-      - tests-postgres
-      - tests-mysql
-      - tests-kubernetes
-      - prod-images
-      - docs
-    if: >
-      (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/v1-10-test' ||
-      github.ref == 'refs/heads/v2-0-test' || github.ref == 'refs/heads/v2-1-test') &&
-      github.event_name != 'schedule'
-    strategy:
-      matrix:
-        python-version: ${{ fromJson(needs.build-info.outputs.pythonVersions) }}
-    env:
-      RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
-      PYTHON_MAJOR_MINOR_VERSION: ${{ matrix.python-version }}
-      GITHUB_REGISTRY_PUSH_IMAGE_TAG: "latest"
-    steps:
-      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
-        uses: actions/checkout@v2
-        with:
-          persist-credentials: false
-      - name: "Setup python"
-        uses: actions/setup-python@v2
-        with:
-          python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
-      - name: "Free space"
-        run: ./scripts/ci/tools/free_space.sh
-      - name: Set push-python-image
-        id: push-python-image
-        run: |
-          if [[ "${REF}" == 'refs/head/main' || "${REF}" == 'refs/head/main' ]]; then
-              echo "::set-output name=wanted::true"
-          else
-              echo "::set-output name=wanted::false"
-          fi
-        env:
-          REF: ${{ github.ref }}
-      - name:
-          "Prepare PROD image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
-        run: ./scripts/ci/images/ci_prepare_prod_image_on_ci.sh
-        env:
-          # Since we are going to push both final image and build image segment, we need to pull the
-          # build image, in case we are pulling from registry rather than building.
-          WAIT_FOR_PROD_BUILD_IMAGE: "true"
-          WAIT_FOR_PYTHON_BASE_IMAGE: ${{ steps.push-python-image.outputs.wanted}}
-      - name: "Push PROD images ${{ matrix.python-version }}:${{ env.GITHUB_REGISTRY_PUSH_IMAGE_TAG }}"
-        run: ./scripts/ci/images/ci_push_production_images.sh
-        env:
-          PUSH_PYTHON_BASE_IMAGE: ${{ steps.push-python-image.outputs.wanted}}
-
-  push-ci-images-to-github-registry:
-    permissions:
-      packages: write
-    timeout-minutes: 10
-    name: "Push CI images as cache to GitHub Registry"
-    runs-on: ${{ fromJson(needs.build-info.outputs.runsOn) }}
-    needs:
-      - build-info
-      - static-checks
-      - tests-sqlite
-      - tests-postgres
-      - tests-mysql
-      - tests-kubernetes
-      - ci-images
-      - docs
-    if: >
-      (github.ref == 'refs/heads/main' || github.ref == 'refs/heads/v1-10-test' ||
-      github.ref == 'refs/heads/v2-0-test' || github.ref == 'refs/heads/v2-1-test') &&
-      github.event_name != 'schedule'
-    strategy:
-      matrix:
-        python-version: ${{ fromJson(needs.build-info.outputs.pythonVersions) }}
-    env:
-      RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
-      PYTHON_MAJOR_MINOR_VERSION: ${{ matrix.python-version }}
-      GITHUB_REGISTRY_PUSH_IMAGE_TAG: "latest"
-    steps:
-      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
-        uses: actions/checkout@v2
-        with:
-          persist-credentials: false
-      - name: "Setup python"
-        uses: actions/setup-python@v2
-        with:
-          python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
-      - name: "Free space"
-        run: ./scripts/ci/tools/free_space.sh
-      - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
-        run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
-      - name: "Push CI image ${{ matrix.python-version }}:${{ env.GITHUB_REGISTRY_PUSH_IMAGE_TAG }}"
-        run: ./scripts/ci/images/ci_push_ci_images.sh
-
   constraints:
     permissions:
       contents: write
@@ -1166,10 +1064,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
       RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
       PYTHON_MAJOR_MINOR_VERSION: ${{ matrix.python-version }}
       CURRENT_PYTHON_MAJOR_MINOR_VERSIONS_AS_STRING: ${{needs.build-info.outputs.pythonVersionsListAsString}}
-    # Only run it for direct pushes
-    if: >
-      github.ref == 'refs/heads/main' || github.ref == 'refs/heads/v1-10-test' ||
-      github.ref == 'refs/heads/v2-0-test' || github.ref == 'refs/heads/v2-1-test'
+    # Only run it for direct pushes and scheduled builds
+    if: github.event_name == 'push' || github.event_name == 'schedule'
     steps:
       - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
         uses: actions/checkout@v2
@@ -1203,17 +1099,68 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
       - name: "Set constraints branch name"
         id: constraints-branch
         run: ./scripts/ci/constraints/ci_branch_constraints.sh
+      # only actually push it when we are in apache/airflow repository
       - name: Checkout ${{ steps.constraints-branch.outputs.branch }}
         uses: actions/checkout@v2
+        if: github.repository == 'apache/airflow'
         with:
           path: "repo"
           ref: ${{ steps.constraints-branch.outputs.branch }}
           persist-credentials: false
       - name: "Commit changed constraint files for ${{needs.build-info.outputs.pythonVersions}}"
         run: ./scripts/ci/constraints/ci_commit_constraints.sh
+        if: github.repository == 'apache/airflow'
       - name: "Push changes"
         uses: ./.github/actions/github-push-action
+        if: github.repository == 'apache/airflow'
         with:
           github_token: ${{ secrets.GITHUB_TOKEN }}
           branch: ${{ steps.constraints-branch.outputs.branch }}
           directory: "repo"
+
+  # Push images to GitHub Registry in the Apache repository, if all tests are successful and the build
+  # is executed as a result of a direct push to "main" or one of the "test" branches
+  # It actually rebuilds all images using just-pushed constraints if they changed
+  # It will also check if a new python image was released and will pull the latest one if needed
+  # Same as build-images.yaml
+  push-images-to-github-registry:
+    permissions:
+      packages: write
+    timeout-minutes: 10
+    name: "Push images as cache to GitHub Registry"
+    runs-on: ${{ fromJson(needs.build-info.outputs.runsOn) }}
+    needs:
+      - build-info
+      - constraints
+      - docs
+    # Only run it for direct pushes and scheduled builds
+    if: github.event_name == 'push' || github.event_name == 'schedule'
+    strategy:
+      matrix:
+        python-version: ${{ fromJson(needs.build-info.outputs.pythonVersions) }}
+    env:
+      RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
+      PYTHON_MAJOR_MINOR_VERSION: ${{ matrix.python-version }}
+      GITHUB_REGISTRY_PULL_IMAGE_TAG: "latest"
+      GITHUB_REGISTRY_PUSH_IMAGE_TAG: "latest"
+      PUSH_PYTHON_BASE_IMAGE: "true"
+      CHECK_IF_BASE_PYTHON_IMAGE_UPDATED: "true"
+    steps:
+      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+        uses: actions/checkout@v2
+        with:
+          persist-credentials: false
+      - name: "Setup python"
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
+      - name: "Free space"
+        run: ./scripts/ci/tools/free_space.sh
+      - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:latest"
+        run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
+      - name: "Prepare PROD image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:latest"
+        run: ./scripts/ci/images/ci_prepare_prod_image_on_ci.sh
+      - name: "Push CI image ${{ env.PYTHON_MAJOR_MINOR_VERSION }}:latest"
+        run: ./scripts/ci/images/ci_push_ci_images.sh
+      - name: "Push PROD images ${{ env.PYTHON_MAJOR_MINOR_VERSION }}:latest"
+        run: ./scripts/ci/images/ci_push_production_images.sh
diff --git a/BREEZE.rst b/BREEZE.rst
index 90d3f0b..79f27b4 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -2382,9 +2382,9 @@ This is the current syntax for  `./breeze <./breeze>`_:
           Helm version - only used in case one of kind-cluster commands is used.
           One of:
 
-                 v3.2.4
+                 v3.6.3
 
-          Default: v3.2.4
+          Default: v3.6.3
 
   --executor EXECUTOR
           Executor to use in a kubernetes cluster.
diff --git a/CI.rst b/CI.rst
index fedd300..6a704d0 100644
--- a/CI.rst
+++ b/CI.rst
@@ -568,12 +568,11 @@ This workflow is a regular workflow that performs all checks of Airflow code.
 +---------------------------+----------------------------------------------+-------+-------+------+
 | Tests Kubernetes          | Run Kubernetes test                          | Yes(2)| Yes   | Yes  |
 +---------------------------+----------------------------------------------+-------+-------+------+
-| Push PROD images          | Pushes PROD images to GitHub Registry (4)    | -     | Yes   | -    |
-+---------------------------+----------------------------------------------+-------+-------+------+
-| Push CI images            | Pushes CI images to GitHub Registry (4)      | -     | Yes   | -    |
-+---------------------------+----------------------------------------------+-------+-------+------+
 | Constraints               | Upgrade constraints to latest ones (4)       | -     | Yes   | Yes  |
 +---------------------------+----------------------------------------------+-------+-------+------+
+| Push images               | Pushes latest images to GitHub Registry (4)  | -     | Yes   | Yes  |
++---------------------------+----------------------------------------------+-------+-------+------+
+
 
 Comments:
 
@@ -584,8 +583,8 @@ Comments:
      You can set it to "false" to disable using shared images - this is slower though as the images
      are rebuilt in every job that needs them.
  (4) PROD and CI images are pushed as "latest" to GitHub Container registry and constraints are upgraded
-     only if all tests are successful. Note that images are not pushed in CRON jobs because they are rebuilt
-     from scratch and we want to push incremental changes to the Github Container registry.
+     only if all tests are successful. The images are rebuilt in this step using constraints pushed
+     in the previous step.
 
 CodeQL scan
 -----------
@@ -620,7 +619,9 @@ with the COMMIT_SHA id for images that were used in particular build.
 The image names follow the patterns (except the Python image, all the images are stored in
 https://ghcr.io/ in ``apache`` organization.
 
-The packages are available under:
+The packages are available under (CONTAINER_NAME is the url-encoded name of the image). Note that "/" is
+now supported in ``ghcr.io`` as part of the image name within the ``apache`` organization, but it
+has to be percent-encoded when you access the packages via the UI (/ = %2F).
 
 ``https://github.com/apache/airflow/pkgs/container/<CONTAINER_NAME>``
 
@@ -631,26 +632,29 @@ The packages are available under:
 | (DockerHub)  |                                                          | Python maintainer release new versions of those image    |
 |              |                                                          | with security fixes every few weeks in DockerHub.        |
 +--------------+----------------------------------------------------------+----------------------------------------------------------+
-| Airflow      | airflow-python-v2:<X.Y>-slim-buster                      | Version of python base image used in Airflow Builds      |
-| python base  | or                                                       | We keep the "latest" version there and also each build   |
-| image        | airflow-python-v2:<X.Y>-slim-buster-<COMMIT_SHA>         | has an associated specific python version that was used. |
+| Airflow      | airflow/<BRANCH>/python:<X.Y>-slim-buster                | Version of python base image used in Airflow Builds      |
+| python base  |                                                          | We keep the "latest" version only to mark last "good"    |
+| image        |                                                          | python base that went through testing and was pushed.    |
 +--------------+----------------------------------------------------------+----------------------------------------------------------+
-| CI image     | airflow-<BRANCH>-python<X.Y>-ci-v2:latest                | CI image - this is the image used for most of the tests. |
-|              | or                                                       | Contains all provider dependencies and tools useful      |
-|              | airflow-<BRANCH>-python<X.Y>-ci-v2:<COMMIT_SHA>          | For testing. This image is used in Breeze.               |
+| PROD Build   | airflow/<BRANCH>/prod-build/python<X.Y>:latest           | Production Build image - this is the "build" stage of    |
+| image        |                                                          | production image. It contains build-essentials and all   |
+|              |                                                          | necessary apt packages to build/install PIP packages.    |
+|              |                                                          | We keep the "latest" version only to speed up builds.    |
 +--------------+----------------------------------------------------------+----------------------------------------------------------+
-| Manifest     | airflow-<BRANCH>-python<X.Y>-ci-v2-manifest:latest       | CI manifest image - this is the image used to optimize   |
-| CI image     | or                                                       | pulls and builds for Breeze development environment      |
-|              | airflow-<BRANCH>-python<X.Y>-ci-v2-manifest:<COMMIT_SHA> | They store hash indicating whether the image will be     |
+| Manifest     | airflow/<BRANCH>/ci-manifest/python<X.Y>:latest          | CI manifest image - this is the image used to optimize   |
+| CI image     |                                                          | pulls and builds for Breeze development environment      |
+|              |                                                          | They store hash indicating whether the image will be     |
 |              |                                                          | faster to build or pull.                                 |
+|              |                                                          | We keep the "latest" version only to help breeze to      |
+|              |                                                          | check if new image should be pulled.                     |
 +--------------+----------------------------------------------------------+----------------------------------------------------------+
-| PROD Build   | airflow-<BRANCH>-python<X.Y>-build-v2:latest             | Production Build image - this is the "build" segment of  |
-| image        | or                                                       | production image. It contains build-essentials and all   |
-|              | airflow-<BRANCH>-python<X.Y>-build-v2:<COMMIT_SHA>       | necessary packages to install PIP packages.              |
+| CI image     | airflow/<BRANCH>/ci/python<X.Y>:latest                   | CI image - this is the image used for most of the tests. |
+|              | or                                                       | Contains all provider dependencies and tools useful      |
+|              | airflow/<BRANCH>/ci/python<X.Y>:<COMMIT_SHA>             | For testing. This image is used in Breeze.               |
 +--------------+----------------------------------------------------------+----------------------------------------------------------+
-| PROD image   | airflow-<BRANCH>-python<X.Y>-v2:latest                   | Production image. This is the actual production image    |
+| PROD image   | airflow/<BRANCH>/prod/python<X.Y>:latest                 | Production image. This is the actual production image    |
 |              | or                                                       | optimized for size.                                      |
-|              | airflow-<BRANCH>-python<X.Y>-v2:<COMMIT_SHA>             | It contains only compiled libraries and minimal set of   |
+|              | airflow/<BRANCH>/prod/python<X.Y>:<COMMIT_SHA>           | It contains only compiled libraries and minimal set of   |
 |              |                                                          | dependencies to run Airflow.                             |
 +--------------+----------------------------------------------------------+----------------------------------------------------------+
 
@@ -668,9 +673,9 @@ For example knowing that the CI build was for commit ``cd27124534b46c9688a1d89e7
 
 .. code-block:: bash
 
-  docker pull ghcr.io/apache/airflow-main-python3.6-ci:cd27124534b46c9688a1d89e75fcd137ab5137e3
+  docker pull ghcr.io/apache/airflow/main/ci/python3.6:cd27124534b46c9688a1d89e75fcd137ab5137e3
 
-  docker run -it ghcr.io/apache/airflow-main-python3.6-ci:cd27124534b46c9688a1d89e75fcd137ab5137e3
+  docker run -it ghcr.io/apache/airflow/main/ci/python3.6:cd27124534b46c9688a1d89e75fcd137ab5137e3
 
 
 But you usually need to pass more variables and complex setup if you want to connect to a database or
diff --git a/Dockerfile b/Dockerfile
index 66c9649..210918c 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -44,7 +44,8 @@ ARG AIRFLOW_GID="50000"
 
 ARG PYTHON_BASE_IMAGE="python:3.6-slim-buster"
 
-ARG AIRFLOW_PIP_VERSION=21.1.2
+ARG AIRFLOW_PIP_VERSION=21.2.2
+ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
 
 # By default PIP has progress bar but you can disable it.
 ARG PIP_PROGRESS_BAR="on"
@@ -108,12 +109,13 @@ ARG DEV_APT_COMMAND="\
     && curl https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - > /dev/null \
     && echo 'deb https://dl.yarnpkg.com/debian/ stable main' > /etc/apt/sources.list.d/yarn.list"
 ARG ADDITIONAL_DEV_APT_COMMAND="echo"
+ARG ADDITIONAL_DEV_APT_ENV=""
 
 ENV DEV_APT_DEPS=${DEV_APT_DEPS} \
     ADDITIONAL_DEV_APT_DEPS=${ADDITIONAL_DEV_APT_DEPS} \
     DEV_APT_COMMAND=${DEV_APT_COMMAND} \
     ADDITIONAL_DEV_APT_COMMAND=${ADDITIONAL_DEV_APT_COMMAND} \
-    ADDITIONAL_DEV_APT_ENV=""
+    ADDITIONAL_DEV_APT_ENV=${ADDITIONAL_DEV_APT_ENV}
 
 # Note missing man directories on debian-buster
 # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=863199
@@ -216,7 +218,7 @@ ENV AIRFLOW_PRE_CACHED_PIP_PACKAGES=${AIRFLOW_PRE_CACHED_PIP_PACKAGES} \
 RUN bash /scripts/docker/install_pip_version.sh; \
     if [[ ${AIRFLOW_PRE_CACHED_PIP_PACKAGES} == "true" && \
           ${UPGRADE_TO_NEWER_DEPENDENCIES} == "false" ]]; then \
-        bash /scripts/docker/install_airflow_from_branch_tip.sh; \
+        bash /scripts/docker/install_airflow_dependencies_from_branch_tip.sh; \
     fi
 
 COPY ${AIRFLOW_SOURCES_FROM} ${AIRFLOW_SOURCES_TO}
@@ -236,14 +238,11 @@ ARG INSTALL_FROM_PYPI="true"
 # * pyjwt<2.0.0: flask-jwt-extended requires it
 # * dill<0.3.3 required by apache-beam
 ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="pyjwt<2.0.0 dill<0.3.3 certifi<2021.0.0"
-ARG CONTINUE_ON_PIP_CHECK_FAILURE="false"
-
 
 ENV ADDITIONAL_PYTHON_DEPS=${ADDITIONAL_PYTHON_DEPS} \
     INSTALL_FROM_DOCKER_CONTEXT_FILES=${INSTALL_FROM_DOCKER_CONTEXT_FILES} \
     INSTALL_FROM_PYPI=${INSTALL_FROM_PYPI} \
-    EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS=${EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS} \
-    CONTINUE_ON_PIP_CHECK_FAILURE=${CONTINUE_ON_PIP_CHECK_FAILURE}
+    EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS=${EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS}
 
 WORKDIR /opt/airflow
 
@@ -276,7 +275,7 @@ RUN if [[ -f /docker-context-files/requirements.txt ]]; then \
 
 ARG BUILD_ID
 ARG COMMIT_SHA
-ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
+ARG AIRFLOW_IMAGE_REPOSITORY
 ARG AIRFLOW_IMAGE_DATE_CREATED
 
 ENV BUILD_ID=${BUILD_ID} COMMIT_SHA=${COMMIT_SHA}
@@ -293,15 +292,14 @@ LABEL org.apache.airflow.distro="debian" \
   org.opencontainers.image.created=${AIRFLOW_IMAGE_DATE_CREATED} \
   org.opencontainers.image.authors="dev@airflow.apache.org" \
   org.opencontainers.image.url="https://airflow.apache.org" \
-  org.opencontainers.image.documentation="https://airflow.apache.org/docs/apache-airflow/stable/production-deployment.html" \
-  org.opencontainers.image.source="https://github.com/apache/airflow" \
+  org.opencontainers.image.documentation="https://airflow.apache.org/docs/docker-stack/index.html" \
   org.opencontainers.image.version="${AIRFLOW_VERSION}" \
   org.opencontainers.image.revision="${COMMIT_SHA}" \
   org.opencontainers.image.vendor="Apache Software Foundation" \
   org.opencontainers.image.licenses="Apache-2.0" \
   org.opencontainers.image.ref.name="airflow-build-image" \
   org.opencontainers.image.title="Build Image Segment for Production Airflow Image" \
-  org.opencontainers.image.description="Installed Apache Airflow with build-time dependencies"
+  org.opencontainers.image.description="Reference build-time dependencies image for production-ready Apache Airflow image"
 
 ##############################################################################################
 # This is the actual Airflow image - much smaller than the build one. We copy
@@ -379,7 +377,7 @@ ARG AIRFLOW_HOME
 ARG AIRFLOW_INSTALLATION_METHOD="apache-airflow"
 ARG BUILD_ID
 ARG COMMIT_SHA
-ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
+ARG AIRFLOW_IMAGE_REPOSITORY
 ARG AIRFLOW_IMAGE_DATE_CREATED
 # By default PIP will install everything in ~/.local
 ARG PIP_USER="true"
@@ -468,15 +466,14 @@ LABEL org.apache.airflow.distro="debian" \
   org.opencontainers.image.created=${AIRFLOW_IMAGE_DATE_CREATED} \
   org.opencontainers.image.authors="dev@airflow.apache.org" \
   org.opencontainers.image.url="https://airflow.apache.org" \
-  org.opencontainers.image.documentation="https://airflow.apache.org/docs/apache-airflow/stable/production-deployment.html" \
-  org.opencontainers.image.source="https://github.com/apache/airflow" \
+  org.opencontainers.image.documentation="https://airflow.apache.org/docs/docker-stack/index.html" \
   org.opencontainers.image.version="${AIRFLOW_VERSION}" \
   org.opencontainers.image.revision="${COMMIT_SHA}" \
   org.opencontainers.image.vendor="Apache Software Foundation" \
   org.opencontainers.image.licenses="Apache-2.0" \
   org.opencontainers.image.ref.name="airflow" \
   org.opencontainers.image.title="Production Airflow Image" \
-  org.opencontainers.image.description="Installed Apache Airflow"
+  org.opencontainers.image.description="Reference, production-ready Apache Airflow image"
 
 
 ENTRYPOINT ["/usr/bin/dumb-init", "--", "/entrypoint"]
diff --git a/Dockerfile.ci b/Dockerfile.ci
index 7b1dbf1..cca868e 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -21,7 +21,8 @@ FROM ${PYTHON_BASE_IMAGE} as main
 SHELL ["/bin/bash", "-o", "pipefail", "-e", "-u", "-x", "-c"]
 
 ARG PYTHON_BASE_IMAGE="python:3.6-slim-buster"
-ARG AIRFLOW_VERSION="2.1.0.dev0"
+ARG AIRFLOW_VERSION="2.1.3.dev0"
+ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
 # By increasing this number we can do force build of all dependencies
 ARG DEPENDENCIES_EPOCH_NUMBER="6"
 
@@ -98,6 +99,7 @@ RUN mkdir -pv /usr/share/man/man1 \
 COPY scripts/docker/*.sh /scripts/docker/
 RUN bash /scripts/docker/install_mysql.sh dev \
     && adduser airflow \
+    && echo "airflow:airflow" | chpasswd \
     && echo "airflow ALL=(ALL) NOPASSWD: ALL" > /etc/sudoers.d/airflow \
     && chmod 0440 /etc/sudoers.d/airflow
 
@@ -125,6 +127,15 @@ ARG RUNTIME_APT_DEPS="\
       unzip \
       vim \
       xxd"
+
+# Install Helm
+ARG HELM_VERSION="v3.6.3"
+
+RUN SYSTEM=$(uname -s | tr '[:upper:]' '[:lower:]') \
+    && HELM_URL="https://get.helm.sh/helm-${HELM_VERSION}-${SYSTEM}-amd64.tar.gz" \
+    && curl --location "${HELM_URL}" | tar -xvz -O "${SYSTEM}"-amd64/helm > /usr/local/bin/helm \
+    && chmod +x /usr/local/bin/helm
+
 ARG ADDITIONAL_RUNTIME_APT_DEPS=""
 ARG RUNTIME_APT_COMMAND=""
 ARG ADDITIONAL_RUNTIME_APT_COMMAND=""
@@ -207,7 +218,7 @@ ARG AIRFLOW_PRE_CACHED_PIP_PACKAGES="true"
 # By default in the image, we are installing all providers when installing from sources
 ARG INSTALL_PROVIDERS_FROM_SOURCES="true"
 ARG INSTALL_FROM_PYPI="true"
-ARG AIRFLOW_PIP_VERSION=21.1.2
+ARG AIRFLOW_PIP_VERSION=21.2.2
 # Setup PIP
 # By default PIP install run without cache to make image smaller
 ARG PIP_NO_CACHE_DIR="true"
@@ -270,7 +281,7 @@ ENV EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS=${EAGER_UPGRADE_ADDITIONAL_REQUIREMENT
 RUN bash /scripts/docker/install_pip_version.sh; \
     if [[ ${AIRFLOW_PRE_CACHED_PIP_PACKAGES} == "true" && \
           ${UPGRADE_TO_NEWER_DEPENDENCIES} == "false" ]]; then \
-        bash /scripts/docker/install_airflow_from_branch_tip.sh; \
+        bash /scripts/docker/install_airflow_dependencies_from_branch_tip.sh; \
     fi
 
 # Generate random hex dump file so that we can determine whether it's faster to rebuild the image
@@ -299,8 +310,6 @@ COPY setup.cfg ${AIRFLOW_SOURCES}/setup.cfg
 
 COPY airflow/__init__.py ${AIRFLOW_SOURCES}/airflow/__init__.py
 
-ARG CONTINUE_ON_PIP_CHECK_FAILURE="false"
-
 # The goal of this line is to install the dependencies from the most current setup.py from sources
 # This will be usually incremental small set of packages in CI optimized build, so it will be very fast
 # In non-CI optimized build this will install all dependencies before installing sources.
@@ -325,11 +334,13 @@ RUN chmod a+x /entrypoint
 
 COPY scripts/docker/load.bash /opt/bats/lib/
 
-# We can copy everything here. The Context is filtered by dockerignore. This makes sure we are not
-# copying over stuff that is accidentally generated or that we do not need (such as egg-info)
-# if you want to add something that is missing and you expect to see it in the image you can
-# add it with ! in .dockerignore next to the airflow, test etc. directories there
-COPY . ${AIRFLOW_SOURCES}/
+# Additional python deps to install
+ARG ADDITIONAL_PYTHON_DEPS=""
+
+RUN bash /scripts/docker/install_pip_version.sh; \
+    if [[ -n "${ADDITIONAL_PYTHON_DEPS}" ]]; then \
+            bash /scripts/docker/install_additional_dependencies.sh; \
+    fi
 
 # Install autocomplete for airflow
 RUN if command -v airflow; then \
@@ -339,27 +350,16 @@ RUN if command -v airflow; then \
 # Install autocomplete for Kubectl
 RUN echo "source /etc/bash_completion" >> ~/.bashrc
 
-WORKDIR ${AIRFLOW_SOURCES}
-
-# Install Helm
-ARG HELM_VERSION="v3.2.4"
-
-RUN SYSTEM=$(uname -s | tr '[:upper:]' '[:lower:]') \
-    && HELM_URL="https://get.helm.sh/helm-${HELM_VERSION}-${SYSTEM}-amd64.tar.gz" \
-    && curl --location "${HELM_URL}" | tar -xvz -O "${SYSTEM}"-amd64/helm > /usr/local/bin/helm \
-    && chmod +x /usr/local/bin/helm
-
-# Additional python deps to install
-ARG ADDITIONAL_PYTHON_DEPS=""
+# We can copy everything here. The Context is filtered by dockerignore. This makes sure we are not
+# copying over stuff that is accidentally generated or that we do not need (such as egg-info)
+# if you want to add something that is missing and you expect to see it in the image you can
+# add it with ! in .dockerignore next to the airflow, test etc. directories there
+COPY . ${AIRFLOW_SOURCES}/
 
-RUN bash /scripts/docker/install_pip_version.sh; \
-    if [[ -n "${ADDITIONAL_PYTHON_DEPS}" ]]; then \
-            bash /scripts/docker/install_additional_dependencies.sh; \
-    fi
+WORKDIR ${AIRFLOW_SOURCES}
 
 ARG BUILD_ID
 ARG COMMIT_SHA
-ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
 ARG AIRFLOW_IMAGE_DATE_CREATED
 
 ENV PATH="/files/bin/:/opt/airflow/scripts/in_container/bin/:${HOME}:${PATH}" \
diff --git a/IMAGES.rst b/IMAGES.rst
index 82e6989..a1b1ace 100644
--- a/IMAGES.rst
+++ b/IMAGES.rst
@@ -24,7 +24,7 @@ Airflow has two main images (build from Dockerfiles):
 
   * Production image (Dockerfile) - that can be used to build your own production-ready Airflow installation
     You can read more about building and using the production image in the
-    `Production Deployments <https://airflow.apache.org/docs/apache-airflow/stable/production-deployment.html>`_ document.
+    `Docker stack <https://airflow.apache.org/docs/docker-stack/index.html>`_ documentation.
     The image is built using `Dockerfile <Dockerfile>`_
 
   * CI image (Dockerfile.ci) - used for running tests and local development. The image is built using
@@ -246,19 +246,21 @@ Images with a commit SHA (built for pull requests and pushes)
 
 .. code-block:: bash
 
-  ghcr.io/apache/airflow-<BRANCH>-pythonX.Y-ci-v2:<COMMIT_SHA>    - for CI images
-  ghcr.io/apache/airflow-<BRANCH>-pythonX.Y-v2:<COMMIT_SHA>       - for production images
-  ghcr.io/apache/airflow-<BRANCH>-pythonX.Y-build-v2:<COMMIT_SHA> - for production build stage
-  ghcr.io/apache/airflow-python-v2:X.Y-slim-buster-<COMMIT_SHA>   - for base Python images
+  ghcr.io/apache/airflow/<BRANCH>/ci/python<X.Y>:<COMMIT_SHA>         - for CI images
+  ghcr.io/apache/airflow/<BRANCH>/prod/python<X.Y>:<COMMIT_SHA>       - for production images
+
+We do not push Base Python images and prod-build images when we prepare COMMIT builds, because those
+images are never rebuilt locally, so there is no need to store base images specific for those builds.
 
 Latest images (pushed when main merge succeeds):
 
 .. code-block:: bash
 
-  ghcr.io/apache/airflow-<BRANCH>-pythonX.Y-ci-v2:latest    - for CI images
-  ghcr.io/apache/airflow-<BRANCH>-pythonX.Y-v2:latest       - for production images
-  ghcr.io/apache/airflow-<BRANCH>-pythonX.Y-build-v2:latest - for production build stage
-  ghcr.io/apache/airflow-python-v2:X.Y-slim-buster          - for base Python images
+  ghcr.io/apache/airflow/<BRANCH>/python:<X.Y>-slim-buster        - for base Python images
+  ghcr.io/apache/airflow/<BRANCH>/ci/python<X.Y>:latest           - for CI images
+  ghcr.io/apache/airflow/<BRANCH>/ci-manifest/python<X.Y>:latest  - for CI Manifest images
+  ghcr.io/apache/airflow/<BRANCH>/prod/python<X.Y>:latest         - for production images
+  ghcr.io/apache/airflow/<BRANCH>/prod-build/python<X.Y>:latest   - for production build stage
 
 You can see all the current GitHub images at `<https://github.com/apache/airflow/packages>`_
 
@@ -445,12 +447,6 @@ The following build arguments (``--build-arg`` in docker build command) can be u
 |                                          |                                          | upgraded to newer versions matching      |
 |                                          |                                          | setup.py before installation.            |
 +------------------------------------------+------------------------------------------+------------------------------------------+
-| ``CONTINUE_ON_PIP_CHECK_FAILURE``        | ``false``                                | By default the image will fail if pip    |
-|                                          |                                          | check fails for it. This is good for     |
-|                                          |                                          | interactive building but on CI the       |
-|                                          |                                          | image should be built regardless - we    |
-|                                          |                                          | have a separate step to verify image.    |
-+------------------------------------------+------------------------------------------+------------------------------------------+
 | ``AIRFLOW_PRE_CACHED_PIP_PACKAGES``      | ``true``                                 | Allows to pre-cache airflow PIP packages |
 |                                          |                                          | from the GitHub of Apache Airflow        |
 |                                          |                                          | This allows to optimize iterations for   |
@@ -558,8 +554,8 @@ way of querying image details via API. You really need to download the image to
 We workaround it in the way that always when we build the image we build a very small image manifest
 containing randomly generated UUID and push it to registry together with the main CI image.
 The tag for the manifest image reflects the image it refers to with added ``-manifest`` suffix.
-The manifest image for ``ghcr.io/apache/airflow-main-python3.6-ci-v2`` is named
-``ghcr.io/apache/airflow-main-python3.6-ci-v2-manifest``.
+The manifest image for ``ghcr.io/apache/airflow/main/ci/python3.6`` is named
+``ghcr.io/apache/airflow/main/ci-manifest/python3.6``.
 
 The image is quickly pulled (it is really, really small) when important files change and the content
 of the randomly generated UUID is compared with the one in our image. If the contents are different
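
A conceptual sketch (not the actual Breeze scripts) of the pull-or-rebuild decision described in the
IMAGES.rst hunk above, assuming the random UUID is stored in a file inside the manifest image; the
image name and file path below are illustrative only.

    # Conceptual sketch: compare the local build UUID with the one in the tiny remote
    # ci-manifest image to decide whether pulling the full CI image is worthwhile.
    import subprocess

    MANIFEST_IMAGE = "ghcr.io/apache/airflow/main/ci-manifest/python3.6:latest"  # example name
    UUID_PATH = "/build-id.txt"  # hypothetical location of the random UUID file

    def read_remote_uuid() -> str:
        subprocess.check_call(["docker", "pull", MANIFEST_IMAGE])  # manifest image is tiny
        return subprocess.check_output(
            ["docker", "run", "--rm", "--entrypoint", "cat", MANIFEST_IMAGE, UUID_PATH],
            text=True,
        ).strip()

    def should_pull_ci_image(local_uuid: str) -> bool:
        # A different UUID means the remote CI image content changed, so pulling it
        # is likely faster than rebuilding locally.
        return read_remote_uuid() != local_uuid
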
diff --git a/README.md b/README.md
index 9798136..d58a39f 100644
--- a/README.md
+++ b/README.md
@@ -209,8 +209,8 @@ Those are - in the order of most common ways people install Airflow:
 - [Docker Images](https://hub.docker.com/r/apache/airflow) to install airflow via
   `docker` tool, use them in Kubernetes, Helm Charts, `docker-compose`, `docker swarm` etc. You can
   read more about using, customising, and extending the images in the
-  [Latest docs](https://airflow.apache.org/docs/apache-airflow/stable/production-deployment.html), and
-  learn details on the internals in the [IMAGES.rst](IMAGES.rst) document.
+  [Latest docs](https://airflow.apache.org/docs/docker-stack/index.html), and
+  learn details on the internals in the [IMAGES.rst](https://github.com/apache/airflow/blob/main/IMAGES.rst) document.
 - [Tags in GitHub](https://github.com/apache/airflow/tags) to retrieve the git project sources that
   were used to generate official source packages via git
 
diff --git a/breeze b/breeze
index 7decdf6..2fe1458 100755
--- a/breeze
+++ b/breeze
@@ -164,6 +164,9 @@ function breeze::setup_default_breeze_constants() {
     # Can be overridden by '--force-build-images' flag.
     export FORCE_BUILD_IMAGES="false"
 
+    # When we push from breeze we always want to push base python images
+    export PUSH_PYTHON_BASE_IMAGE="true"
+
     # Determines whether to reinstall airflow at entering the image.
     export USE_AIRFLOW_VERSION=""
     # if set to true, the ci image will look for wheel packages in dist folder and will install them
@@ -569,8 +572,7 @@ EOF
 #   AIRFLOW_SOURCES
 #   AIRFLOW_CI_IMAGE
 #   AIRFLOW_PROD_IMAGE
-#   AIRFLOW_PROD_IMAGE_KUBERNETES
-#   AIRFLOW_PROD_BASE_TAG
+#   AIRFLOW_IMAGE_KUBERNETES
 #   SQLITE_URL
 #
 # Arguments:
@@ -633,8 +635,7 @@ export MYSQL_VERSION="${MYSQL_VERSION}"
 export AIRFLOW_SOURCES="${AIRFLOW_SOURCES}"
 export AIRFLOW_CI_IMAGE="${AIRFLOW_CI_IMAGE}"
 export AIRFLOW_PROD_IMAGE="${AIRFLOW_PROD_IMAGE}"
-export AIRFLOW_PROD_IMAGE_KUBERNETES="${AIRFLOW_PROD_IMAGE_KUBERNETES}"
-export AIRFLOW_PROD_BASE_TAG="${AIRFLOW_PROD_BASE_TAG}"
+export AIRFLOW_IMAGE_KUBERNETES="${AIRFLOW_IMAGE_KUBERNETES}"
 export SQLITE_URL="${SQLITE_URL}"
 export USE_AIRFLOW_VERSION="${USE_AIRFLOW_VERSION}"
 export USE_PACKAGES_FROM_DIST="${USE_PACKAGES_FROM_DIST}"
@@ -650,7 +651,6 @@ EOF
 #
 # Global constants set:
 #
-#     PYTHON_BASE_IMAGE_VERSION
 #     PYTHON_BASE_IMAGE
 #     AIRFLOW_CI_IMAGE
 #     BUILT_CI_IMAGE_FLAG_FILE
@@ -934,7 +934,6 @@ function breeze::parse_arguments() {
             echo
             export DOCKER_CACHE="disabled"
             # if not set here, docker cached is determined later, depending on type of image to be build
-            readonly DOCKER_CACHE
             export FORCE_BUILD_IMAGES="true"
             shift
             ;;
@@ -950,7 +949,6 @@ function breeze::parse_arguments() {
             echo
             export DOCKER_CACHE="local"
             # if not set here, docker cached is determined later, depending on type of image to be build
-            readonly DOCKER_CACHE
             shift
             ;;
         -U | --build-cache-pulled)
@@ -958,14 +956,12 @@ function breeze::parse_arguments() {
             echo
             export DOCKER_CACHE="pulled"
             # if not set here, docker cached is determined later, depending on type of image to be build
-            readonly DOCKER_CACHE
             shift
             ;;
         -X | --build-cache-disabled)
             echo "Use disabled cache to build images"
             echo
             export DOCKER_CACHE="disabled"
-            readonly DOCKER_CACHE
             # if not set here, docker cached is determined later, depending on type of image to be build
             shift
             ;;
@@ -1126,7 +1122,6 @@ function breeze::parse_arguments() {
             export CHECK_IMAGE_FOR_REBUILD="false"
             export SKIP_BUILDING_PROD_IMAGE="true"
             export SKIP_CHECK_REMOTE_IMAGE="true"
-            export FAIL_ON_GITHUB_DOCKER_PULL_ERROR="true"
             shift 2
             ;;
         --init-script)
@@ -1177,12 +1172,6 @@ function breeze::parse_arguments() {
             echo
             shift
             ;;
-        --continue-on-pip-check-failure)
-            export CONTINUE_ON_PIP_CHECK_FAILURE="true"
-            echo "Skip PIP check failure."
-            echo
-            shift
-            ;;
         --package-format)
             export PACKAGE_FORMAT="${2}"
             echo "Selected package type: ${PACKAGE_FORMAT}"
@@ -3571,7 +3560,7 @@ breeze::check_and_save_all_params
 
 build_images::determine_docker_cache_strategy
 
-build_images::get_docker_image_names
+build_images::get_docker_cache_image_names
 
 initialization::make_constants_read_only
 
diff --git a/breeze-complete b/breeze-complete
index db9e42a..045bb65 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -29,7 +29,7 @@ _breeze_allowed_integrations="cassandra kerberos mongo openldap pinot rabbitmq r
 _breeze_allowed_generate_constraints_modes="source-providers pypi-providers no-providers"
 _breeze_allowed_kubernetes_modes="image"
 _breeze_allowed_kubernetes_versions="v1.20.2 v1.19.7 v1.18.15"
-_breeze_allowed_helm_versions="v3.2.4"
+_breeze_allowed_helm_versions="v3.6.3"
 _breeze_allowed_kind_versions="v0.11.1"
 _breeze_allowed_mysql_versions="5.7 8"
 _breeze_allowed_postgres_versions="9.6 10 11 12 13"
diff --git a/dev/retag_docker_images.py b/dev/retag_docker_images.py
index 5eeda8e..f29ce1b 100755
--- a/dev/retag_docker_images.py
+++ b/dev/retag_docker_images.py
@@ -36,10 +36,11 @@ PYTHON_VERSIONS = ["3.6", "3.7", "3.8", "3.9"]
 GHCR_IO_PREFIX = "ghcr.io/apache/airflow"
 
 GHCR_IO_IMAGES = [
-    "{prefix}-{branch}-python{python_version}-ci-v2-manifest:latest",
-    "{prefix}-{branch}-python{python_version}-ci-v2:latest",
-    "{prefix}-{branch}-python{python_version}-v2:latest",
-    "{prefix}-{branch}-python{python_version}-build-v2:latest",
+    "{prefix}/{branch}/ci-manifest/python{python_version}:latest",
+    "{prefix}/{branch}/ci/python{python_version}:latest",
+    "{prefix}/{branch}/prod-build/python{python_version}-build-v2:latest",
+    "{prefix}/{branch}/prod/python{python_version}-build-v2:latest",
+    "{prefix}/{branch}/python:{python_version}-slim-buster",
 ]
 
 
diff --git a/docs/docker-stack/build-arg-ref.rst b/docs/docker-stack/build-arg-ref.rst
index 8780970..f2507e0 100644
--- a/docs/docker-stack/build-arg-ref.rst
+++ b/docs/docker-stack/build-arg-ref.rst
@@ -79,12 +79,6 @@ for examples of using those arguments.
 +------------------------------------------+------------------------------------------+------------------------------------------+
 | Build argument                           | Default value                            | Description                              |
 +==========================================+==========================================+==========================================+
-| ``CONTINUE_ON_PIP_CHECK_FAILURE``        | ``false``                                | By default the image build fails if pip  |
-|                                          |                                          | check fails for it. This is good for     |
-|                                          |                                          | interactive building but on CI the       |
-|                                          |                                          | image should be built regardless - we    |
-|                                          |                                          | have a separate step to verify image.    |
-+------------------------------------------+------------------------------------------+------------------------------------------+
 | ``UPGRADE_TO_NEWER_DEPENDENCIES``        | ``false``                                | If set to true, the dependencies are     |
 |                                          |                                          | upgraded to newer versions matching      |
 |                                          |                                          | setup.py before installation.            |
diff --git a/scripts/ci/images/ci_prepare_ci_image_on_ci.sh b/scripts/ci/images/ci_prepare_ci_image_on_ci.sh
index a550038..8a89a30 100755
--- a/scripts/ci/images/ci_prepare_ci_image_on_ci.sh
+++ b/scripts/ci/images/ci_prepare_ci_image_on_ci.sh
@@ -29,24 +29,13 @@ function build_ci_image_on_ci() {
 
     if [[ ${GITHUB_REGISTRY_WAIT_FOR_IMAGE} == "true" ]]; then
         # Pretend that the image was build. We already have image with the right sources baked in!
+        # so all the checksums are assumed to be correct
         md5sum::calculate_md5sum_for_all_files
 
-        # Tries to wait for the images indefinitely
-        # skips further image checks - since we already have the target image
-
-        local python_tag_suffix=""
-        if [[ ${GITHUB_REGISTRY_PULL_IMAGE_TAG} != "latest" ]]; then
-            python_tag_suffix="-${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
-        fi
-        # first we pull base python image. We will need it to re-push it after main build
-        # Becoming the new "latest" image for other builds
-        build_images::wait_for_image_tag "${AIRFLOW_PYTHON_BASE_IMAGE}" \
-            "${python_tag_suffix}"
-
-        # And then the actual image
-        build_images::wait_for_image_tag "${AIRFLOW_CI_IMAGE}" \
-            ":${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+        # Remove me on 7th August 2021 after all users had chance to rebase
+        legacy_ci_image="ghcr.io/${GITHUB_REPOSITORY}-${BRANCH_NAME}-python${PYTHON_MAJOR_MINOR_VERSION}-ci-v2:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
 
+        build_images::wait_for_image_tag "${AIRFLOW_CI_IMAGE}" ":${GITHUB_REGISTRY_PULL_IMAGE_TAG}" "${legacy_ci_image}"
         md5sum::update_all_md5_with_group
     else
         build_images::rebuild_ci_image_if_needed
diff --git a/scripts/ci/images/ci_prepare_prod_image_on_ci.sh b/scripts/ci/images/ci_prepare_prod_image_on_ci.sh
index dbcb07d..14bd71f 100755
--- a/scripts/ci/images/ci_prepare_prod_image_on_ci.sh
+++ b/scripts/ci/images/ci_prepare_prod_image_on_ci.sh
@@ -33,36 +33,13 @@ function build_prod_images_on_ci() {
 
     if [[ ${GITHUB_REGISTRY_WAIT_FOR_IMAGE} == "true" ]]; then
         # Tries to wait for the images indefinitely
-        # skips further image checks - since we already have the target image
-
-        local python_tag_suffix=""
-        if [[ ${GITHUB_REGISTRY_PULL_IMAGE_TAG} != "latest" ]]; then
-            python_tag_suffix="-${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
-        fi
-
-        if [[ "${WAIT_FOR_PYTHON_BASE_IMAGE=}" == "true" ]]; then
-            # first we pull base python image. We will need it to re-push it after main build
-            # Becoming the new "latest" image for other builds
-            build_images::wait_for_image_tag "${AIRFLOW_PYTHON_BASE_IMAGE}" \
-                "${python_tag_suffix}"
-        fi
-
-        # And then the actual image
-        build_images::wait_for_image_tag "${AIRFLOW_PROD_IMAGE}" \
-            ":${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
-
-        # And the prod build image
-        if [[ "${WAIT_FOR_PROD_BUILD_IMAGE=}" == "true" ]]; then
-            # If specified in variable - also waits for the build image
-            build_images::wait_for_image_tag "${AIRFLOW_PROD_BUILD_IMAGE}" \
-                ":${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
-        fi
-
+        # Remove me on 7th August 2021 after all users had chance to rebase
+        legacy_prod_image="ghcr.io/${GITHUB_REPOSITORY}-${BRANCH_NAME}-python${PYTHON_MAJOR_MINOR_VERSION}-v2:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+        build_images::wait_for_image_tag "${AIRFLOW_PROD_IMAGE}" ":${GITHUB_REGISTRY_PULL_IMAGE_TAG}" "${legacy_prod_image}"
     else
         build_images::build_prod_images_from_locally_built_airflow_packages
     fi
 
-
     # Disable force pulling forced above this is needed for the subsequent scripts so that
     # They do not try to pull/build images again
     unset FORCE_PULL_IMAGES
diff --git a/scripts/ci/tools/fix_ownership.sh b/scripts/ci/images/ci_push_legacy_ci_images.sh
similarity index 58%
copy from scripts/ci/tools/fix_ownership.sh
copy to scripts/ci/images/ci_push_legacy_ci_images.sh
index 6ed1161..aa6696b 100755
--- a/scripts/ci/tools/fix_ownership.sh
+++ b/scripts/ci/images/ci_push_legacy_ci_images.sh
@@ -15,26 +15,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-#
-# Fixes ownership for files created inside container (files owned by root will be owned by host user)
-#
 # shellcheck source=scripts/ci/libraries/_script_init.sh
 . "$( dirname "${BASH_SOURCE[0]}" )/../libraries/_script_init.sh"
 
-if [[ ${OSTYPE} == "darwin"* ]]; then
-    # No need to fix ownership on MacOS - the filesystem there takes care about ownership mapping
-    exit
-fi
-
-declare -a EXTRA_DOCKER_FLAGS
-
-sanity_checks::sanitize_mounted_files
-
-read -r -a EXTRA_DOCKER_FLAGS <<<"$(local_mounts::convert_local_mounts_to_docker_params)"
+# This script pushes the freshly built CI image under the legacy (old) naming convention
+# It should be removed ~ 7th of August 2021, giving users time to rebase their old pull requests
+build_images::prepare_ci_build
 
-docker_v run --entrypoint /bin/bash "${EXTRA_DOCKER_FLAGS[@]}" \
-    --rm \
-    --env-file "${AIRFLOW_SOURCES}/scripts/ci/docker-compose/_docker.env" \
-    "${AIRFLOW_CI_IMAGE}" \
-    -c /opt/airflow/scripts/in_container/run_fix_ownership.sh || true
+legacy_ci_image="ghcr.io/${GITHUB_REPOSITORY}-${BRANCH_NAME}-python${PYTHON_MAJOR_MINOR_VERSION}-ci-v2:${GITHUB_REGISTRY_PUSH_IMAGE_TAG}"
+docker tag "${AIRFLOW_CI_IMAGE}" "${legacy_ci_image}"
+docker push "${legacy_ci_image}"
diff --git a/scripts/ci/tools/fix_ownership.sh b/scripts/ci/images/ci_push_legacy_prod_images.sh
similarity index 58%
copy from scripts/ci/tools/fix_ownership.sh
copy to scripts/ci/images/ci_push_legacy_prod_images.sh
index 6ed1161..3f74874 100755
--- a/scripts/ci/tools/fix_ownership.sh
+++ b/scripts/ci/images/ci_push_legacy_prod_images.sh
@@ -15,26 +15,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-#
-# Fixes ownership for files created inside container (files owned by root will be owned by host user)
-#
 # shellcheck source=scripts/ci/libraries/_script_init.sh
 . "$( dirname "${BASH_SOURCE[0]}" )/../libraries/_script_init.sh"
 
-if [[ ${OSTYPE} == "darwin"* ]]; then
-    # No need to fix ownership on MacOS - the filesystem there takes care about ownership mapping
-    exit
-fi
-
-declare -a EXTRA_DOCKER_FLAGS
-
-sanity_checks::sanitize_mounted_files
-
-read -r -a EXTRA_DOCKER_FLAGS <<<"$(local_mounts::convert_local_mounts_to_docker_params)"
+# This script pushes the freshly built PROD image under the legacy (old) naming convention, which keeps old
+# PRs working before they are rebased onto main. It should be removed ~7th of August 2021, giving users time to rebase their old pull requests
+build_images::prepare_ci_build
 
-docker_v run --entrypoint /bin/bash "${EXTRA_DOCKER_FLAGS[@]}" \
-    --rm \
-    --env-file "${AIRFLOW_SOURCES}/scripts/ci/docker-compose/_docker.env" \
-    "${AIRFLOW_CI_IMAGE}" \
-    -c /opt/airflow/scripts/in_container/run_fix_ownership.sh || true
+legacy_prod_image="ghcr.io/${GITHUB_REPOSITORY}-${BRANCH_NAME}-python${PYTHON_MAJOR_MINOR_VERSION}-v2:${GITHUB_REGISTRY_PUSH_IMAGE_TAG}"
+docker tag "${AIRFLOW_PROD_IMAGE}" "${legacy_prod_image}"
+docker push "${legacy_prod_image}"
diff --git a/scripts/ci/images/ci_wait_for_and_verify_all_ci_images.sh b/scripts/ci/images/ci_wait_for_and_verify_all_ci_images.sh
index 4255374..39dfe8f 100755
--- a/scripts/ci/images/ci_wait_for_and_verify_all_ci_images.sh
+++ b/scripts/ci/images/ci_wait_for_and_verify_all_ci_images.sh
@@ -25,6 +25,8 @@ source "${LIBRARIES_DIR}/_all_libs.sh"
 
 initialization::set_output_color_variables
 
+PARALLEL_TAIL_LENGTH=5
+
 parallel::make_sure_gnu_parallel_is_installed
 
 parallel::make_sure_python_versions_are_specified
diff --git a/scripts/ci/images/ci_wait_for_and_verify_all_prod_images.sh b/scripts/ci/images/ci_wait_for_and_verify_all_prod_images.sh
index 08ed54b..6786a31 100755
--- a/scripts/ci/images/ci_wait_for_and_verify_all_prod_images.sh
+++ b/scripts/ci/images/ci_wait_for_and_verify_all_prod_images.sh
@@ -25,6 +25,8 @@ source "${LIBRARIES_DIR}/_all_libs.sh"
 
 initialization::set_output_color_variables
 
+PARALLEL_TAIL_LENGTH=5
+
 parallel::make_sure_gnu_parallel_is_installed
 
 parallel::make_sure_python_versions_are_specified
diff --git a/scripts/ci/images/ci_wait_for_and_verify_ci_image.sh b/scripts/ci/images/ci_wait_for_and_verify_ci_image.sh
index 105bcfb..bb18100 100755
--- a/scripts/ci/images/ci_wait_for_and_verify_ci_image.sh
+++ b/scripts/ci/images/ci_wait_for_and_verify_ci_image.sh
@@ -28,25 +28,34 @@ shift
 # shellcheck source=scripts/ci/libraries/_script_init.sh
 . "$( dirname "${BASH_SOURCE[0]}" )/../libraries/_script_init.sh"
 
+image_name_with_tag="${AIRFLOW_CI_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+
+# Remove me on 7th August 2021 after all users had chance to rebase
+legacy_ci_image="ghcr.io/${GITHUB_REPOSITORY}-${BRANCH_NAME}-python${PYTHON_MAJOR_MINOR_VERSION}-ci-v2:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+
 function pull_ci_image() {
-    local image_name_with_tag="${AIRFLOW_CI_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
-    start_end::group_start "Pulling ${image_name_with_tag} image"
-    push_pull_remove_images::pull_image_if_not_present_or_forced "${image_name_with_tag}"
+    start_end::group_start "Pulling image: ${IMAGE_AVAILABLE}"
+    push_pull_remove_images::pull_image_if_not_present_or_forced "${IMAGE_AVAILABLE}"
+    # Remove me on 7th August 2021 after all users had chance to rebase
+    if [[ ${IMAGE_AVAILABLE} != "${image_name_with_tag}" ]]; then
+        verbosity::print_info "Tagging the legacy ${IMAGE_AVAILABLE} with ${image_name_with_tag}"
+        docker tag "${IMAGE_AVAILABLE}" "${image_name_with_tag}"
+    fi
     start_end::group_end
-
 }
 
 start_end::group_start "Configure Docker Registry"
 build_images::configure_docker_registry
 start_end::group_end
 
-start_end::group_start "Waiting for ${AIRFLOW_CI_IMAGE}"
-
-push_pull_remove_images::wait_for_image "${AIRFLOW_CI_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+start_end::group_start "Waiting for ${image_name_with_tag}"
+# Remove me on 7th August 2021 after all users had chance to rebase
+push_pull_remove_images::wait_for_image "${image_name_with_tag}" "${legacy_ci_image}"
 build_images::prepare_ci_build
-pull_ci_image
 start_end::group_end
 
+pull_ci_image
+
 if [[ ${VERIFY_IMAGE=} != "false" ]]; then
-    verify_image::verify_ci_image "${AIRFLOW_CI_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+    verify_image::verify_ci_image "${image_name_with_tag}"
 fi
diff --git a/scripts/ci/images/ci_wait_for_and_verify_prod_image.sh b/scripts/ci/images/ci_wait_for_and_verify_prod_image.sh
index 482b0a5..d93da72 100755
--- a/scripts/ci/images/ci_wait_for_and_verify_prod_image.sh
+++ b/scripts/ci/images/ci_wait_for_and_verify_prod_image.sh
@@ -28,22 +28,34 @@ shift
 # shellcheck source=scripts/ci/libraries/_script_init.sh
 . "$( dirname "${BASH_SOURCE[0]}" )/../libraries/_script_init.sh"
 
-start_end::group_start "Configure Docker Registry"
-build_images::configure_docker_registry
-start_end::group_end
+image_name_with_tag="${AIRFLOW_PROD_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+
+# Remove me on 7th August 2021 after all users had chance to rebase
+legacy_prod_image="ghcr.io/${GITHUB_REPOSITORY}-${BRANCH_NAME}-python${PYTHON_MAJOR_MINOR_VERSION}-v2:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
 
-start_end::group_start "Waiting for ${AIRFLOW_PROD_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+function pull_prod_image() {
+    start_end::group_start  "Pulling image: ${IMAGE_AVAILABLE}"
+    push_pull_remove_images::pull_image_if_not_present_or_forced "${IMAGE_AVAILABLE}"
+    # Remove me on 7th August 2021 after all users had chance to rebase
+    if [[ ${IMAGE_AVAILABLE} != "${image_name_with_tag}" ]]; then
+        verbosity::print_info "Tagging the legacy ${IMAGE_AVAILABLE} with ${image_name_with_tag}"
+        docker tag "${IMAGE_AVAILABLE}" "${image_name_with_tag}"
+    fi
+    start_end::group_end
+}
 
-push_pull_remove_images::wait_for_image "${AIRFLOW_PROD_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+start_end::group_start "Configure Docker Registry"
+build_images::configure_docker_registry
 start_end::group_end
 
-start_end::group_start "Pulling the PROD Image"
+start_end::group_start "Waiting for ${image_name_with_tag}"
+# Remove me on 7th August 2021 after all users had chance to rebase
+push_pull_remove_images::wait_for_image "${image_name_with_tag}" "${legacy_prod_image}"
 build_images::prepare_prod_build
-image_name_with_tag="${AIRFLOW_PROD_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
-verbosity::print_info "Pulling the ${image_name_with_tag} image and tagging with ${AIRFLOW_PROD_IMAGE}"
-push_pull_remove_images::pull_image_if_not_present_or_forced "${image_name_with_tag}"
 start_end::group_end
 
+pull_prod_image
+
 if [[ ${VERIFY_IMAGE=} != "false" ]]; then
-    verify_image::verify_prod_image "${AIRFLOW_PROD_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+    verify_image::verify_prod_image "${image_name_with_tag}"
 fi
diff --git a/scripts/ci/libraries/_build_images.sh b/scripts/ci/libraries/_build_images.sh
index ca94e4c..dfbfe1a 100644
--- a/scripts/ci/libraries/_build_images.sh
+++ b/scripts/ci/libraries/_build_images.sh
@@ -251,7 +251,6 @@ EOF
 # Retrieves information about build cache hash random file from the local image
 #
 function build_images::get_local_build_cache_hash() {
-
     set +e
     # Remove the container just in case
     docker_v rm --force "local-airflow-ci-container" 2>/dev/null >/dev/null
@@ -262,6 +261,7 @@ function build_images::get_local_build_cache_hash() {
         LOCAL_MANIFEST_IMAGE_UNAVAILABLE="true"
         export LOCAL_MANIFEST_IMAGE_UNAVAILABLE
         touch "${LOCAL_IMAGE_BUILD_CACHE_HASH_FILE}"
+        set -e
         return
 
     fi
@@ -296,6 +296,7 @@ function build_images::get_remote_image_build_cache_hash() {
         REMOTE_DOCKER_REGISTRY_UNREACHABLE="true"
         export REMOTE_DOCKER_REGISTRY_UNREACHABLE
         touch "${REMOTE_IMAGE_BUILD_CACHE_HASH_FILE}"
+        set -e
         return
     fi
     set -e
@@ -358,49 +359,44 @@ function build_images::get_github_container_registry_image_prefix() {
     echo "${GITHUB_REPOSITORY}" | tr '[:upper:]' '[:lower:]'
 }
 
-function build_images::get_docker_image_names() {
-    # python image version to use
-    export PYTHON_BASE_IMAGE_VERSION=${PYTHON_BASE_IMAGE_VERSION:=${PYTHON_MAJOR_MINOR_VERSION}}
-
+function build_images::get_docker_cache_image_names() {
     # Python base image to use
-    export PYTHON_BASE_IMAGE="python:${PYTHON_BASE_IMAGE_VERSION}-slim-buster"
+    export PYTHON_BASE_IMAGE="python:${PYTHON_MAJOR_MINOR_VERSION}-slim-buster"
 
     local image_name
     image_name="${GITHUB_REGISTRY}/$(build_images::get_github_container_registry_image_prefix)"
 
-    # CI image base tag
-    export AIRFLOW_CI_BASE_TAG="${BRANCH_NAME}-python${PYTHON_MAJOR_MINOR_VERSION}-ci"
+    # Example:
+    #  ghcr.io/apache/airflow/main/python:3.8-slim-buster
+    export AIRFLOW_PYTHON_BASE_IMAGE="${image_name}/${BRANCH_NAME}/python:${PYTHON_MAJOR_MINOR_VERSION}-slim-buster"
 
     # Example:
-    #  ghcr.io/apache/airflow-main-python3.8-ci-v2
-    export AIRFLOW_CI_IMAGE="${image_name}-${AIRFLOW_CI_BASE_TAG}${GITHUB_REGISTRY_IMAGE_SUFFIX}"
+    #  ghcr.io/apache/airflow/main/ci/python3.8
+    export AIRFLOW_CI_IMAGE="${image_name}/${BRANCH_NAME}/ci/python${PYTHON_MAJOR_MINOR_VERSION}"
 
-    export AIRFLOW_CI_LOCAL_MANIFEST_IMAGE="local-airflow-ci-manifest:${AIRFLOW_CI_BASE_TAG}"
+    # Example:
+    #  local-airflow-ci-manifest/main/python3.8
+    export AIRFLOW_CI_LOCAL_MANIFEST_IMAGE="local-airflow-ci-manifest/${BRANCH_NAME}/python${PYTHON_MAJOR_MINOR_VERSION}"
 
     # Example:
-    #  ghcr.io/apache/airflow-main-python3.8-ci-v2-manifest
-    export AIRFLOW_CI_REMOTE_MANIFEST_IMAGE="${image_name}-${AIRFLOW_CI_BASE_TAG}${GITHUB_REGISTRY_IMAGE_SUFFIX}-manifest"
+    #  ghcr.io/apache/airflow/main/ci-manifest/python3.8
+    export AIRFLOW_CI_REMOTE_MANIFEST_IMAGE="${image_name}/${BRANCH_NAME}/ci-manifest/python${PYTHON_MAJOR_MINOR_VERSION}"
 
     # File that is touched when the CI image is built for the first time locally
     export BUILT_CI_IMAGE_FLAG_FILE="${BUILD_CACHE_DIR}/${BRANCH_NAME}/.built_${PYTHON_MAJOR_MINOR_VERSION}"
 
-    # PROD image to build
-    export AIRFLOW_PROD_BASE_TAG="${BRANCH_NAME}-python${PYTHON_MAJOR_MINOR_VERSION}"
-
     # Example:
-    #  ghcr.io/apache/airflow-v2-1-test-python-v2:3.6-slim-buster
-    export AIRFLOW_PROD_IMAGE="${image_name}-${AIRFLOW_PROD_BASE_TAG}${GITHUB_REGISTRY_IMAGE_SUFFIX}"
-
-    # PROD Kubernetes image to build
-    export AIRFLOW_PROD_IMAGE_KUBERNETES="${AIRFLOW_PROD_IMAGE}-kubernetes"
+    #  ghcr.io/apache/airflow/main/prod/python3.8
+    export AIRFLOW_PROD_IMAGE="${image_name}/${BRANCH_NAME}/prod/python${PYTHON_MAJOR_MINOR_VERSION}"
 
     # Example:
-    #   ghcr.io/apache/airflow-main-python3.6-build-v2
-    export AIRFLOW_PROD_BUILD_IMAGE="${image_name}-${AIRFLOW_PROD_BASE_TAG}-build${GITHUB_REGISTRY_IMAGE_SUFFIX}"
+    #   ghcr.io/apache/airflow/main/prod-build/python3.8
+    export AIRFLOW_PROD_BUILD_IMAGE="${image_name}/${BRANCH_NAME}/prod-build/python${PYTHON_MAJOR_MINOR_VERSION}"
+
+    # Kubernetes image to build
+    #  ghcr.io/apache/airflow/main/kubernetes/python3.8
+    export AIRFLOW_IMAGE_KUBERNETES="${image_name}/${BRANCH_NAME}/kubernetes/python${PYTHON_MAJOR_MINOR_VERSION}"
 
-    # Example:
-    #  ghcr.io/apache/airflow-python-v2:3.6-slim-buster
-    export AIRFLOW_PYTHON_BASE_IMAGE="${image_name}-python${GITHUB_REGISTRY_IMAGE_SUFFIX}:${PYTHON_BASE_IMAGE_VERSION}-slim-buster"
 
 
 }
@@ -669,7 +665,6 @@ Docker building ${AIRFLOW_CI_IMAGE}.
         --build-arg ADDITIONAL_RUNTIME_APT_DEPS="${ADDITIONAL_RUNTIME_APT_DEPS}" \
         --build-arg ADDITIONAL_RUNTIME_APT_ENV="${ADDITIONAL_RUNTIME_APT_ENV}" \
         --build-arg UPGRADE_TO_NEWER_DEPENDENCIES="${UPGRADE_TO_NEWER_DEPENDENCIES}" \
-        --build-arg CONTINUE_ON_PIP_CHECK_FAILURE="${CONTINUE_ON_PIP_CHECK_FAILURE}" \
         --build-arg CONSTRAINTS_GITHUB_REPOSITORY="${CONSTRAINTS_GITHUB_REPOSITORY}" \
         --build-arg AIRFLOW_CONSTRAINTS_REFERENCE="${DEFAULT_CONSTRAINTS_BRANCH}" \
         --build-arg AIRFLOW_CONSTRAINTS="${AIRFLOW_CONSTRAINTS}" \
@@ -810,7 +805,6 @@ function build_images::build_prod_images() {
         --build-arg INSTALL_FROM_PYPI="${INSTALL_FROM_PYPI}" \
         --build-arg INSTALL_FROM_DOCKER_CONTEXT_FILES="${INSTALL_FROM_DOCKER_CONTEXT_FILES}" \
         --build-arg UPGRADE_TO_NEWER_DEPENDENCIES="${UPGRADE_TO_NEWER_DEPENDENCIES}" \
-        --build-arg CONTINUE_ON_PIP_CHECK_FAILURE="${CONTINUE_ON_PIP_CHECK_FAILURE}" \
         --build-arg BUILD_ID="${CI_BUILD_ID}" \
         --build-arg COMMIT_SHA="${COMMIT_SHA}" \
         --build-arg CONSTRAINTS_GITHUB_REPOSITORY="${CONSTRAINTS_GITHUB_REPOSITORY}" \
@@ -845,7 +839,6 @@ function build_images::build_prod_images() {
         --build-arg INSTALL_FROM_PYPI="${INSTALL_FROM_PYPI}" \
         --build-arg INSTALL_FROM_DOCKER_CONTEXT_FILES="${INSTALL_FROM_DOCKER_CONTEXT_FILES}" \
         --build-arg UPGRADE_TO_NEWER_DEPENDENCIES="${UPGRADE_TO_NEWER_DEPENDENCIES}" \
-        --build-arg CONTINUE_ON_PIP_CHECK_FAILURE="${CONTINUE_ON_PIP_CHECK_FAILURE}" \
         --build-arg AIRFLOW_VERSION="${AIRFLOW_VERSION}" \
         --build-arg AIRFLOW_BRANCH="${AIRFLOW_BRANCH_FOR_PYPI_PRELOADING}" \
         --build-arg AIRFLOW_EXTRAS="${AIRFLOW_EXTRAS}" \
@@ -868,17 +861,37 @@ function build_images::build_prod_images() {
     fi
 }
 
+# Tags source image with names provided
+# $1 source image
+# $2, $3 - target image names
+function build_images::tag_image() {
+    local source_image_name="$1"
+    shift
+    local target_image_name
+    for target_image_name in "${@}"; do
+        echo
+        echo "Tagging ${source_image_name} as ${target_image_name}."
+        echo
+        docker_v tag "${source_image_name}" "${target_image_name}"
+    done
+}
+
 # Waits for image tag to appear in GitHub Registry, pulls it and tags with the target tag
 # Parameters:
 #  $1 - image name to wait for
-#  $2 - suffix of the image to wait for
-#  $3, $4, ... - target tags to tag the image with
+#  $2 - suffix of the image to wait for
+#  $3 - legacy fallback image to wait for (Remove me on 7th August 2021 after all users had chance to rebase)
+#  $4, $5, ... - target tags to tag the image with
 function build_images::wait_for_image_tag() {
 
     local image_name="${1}"
     local image_suffix="${2}"
     shift 2
 
+    # Remove me 7th of August 2021
+    local legacy_image_to_pull="${1}"
+    shift
+
     local image_to_wait_for="${image_name}${image_suffix}"
     start_end::group_start "Wait for image tag ${image_to_wait_for}"
     while true; do
@@ -891,26 +904,34 @@ function build_images::wait_for_image_tag() {
         image_hash="$(docker images -q "${image_to_wait_for}" 2>>"${OUTPUT_LOG}" || true)"
         if [[ -z "${image_hash}" ]]; then
             echo
-            echo "The image ${image_to_wait_for} is not yet available. No local hash for the image. Waiting."
+            echo "The image ${image_to_wait_for} is not yet available. No local hash for the image. Falling back to the legacy image."
             echo
             echo "Last log:"
             cat "${OUTPUT_LOG}" || true
             echo
-            sleep 10
-        else
-            echo
-            echo "The image ${image_to_wait_for} with '${image_name}' tag"
-            echo
-            echo
-            echo "Tagging ${image_to_wait_for} as ${image_name}."
-            echo
-            docker_v tag "${image_to_wait_for}" "${image_name}"
-            for TARGET_TAG in "${@}"; do
+            echo "Checking Legacy image!"
+            # Legacy - Remove me 7th of August 2021
+            set +e
+            echo "${COLOR_BLUE}Docker pull ${legacy_image_to_pull} ${COLOR_RESET}" >"${OUTPUT_LOG}"
+            docker_v pull "${legacy_image_to_pull}" >>"${OUTPUT_LOG}" 2>&1
+            set -e
+            echo "${COLOR_BLUE} Docker images -q ${legacy_image_to_pull}${COLOR_RESET}" >>"${OUTPUT_LOG}"
+            image_hash="$(docker images -q "${legacy_image_to_pull}" 2>>"${OUTPUT_LOG}" || true)"
+            if [[ -z "${image_hash}" ]]; then
                 echo
-                echo "Tagging ${image_to_wait_for} as ${TARGET_TAG}."
+                echo "The image ${legacy_image_to_pull} is not yet available. No local hash for the image. Waiting."
                 echo
-                docker_v tag "${image_to_wait_for}" "${TARGET_TAG}"
-            done
+                echo "Last log:"
+                cat "${OUTPUT_LOG}" || true
+                sleep 10
+            else
+                # Legacy - Remove me 7th of August 2021
+                # Pretend that the image we waited for was downloaded :)
+                build_images::tag_image "${legacy_image_to_pull}" "${image_to_wait_for}" "${image_name}:latest" "${@}"
+                break
+            fi
+        else
+            build_images::tag_image "${image_to_wait_for}" "${image_name}:latest" "${@}"
             break
         fi
     done
@@ -927,7 +948,6 @@ function build_images::determine_docker_cache_strategy() {
             export DOCKER_CACHE="pulled"
         fi
     fi
-    readonly DOCKER_CACHE
     verbosity::print_info
     verbosity::print_info "Using ${DOCKER_CACHE} cache strategy for the build."
     verbosity::print_info
diff --git a/scripts/ci/libraries/_initialization.sh b/scripts/ci/libraries/_initialization.sh
index cbeb932..a51cdd0 100644
--- a/scripts/ci/libraries/_initialization.sh
+++ b/scripts/ci/libraries/_initialization.sh
@@ -153,6 +153,10 @@ function initialization::initialize_base_variables() {
 
     # Dry run - only show docker-compose and docker commands but do not execute them
     export DRY_RUN_DOCKER=${DRY_RUN_DOCKER:="false"}
+
+    # By default we only push built ci/prod images - base python images are only pushed
+    # when requested
+    export PUSH_PYTHON_BASE_IMAGE=${PUSH_PYTHON_BASE_IMAGE:="false"}
 }
 
 # Determine current branch
@@ -200,7 +204,7 @@ function initialization::initialize_files_for_rebuild_check() {
         "scripts/docker/common.sh"
         "scripts/docker/install_additional_dependencies.sh"
         "scripts/docker/install_airflow.sh"
-        "scripts/docker/install_airflow_from_branch_tip.sh"
+        "scripts/docker/install_airflow_dependencies_from_branch_tip.sh"
         "scripts/docker/install_from_docker_context_files.sh"
         "scripts/docker/install_mysql.sh"
         "airflow/www/package.json"
@@ -282,9 +286,6 @@ function initialization::initialize_force_variables() {
 
     # Can be set to true to skip if the image is newer in registry
     export SKIP_CHECK_REMOTE_IMAGE=${SKIP_CHECK_REMOTE_IMAGE:="false"}
-
-    # Should be set to true if you expect image frm GitHub to be present and downloaded
-    export FAIL_ON_GITHUB_DOCKER_PULL_ERROR=${FAIL_ON_GITHUB_DOCKER_PULL_ERROR:="false"}
 }
 
 # Determine information about the host
@@ -401,7 +402,7 @@ function initialization::initialize_image_build_variables() {
     export INSTALLED_PROVIDERS
     export INSTALLED_EXTRAS="async,amazon,celery,cncf.kubernetes,docker,dask,elasticsearch,ftp,grpc,hashicorp,http,imap,ldap,google,microsoft.azure,mysql,postgres,redis,sendgrid,sftp,slack,ssh,statsd,virtualenv"
 
-    AIRFLOW_PIP_VERSION=${AIRFLOW_PIP_VERSION:="21.1"}
+    AIRFLOW_PIP_VERSION=${AIRFLOW_PIP_VERSION:="21.2.2"}
     export AIRFLOW_PIP_VERSION
 
     # We also pin version of wheel used to get consistent builds
@@ -425,9 +426,6 @@ function initialization::initialize_image_build_variables() {
     # Installs different airflow version than current from the sources
     export INSTALL_AIRFLOW_VERSION=${INSTALL_AIRFLOW_VERSION:=""}
 
-    # Continue on PIP CHECK failure
-    export CONTINUE_ON_PIP_CHECK_FAILURE=${CONTINUE_ON_PIP_CHECK_FAILURE:="false"}
-
     # Determines if airflow should be installed from a specified reference in GitHub
     export INSTALL_AIRFLOW_REFERENCE=${INSTALL_AIRFLOW_REFERENCE:=""}
 
@@ -482,7 +480,7 @@ function initialization::initialize_kubernetes_variables() {
     CURRENT_KIND_VERSIONS+=("v0.11.1")
     export CURRENT_KIND_VERSIONS
     # Currently supported versions of Helm
-    CURRENT_HELM_VERSIONS+=("v3.2.4")
+    CURRENT_HELM_VERSIONS+=("v3.6.3")
     export CURRENT_HELM_VERSIONS
     # Current executor in chart
     CURRENT_EXECUTOR+=("KubernetesExecutor")
@@ -535,7 +533,6 @@ function initialization::initialize_git_variables() {
 function initialization::initialize_github_variables() {
     # Defaults for interacting with GitHub
     export GITHUB_REGISTRY="ghcr.io"
-    export GITHUB_REGISTRY_IMAGE_SUFFIX=${GITHUB_REGISTRY_IMAGE_SUFFIX:="-v2"}
     export GITHUB_REGISTRY_WAIT_FOR_IMAGE=${GITHUB_REGISTRY_WAIT_FOR_IMAGE:="false"}
     export GITHUB_REGISTRY_PULL_IMAGE_TAG=${GITHUB_REGISTRY_PULL_IMAGE_TAG:="latest"}
     export GITHUB_REGISTRY_PUSH_IMAGE_TAG=${GITHUB_REGISTRY_PUSH_IMAGE_TAG:="latest"}
@@ -634,7 +631,6 @@ Force variables:
     FORCE_BUILD_IMAGES: ${FORCE_BUILD_IMAGES}
     FORCE_ANSWER_TO_QUESTIONS: ${FORCE_ANSWER_TO_QUESTIONS}
     SKIP_CHECK_REMOTE_IMAGE: ${SKIP_CHECK_REMOTE_IMAGE}
-    FAIL_ON_GITHUB_DOCKER_PULL_ERROR: ${FAIL_ON_GITHUB_DOCKER_PULL_ERROR}
 
 Host variables:
 
@@ -663,7 +659,6 @@ Common image build variables:
     INSTALL_FROM_PYPI: '${INSTALL_FROM_PYPI}'
     AIRFLOW_PRE_CACHED_PIP_PACKAGES: '${AIRFLOW_PRE_CACHED_PIP_PACKAGES}'
     UPGRADE_TO_NEWER_DEPENDENCIES: '${UPGRADE_TO_NEWER_DEPENDENCIES}'
-    CONTINUE_ON_PIP_CHECK_FAILURE: '${CONTINUE_ON_PIP_CHECK_FAILURE}'
     CHECK_IMAGE_FOR_REBUILD: '${CHECK_IMAGE_FOR_REBUILD}'
     AIRFLOW_CONSTRAINTS_LOCATION: '${AIRFLOW_CONSTRAINTS_LOCATION}'
     AIRFLOW_CONSTRAINTS_REFERENCE: '${AIRFLOW_CONSTRAINTS_REFERENCE}'
@@ -830,8 +825,6 @@ function initialization::make_constants_read_only() {
     readonly ADDITIONAL_RUNTIME_APT_DEPS
     readonly ADDITIONAL_RUNTIME_APT_ENV
 
-    readonly DOCKER_CACHE
-
     readonly GITHUB_REGISTRY
     readonly GITHUB_REGISTRY_WAIT_FOR_IMAGE
     readonly GITHUB_REGISTRY_PULL_IMAGE_TAG
@@ -847,11 +840,8 @@ function initialization::make_constants_read_only() {
 
     readonly VERSION_SUFFIX_FOR_PYPI
 
-    readonly PYTHON_BASE_IMAGE_VERSION
     readonly PYTHON_BASE_IMAGE
-    readonly AIRFLOW_CI_BASE_TAG
-    readonly AIRFLOW_PROD_BASE_TAG
-    readonly AIRFLOW_PROD_IMAGE_KUBERNETES
+    readonly AIRFLOW_IMAGE_KUBERNETES
     readonly BUILT_CI_IMAGE_FLAG_FILE
     readonly INIT_SCRIPT_FILE
 
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index d4910d9..1fb77eb 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -262,8 +262,8 @@ function kind::build_image_for_kubernetes_tests() {
     if [[ -n ${GITHUB_REGISTRY_PULL_IMAGE_TAG=} ]]; then
         image_tag="${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
     fi
-    echo "Building ${AIRFLOW_PROD_IMAGE_KUBERNETES}:latest from ${AIRFLOW_PROD_IMAGE}:${image_tag}"
-    docker_v build --tag "${AIRFLOW_PROD_IMAGE_KUBERNETES}:latest" . -f - <<EOF
+    echo "Building ${AIRFLOW_IMAGE_KUBERNETES}:latest from ${AIRFLOW_PROD_IMAGE}:${image_tag}"
+    docker_v build --tag "${AIRFLOW_IMAGE_KUBERNETES}:latest" . -f - <<EOF
 FROM ${AIRFLOW_PROD_IMAGE}:${image_tag}
 
 COPY airflow/example_dags/ \${AIRFLOW_HOME}/dags/
@@ -271,11 +271,11 @@ COPY airflow/example_dags/ \${AIRFLOW_HOME}/dags/
 COPY airflow/kubernetes_executor_templates/ \${AIRFLOW_HOME}/pod_templates/
 
 EOF
-    echo "The ${AIRFLOW_PROD_IMAGE_KUBERNETES}:${image_tag} is prepared for test kubernetes deployment."
+    echo "The ${AIRFLOW_IMAGE_KUBERNETES}:${image_tag} is prepared for test kubernetes deployment."
 }
 
 function kind::load_image_to_kind_cluster() {
-    kind load docker-image --name "${KIND_CLUSTER_NAME}" "${AIRFLOW_PROD_IMAGE_KUBERNETES}:latest"
+    kind load docker-image --name "${KIND_CLUSTER_NAME}" "${AIRFLOW_IMAGE_KUBERNETES}:latest"
 }
 
 MAX_NUM_TRIES_FOR_HEALTH_CHECK=12
@@ -343,8 +343,8 @@ function kind::deploy_airflow_with_helm() {
     helm install airflow . \
         --timeout 10m0s \
         --namespace "${HELM_AIRFLOW_NAMESPACE}" \
-        --set "defaultAirflowRepository=${AIRFLOW_PROD_IMAGE_KUBERNETES}" \
-        --set "images.airflow.repository=${AIRFLOW_PROD_IMAGE_KUBERNETES}" \
+        --set "defaultAirflowRepository=${AIRFLOW_IMAGE_KUBERNETES}" \
+        --set "images.airflow.repository=${AIRFLOW_IMAGE_KUBERNETES}" \
         --set "images.airflow.tag=latest" -v 1 \
         --set "defaultAirflowTag=latest" -v 1 \
         --set "config.api.auth_backend=airflow.api.auth.backend.basic_auth" \
@@ -376,8 +376,8 @@ function kind::upgrade_airflow_with_helm() {
     helm repo add stable https://charts.helm.sh/stable/
     helm dep update
     helm upgrade airflow . --namespace "${HELM_AIRFLOW_NAMESPACE}" \
-        --set "defaultAirflowRepository=${AIRFLOW_PROD_IMAGE_KUBERNETES}" \
-        --set "images.airflow.repository=${AIRFLOW_PROD_IMAGE_KUBERNETES}" \
+        --set "defaultAirflowRepository=${AIRFLOW_IMAGE_KUBERNETES}" \
+        --set "images.airflow.repository=${AIRFLOW_IMAGE_KUBERNETES}" \
         --set "images.airflow.tag=latest" -v 1 \
         --set "defaultAirflowTag=latest" -v 1 \
         --set "config.api.auth_backend=airflow.api.auth.backend.basic_auth" \
diff --git a/scripts/ci/libraries/_parallel.sh b/scripts/ci/libraries/_parallel.sh
index f81fee0..69d362a 100644
--- a/scripts/ci/libraries/_parallel.sh
+++ b/scripts/ci/libraries/_parallel.sh
@@ -22,6 +22,9 @@
 function parallel::initialize_monitoring() {
     PARALLEL_MONITORED_DIR="$(mktemp -d)"
     export PARALLEL_MONITORED_DIR
+
+    PARALLEL_TAIL_LENGTH=${PARALLEL_TAIL_LENGTH:=2}
+    export PARALLEL_TAIL_LENGTH
 }
 
 function parallel::make_sure_gnu_parallel_is_installed() {
@@ -81,9 +84,9 @@ function parallel::monitor_loop() {
               continue
             fi
 
-            echo "${COLOR_BLUE}### The last lines for ${parallel_process} process: ${directory}/stdout ###${COLOR_RESET}"
+            echo "${COLOR_BLUE}### The last ${PARALLEL_TAIL_LENGTH} lines for ${parallel_process} process: ${directory}/stdout ###${COLOR_RESET}"
             echo
-            tail -2 "${directory}/stdout" || true
+            tail "-${PARALLEL_TAIL_LENGTH}" "${directory}/stdout" || true
             echo
 
             if [[ -s "${directory}/status" ]]; then
diff --git a/scripts/ci/libraries/_push_pull_remove_images.sh b/scripts/ci/libraries/_push_pull_remove_images.sh
index 0e99b10..847d010 100644
--- a/scripts/ci/libraries/_push_pull_remove_images.sh
+++ b/scripts/ci/libraries/_push_pull_remove_images.sh
@@ -48,6 +48,7 @@ function push_pull_remove_images::push_image_with_retries() {
 # Should be run with set +e
 # Parameters:
 #   $1 -> image to pull
+#   $2 - fallback image
 function push_pull_remove_images::pull_image_if_not_present_or_forced() {
     local image_to_pull="${1}"
     local image_hash
@@ -62,25 +63,6 @@ function push_pull_remove_images::pull_image_if_not_present_or_forced() {
         echo "Pulling the image ${image_to_pull}"
         echo
         docker_v pull "${image_to_pull}"
-        local exit_value="$?"
-        if [[ ${exit_value} != "0" && ${FAIL_ON_GITHUB_DOCKER_PULL_ERROR} == "true" ]]; then
-            echo
-            echo """
-${COLOR_RED}ERROR: Exiting on docker pull error
-
-If you have authorisation problems, you might want to run:
-
-docker login ${image_to_pull%%\/*}
-
-You need to use generate token as the password, not your personal password.
-You can generate one at https://github.com/settings/tokens
-Make sure to choose 'read:packages' scope.
-${COLOR_RESET}
-"""
-            exit ${exit_value}
-        fi
-        echo
-        return ${exit_value}
     fi
 }
 
@@ -90,7 +72,7 @@ function push_pull_remove_images::check_and_rebuild_python_base_image_if_needed(
    local dockerhub_python_version
    dockerhub_python_version=$(docker run "${PYTHON_BASE_IMAGE}" python -c 'import sys; print(sys.version)')
    local local_python_version
-   local_python_version=$(docker run "${AIRFLOW_PYTHON_BASE_IMAGE}" python -c 'import sys; print(sys.version)')
+   local_python_version=$(docker run "${AIRFLOW_PYTHON_BASE_IMAGE}" python -c 'import sys; print(sys.version)' || true)
    if [[ ${local_python_version} != "${dockerhub_python_version}" ]]; then
        echo
        echo "There is a new Python Base image updated!"
@@ -102,6 +84,10 @@ function push_pull_remove_images::check_and_rebuild_python_base_image_if_needed(
             docker_v build \
                 --label "org.opencontainers.image.source=https://github.com/${GITHUB_REPOSITORY}" \
                 -t "${AIRFLOW_PYTHON_BASE_IMAGE}" -
+  else
+      echo
+      echo "Not rebuilding the base python image - the image has the same python version ${dockerhub_python_version}"
+      echo
   fi
 }
 
@@ -116,10 +102,10 @@ function push_pull_remove_images::check_and_rebuild_python_base_image_if_needed(
 #     it will pull the right image using the specified suffix
 function push_pull_remove_images::pull_base_python_image() {
     echo
-    echo "Docker pulling base python image. Upgrade to newer deps: ${UPGRADE_TO_NEWER_DEPENDENCIES}"
+    echo "Docker pull base python image. Upgrade to newer deps: ${UPGRADE_TO_NEWER_DEPENDENCIES}"
     echo
     if [[ -n ${DETECTED_TERMINAL=} ]]; then
-        echo -n "Docker pulling base python image. Upgrade to newer deps: ${UPGRADE_TO_NEWER_DEPENDENCIES}
+        echo -n "Docker pull base python image. Upgrade to newer deps: ${UPGRADE_TO_NEWER_DEPENDENCIES}
 " > "${DETECTED_TERMINAL}"
     fi
     if [[ ${GITHUB_REGISTRY_PULL_IMAGE_TAG} != "latest" ]]; then
@@ -132,8 +118,14 @@ function push_pull_remove_images::pull_base_python_image() {
             return 1
         fi
     else
+        set +e
         push_pull_remove_images::pull_image_if_not_present_or_forced "${AIRFLOW_PYTHON_BASE_IMAGE}"
-        if [[ ${CHECK_IF_BASE_PYTHON_IMAGE_UPDATED} == "true" ]] ; then
+        local res="$?"
+        set -e
+        if [[ ${CHECK_IF_BASE_PYTHON_IMAGE_UPDATED} == "true" || ${res} != "0" ]] ; then
+            # Rebuild the base python image using DockerHub - either when we explicitly want it
+            # or when there is no image available yet in ghcr.io (usually when you build it for the
+            # first time in your repository)
             push_pull_remove_images::check_and_rebuild_python_base_image_if_needed
         fi
     fi
@@ -151,8 +143,26 @@ function push_pull_remove_images::pull_ci_images_if_needed() {
         fi
     fi
     if [[ "${DOCKER_CACHE}" == "pulled" ]]; then
+        set +e
         push_pull_remove_images::pull_image_if_not_present_or_forced \
             "${AIRFLOW_CI_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+        local res="$?"
+        set -e
+        if [[ ${res} != "0" ]]; then
+            if [[ ${GITHUB_REGISTRY_PULL_IMAGE_TAG} == "latest" ]] ; then
+                echo
+                echo "The CI image cache does not exist. This is likely the first time you build the image"
+                echo "Switching to 'local' cache for docker images"
+                echo
+                DOCKER_CACHE="local"
+            else
+                echo
+                echo "The CI image cache does not exist and we want to pull tag ${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+                echo "Failing as we have to pull the tagged image in order to continue"
+                echo
+                return "${res}"
+            fi
+        fi
     fi
 }
 
@@ -169,12 +179,33 @@ function push_pull_remove_images::pull_prod_images_if_needed() {
         fi
     fi
     if [[ "${DOCKER_CACHE}" == "pulled" ]]; then
+        set +e
         # "Build" segment of production image
         push_pull_remove_images::pull_image_if_not_present_or_forced \
             "${AIRFLOW_PROD_BUILD_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
-        # "Main" segment of production image
-        push_pull_remove_images::pull_image_if_not_present_or_forced \
-            "${AIRFLOW_PROD_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+        local res="$?"
+        if [[ ${res} == "0" ]]; then
+            # "Main" segment of production image
+            push_pull_remove_images::pull_image_if_not_present_or_forced \
+                "${AIRFLOW_PROD_IMAGE}:${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+            res="$?"
+        fi
+        set -e
+        if [[ ${res} != "0" ]]; then
+            if [[ ${GITHUB_REGISTRY_PULL_IMAGE_TAG} == "latest" ]] ; then
+                echo
+                echo "The PROD image cache does not exist. This is likely the first time you build the image"
+                echo "Switching to 'local' cache for docker images"
+                echo
+                DOCKER_CACHE="local"
+            else
+                echo
+                echo "The PROD image cache does not exist and we want to pull tag ${GITHUB_REGISTRY_PULL_IMAGE_TAG}"
+                echo "Failing as we have to pull the tagged image in order to continue"
+                echo
+                return "${res}"
+            fi
+        fi
     fi
 }
 
@@ -203,17 +234,12 @@ function push_pull_remove_images::push_ci_images_to_github() {
     local airflow_ci_tagged_image="${AIRFLOW_CI_IMAGE}:${GITHUB_REGISTRY_PUSH_IMAGE_TAG}"
     docker_v tag "${AIRFLOW_CI_IMAGE}" "${airflow_ci_tagged_image}"
     push_pull_remove_images::push_image_with_retries "${airflow_ci_tagged_image}"
+    # Also push the ci manifest image if GITHUB_REGISTRY_PUSH_IMAGE_TAG is "latest"
     if [[ ${GITHUB_REGISTRY_PUSH_IMAGE_TAG} == "latest" ]]; then
-        local airflow_ci_manifest_tagged_image="${AIRFLOW_CI_REMOTE_MANIFEST_IMAGE}:${GITHUB_REGISTRY_PUSH_IMAGE_TAG}"
+        local airflow_ci_manifest_tagged_image="${AIRFLOW_CI_REMOTE_MANIFEST_IMAGE}:latest"
         docker_v tag "${AIRFLOW_CI_LOCAL_MANIFEST_IMAGE}" "${airflow_ci_manifest_tagged_image}"
         push_pull_remove_images::push_image_with_retries "${airflow_ci_manifest_tagged_image}"
     fi
-    if [[ -n ${GITHUB_SHA=} ]]; then
-        # Also push image to GitHub registry with commit SHA
-        local airflow_ci_sha_image="${AIRFLOW_CI_IMAGE}:${COMMIT_SHA}"
-        docker_v tag "${AIRFLOW_CI_IMAGE}" "${airflow_ci_sha_image}"
-        push_pull_remove_images::push_image_with_retries "${airflow_ci_sha_image}"
-    fi
 }
 
 # Pushes PROD image to registry in GitHub
@@ -222,19 +248,18 @@ function push_pull_remove_images::push_ci_images_to_github() {
 #     "${COMMIT_SHA}" - in case of pull-request triggered 'workflow_run' builds
 #     "latest"        - in case of push builds
 function push_pull_remove_images::push_prod_images_to_github () {
+    if [[ "${PUSH_PYTHON_BASE_IMAGE=}" != "false" ]]; then
+        push_pull_remove_images::push_python_image_to_github
+    fi
     local airflow_prod_tagged_image="${AIRFLOW_PROD_IMAGE}:${GITHUB_REGISTRY_PUSH_IMAGE_TAG}"
     docker_v tag "${AIRFLOW_PROD_IMAGE}" "${airflow_prod_tagged_image}"
     push_pull_remove_images::push_image_with_retries "${airflow_prod_tagged_image}"
-    if [[ -n ${COMMIT_SHA=} ]]; then
-        # Also push image to GitHub registry with commit SHA
-        local airflow_prod_sha_image="${AIRFLOW_PROD_IMAGE}:${COMMIT_SHA}"
-        docker_v tag "${AIRFLOW_PROD_IMAGE}" "${airflow_prod_sha_image}"
-        push_pull_remove_images::push_image_with_retries "${airflow_prod_sha_image}"
+    # Also push prod build image if GITHUB_REGISTRY_PUSH_IMAGE_TAG is "latest"
+    if [[ ${GITHUB_REGISTRY_PUSH_IMAGE_TAG} == "latest" ]]; then
+        local airflow_prod_build_tagged_image="${AIRFLOW_PROD_BUILD_IMAGE}:latest"
+        docker_v tag "${AIRFLOW_PROD_BUILD_IMAGE}" "${airflow_prod_build_tagged_image}"
+        push_pull_remove_images::push_image_with_retries "${airflow_prod_build_tagged_image}"
     fi
-    # Also push prod build image
-    local airflow_prod_build_tagged_image="${AIRFLOW_PROD_BUILD_IMAGE}:${GITHUB_REGISTRY_PUSH_IMAGE_TAG}"
-    docker_v tag "${AIRFLOW_PROD_BUILD_IMAGE}" "${airflow_prod_build_tagged_image}"
-    push_pull_remove_images::push_image_with_retries "${airflow_prod_build_tagged_image}"
 }
 
 # waits for an image to be available in GitHub Container Registry. Should be run with `set +e`
@@ -253,12 +278,18 @@ function push_pull_remove_images::check_image_manifest() {
 }
 
 # waits for an image to be available in the GitHub registry
+# Remove the fallback on 7th of August 2021
 function push_pull_remove_images::wait_for_image() {
     set +e
-    echo " Waiting for github registry image: " "$1"
+    echo " Waiting for github registry image: $1 with $2 fallback"
     while true
     do
         if push_pull_remove_images::check_image_manifest "$1"; then
+            export IMAGE_AVAILABLE="$1"
+            break
+        fi
+        if push_pull_remove_images::check_image_manifest "$2"; then
+            export IMAGE_AVAILABLE="$2"
             break
         fi
         sleep 30
diff --git a/scripts/ci/libraries/_script_init.sh b/scripts/ci/libraries/_script_init.sh
index 0f3c862..dc79fd5 100755
--- a/scripts/ci/libraries/_script_init.sh
+++ b/scripts/ci/libraries/_script_init.sh
@@ -41,7 +41,7 @@ build_images::determine_docker_cache_strategy
 
 initialization::get_environment_for_builds_on_ci
 
-build_images::get_docker_image_names
+build_images::get_docker_cache_image_names
 
 initialization::make_constants_read_only
 
diff --git a/scripts/ci/selective_ci_checks.sh b/scripts/ci/selective_ci_checks.sh
index e7a9144..ee3b506 100755
--- a/scripts/ci/selective_ci_checks.sh
+++ b/scripts/ci/selective_ci_checks.sh
@@ -46,10 +46,12 @@ function check_upgrade_to_newer_dependencies_needed() {
     # shellcheck disable=SC2153
     if [[ "${UPGRADE_TO_NEWER_DEPENDENCIES}" != "false" ||
             ${GITHUB_EVENT_NAME=} == 'push' || ${GITHUB_EVENT_NAME=} == "scheduled" ]]; then
-        # Trigger upgrading to latest constraints where label is set or when
-        # SHA of the merge commit triggers rebuilding layer in the docker image
+        # Trigger upgrading to latest constraints when we are in a push or scheduled event, or when it is
+        # forced by UPGRADE_TO_NEWER_DEPENDENCIES set to a non-false value. The variable is set to the
+        # SHA of the merge commit - so that it always triggers rebuilding the layer in the docker image
         # Each build that upgrades to latest constraints will get truly latest constraints, not those
-        # Cached in the image this way
+        # cached in the image if we set it to "true". This upgrade_to_newer_dependencies variable
+        # can later be overridden in case we find that setup.* files changed (see below)
         upgrade_to_newer_dependencies="${INCOMING_COMMIT_SHA}"
     fi
 }
@@ -338,6 +340,8 @@ function check_if_setup_files_changed() {
     show_changed_files
 
     if [[ $(count_changed_files) != "0" ]]; then
+        # In case the setup files changed, we automatically force upgrading to newer dependencies
+        # no matter what was set before
         upgrade_to_newer_dependencies="${INCOMING_COMMIT_SHA}"
     fi
     start_end::group_end
diff --git a/scripts/ci/tools/fix_ownership.sh b/scripts/ci/tools/fix_ownership.sh
index 6ed1161..de15621 100755
--- a/scripts/ci/tools/fix_ownership.sh
+++ b/scripts/ci/tools/fix_ownership.sh
@@ -33,8 +33,12 @@ sanity_checks::sanitize_mounted_files
 
 read -r -a EXTRA_DOCKER_FLAGS <<<"$(local_mounts::convert_local_mounts_to_docker_params)"
 
-docker_v run --entrypoint /bin/bash "${EXTRA_DOCKER_FLAGS[@]}" \
-    --rm \
-    --env-file "${AIRFLOW_SOURCES}/scripts/ci/docker-compose/_docker.env" \
-    "${AIRFLOW_CI_IMAGE}" \
-    -c /opt/airflow/scripts/in_container/run_fix_ownership.sh || true
+if docker image inspect "${AIRFLOW_CI_IMAGE}" >/dev/null 2>&1; then
+    docker_v run --entrypoint /bin/bash "${EXTRA_DOCKER_FLAGS[@]}" \
+        --rm \
+        --env-file "${AIRFLOW_SOURCES}/scripts/ci/docker-compose/_docker.env" \
+        "${AIRFLOW_CI_IMAGE}" \
+        -c /opt/airflow/scripts/in_container/run_fix_ownership.sh || true
+else
+    echo "Skipping the ownership fix as it seems that you do not have the ${AIRFLOW_CI_IMAGE} image yet"
+fi
diff --git a/scripts/docker/install_additional_dependencies.sh b/scripts/docker/install_additional_dependencies.sh
index 6c035ae..4f9c05f 100755
--- a/scripts/docker/install_additional_dependencies.sh
+++ b/scripts/docker/install_additional_dependencies.sh
@@ -23,7 +23,6 @@ test -v ADDITIONAL_PYTHON_DEPS
 test -v EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS
 test -v AIRFLOW_INSTALL_USER_FLAG
 test -v AIRFLOW_PIP_VERSION
-test -v CONTINUE_ON_PIP_CHECK_FAILURE
 
 # shellcheck source=scripts/docker/common.sh
 . "$( dirname "${BASH_SOURCE[0]}" )/common.sh"
@@ -41,7 +40,7 @@ function install_additional_dependencies() {
             ${ADDITIONAL_PYTHON_DEPS} ${EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS}
         # make sure correct PIP version is used
         pip install ${AIRFLOW_INSTALL_USER_FLAG} --upgrade "pip==${AIRFLOW_PIP_VERSION}"
-        pip check || ${CONTINUE_ON_PIP_CHECK_FAILURE}
+        pip check
     else
         echo
         echo Installing additional dependencies upgrading only if needed
@@ -51,7 +50,7 @@ function install_additional_dependencies() {
             ${ADDITIONAL_PYTHON_DEPS}
         # make sure correct PIP version is used
         pip install ${AIRFLOW_INSTALL_USER_FLAG} --upgrade "pip==${AIRFLOW_PIP_VERSION}"
-        pip check || ${CONTINUE_ON_PIP_CHECK_FAILURE}
+        pip check
     fi
 }
 
diff --git a/scripts/docker/install_airflow.sh b/scripts/docker/install_airflow.sh
index 4904027..e2bca4f 100755
--- a/scripts/docker/install_airflow.sh
+++ b/scripts/docker/install_airflow.sh
@@ -60,7 +60,7 @@ function install_airflow() {
 
         # make sure correct PIP version is used
         pip install ${AIRFLOW_INSTALL_USER_FLAG} --upgrade "pip==${AIRFLOW_PIP_VERSION}"
-        pip check || ${CONTINUE_ON_PIP_CHECK_FAILURE}
+        pip check
     else \
         echo
         echo Installing all packages with constraints and upgrade if needed
@@ -76,7 +76,7 @@ function install_airflow() {
             "${AIRFLOW_INSTALLATION_METHOD}[${AIRFLOW_EXTRAS}]${AIRFLOW_VERSION_SPECIFICATION}" \
         # make sure correct PIP version is used
         pip install ${AIRFLOW_INSTALL_USER_FLAG} --upgrade "pip==${AIRFLOW_PIP_VERSION}"
-        pip check || ${CONTINUE_ON_PIP_CHECK_FAILURE}
+        pip check
     fi
 
 }
diff --git a/scripts/docker/install_airflow_from_branch_tip.sh b/scripts/docker/install_airflow_dependencies_from_branch_tip.sh
similarity index 85%
rename from scripts/docker/install_airflow_from_branch_tip.sh
rename to scripts/docker/install_airflow_dependencies_from_branch_tip.sh
index 925a872..61aaa13 100755
--- a/scripts/docker/install_airflow_from_branch_tip.sh
+++ b/scripts/docker/install_airflow_dependencies_from_branch_tip.sh
@@ -30,28 +30,29 @@
 . "$( dirname "${BASH_SOURCE[0]}" )/common.sh"
 
 
-function install_airflow_from_branch_tip() {
+function install_airflow_dependencies_from_branch_tip() {
     echo
     echo "Installing airflow from ${AIRFLOW_BRANCH}. It is used to cache dependencies"
     echo
     if [[ ${INSTALL_MYSQL_CLIENT} != "true" ]]; then
        AIRFLOW_EXTRAS=${AIRFLOW_EXTRAS/mysql,}
     fi
-    # Install latest set of dependencies using constraints
+    # Install latest set of dependencies using constraints. In case constraints were upgraded and there
+    # are conflicts, this might fail, but it should be fixed in the following installation steps
     pip install ${AIRFLOW_INSTALL_USER_FLAG} \
       "https://github.com/${AIRFLOW_REPO}/archive/${AIRFLOW_BRANCH}.tar.gz#egg=apache-airflow[${AIRFLOW_EXTRAS}]" \
-      --constraint "${AIRFLOW_CONSTRAINTS_LOCATION}"
+      --constraint "${AIRFLOW_CONSTRAINTS_LOCATION}" || true
     # make sure correct PIP version is used
     pip install ${AIRFLOW_INSTALL_USER_FLAG} --upgrade "pip==${AIRFLOW_PIP_VERSION}"
     pip freeze | grep apache-airflow-providers | xargs pip uninstall --yes || true
     echo
     echo Uninstalling just airflow. Dependencies remain.
     echo
-    pip uninstall --yes apache-airflow
+    pip uninstall --yes apache-airflow || true
 }
 
 common::get_airflow_version_specification
 common::override_pip_version_if_needed
 common::get_constraints_location
 
-install_airflow_from_branch_tip
+install_airflow_dependencies_from_branch_tip
diff --git a/scripts/docker/install_from_docker_context_files.sh b/scripts/docker/install_from_docker_context_files.sh
index 813d1b0..d8ed6bc 100755
--- a/scripts/docker/install_from_docker_context_files.sh
+++ b/scripts/docker/install_from_docker_context_files.sh
@@ -96,7 +96,7 @@ function install_airflow_and_providers_from_docker_context_files(){
 
     # make sure correct PIP version is left installed
     pip install ${AIRFLOW_INSTALL_USER_FLAG} --upgrade "pip==${AIRFLOW_PIP_VERSION}"
-    pip check || ${CONTINUE_ON_PIP_CHECK_FAILURE}
+    pip check
 
 }
 

[airflow] 39/39: Proper warning message when recorded PID is different from current PID (#17411)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0bd955831657a0ce628577983e5fb9ca16b7875c
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Aug 4 13:32:05 2021 +0100

    Proper warning message when recorded PID is different from current PID (#17411)
    
    Currently, when the recorded PID is different from the current PID in
    the run_as_user case, the warning is confusing because ti.pid is used
    as the recorded PID instead of the parent process of ti.pid. As a
    result, users would see that the two PIDs are the same, yet still get
    a warning saying they do not match.
    
    This change fixes it.
    
    (cherry picked from commit a4b6f1c1c5ab92fd5b623f119263d83bd46ab2e6)
---
 airflow/jobs/local_task_job.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index af7b317..3afc801 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -187,14 +187,17 @@ class LocalTaskJob(BaseJob):
                 )
                 raise AirflowException("Hostname of job runner does not match")
             current_pid = self.task_runner.process.pid
-
-            same_process = ti.pid == current_pid
+            recorded_pid = ti.pid
+            same_process = recorded_pid == current_pid
 
             if ti.run_as_user or self.task_runner.run_as_user:
-                same_process = psutil.Process(ti.pid).ppid() == current_pid
+                recorded_pid = psutil.Process(ti.pid).ppid()
+                same_process = recorded_pid == current_pid
 
-            if ti.pid is not None and not same_process:
-                self.log.warning("Recorded pid %s does not match " "the current pid %s", ti.pid, current_pid)
+            if recorded_pid is not None and not same_process:
+                self.log.warning(
+                    "Recorded pid %s does not match the current pid %s", recorded_pid, current_pid
+                )
                 raise AirflowException("PID of job runner does not match")
         elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'):
             self.log.warning(

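As a minimal sketch of the corrected comparison (the helper below is hypothetical and not part of the patch; psutil is the same library the patch relies on):

    import psutil

    def recorded_pid_matches(ti_pid, current_pid, run_as_user=False):
        # When run_as_user is set, ti.pid points at the child process spawned
        # for the impersonated user, so its parent pid is what should be
        # compared against the task runner's pid.
        recorded_pid = psutil.Process(ti_pid).ppid() if run_as_user else ti_pid
        return recorded_pid == current_pid
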
[airflow] 36/39: Redact conn secrets in webserver logs (#16579)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4d2db2e8ff50c9ef1933aa07d4802c3418ea1486
Author: Sumit Maheshwari <ms...@users.noreply.github.com>
AuthorDate: Tue Jun 22 14:40:45 2021 +0530

    Redact conn secrets in webserver logs (#16579)
    
    (cherry picked from commit 2a59de3e558e3b60caad876dee8fa4b43a7a17cf)
---
 airflow/hooks/base.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py
index f286a6f..3531bf3 100644
--- a/airflow/hooks/base.py
+++ b/airflow/hooks/base.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Any, Dict, List
 
 from airflow.typing_compat import Protocol
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.secrets_masker import redact
 
 if TYPE_CHECKING:
     from airflow.models.connection import Connection  # Avoid circular imports.
@@ -74,8 +75,8 @@ class BaseHook(LoggingMixin):
                 conn.port,
                 conn.schema,
                 conn.login,
-                conn.password,
-                conn.extra_dejson,
+                redact(conn.password),
+                redact(conn.extra_dejson),
             )
         return conn
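A rough sketch of what the redaction does to the logged connection fields (the dict below is a hypothetical stand-in for conn.extra_dejson, and the exact masking format is an implementation detail of the secrets masker):

    from airflow.utils.log.secrets_masker import redact

    # Values stored under sensitive-looking keys (e.g. "password") are masked
    # before they reach the webserver logs; other fields pass through unchanged.
    extra = {"password": "s3cr3t-token", "region": "eu-west-1"}
    print(redact(extra))  # e.g. {'password': '***', 'region': 'eu-west-1'}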