Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/29 11:14:37 UTC

[airflow] branch v2-0-test updated (c25ae81 -> 8a2a33a)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a change to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from c25ae81  Adds dill exclusion to Dockerfiles to accommodate upcoming beam fix (#15048)
     new 64f3277  Remove extra/needless deprecation warnings from airflow.contrib module (#15065)
     new 8a2a33a  Scheduler: Remove TIs from starved pools from the critical path. (#14476)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/contrib/__init__.py           |  4 --
 airflow/contrib/operators/__init__.py |  8 ----
 airflow/jobs/scheduler_job.py         |  6 ++-
 tests/jobs/test_scheduler_job.py      | 75 +++++++++++++++++++++++++++++++++++
 4 files changed, 80 insertions(+), 13 deletions(-)

[airflow] 01/02: Remove extra/needless deprecation warnings from airflow.contrib module (#15065)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 64f327710736226bf639415594a270bedfc5c9bc
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Mon Mar 29 11:57:56 2021 +0100

    Remove extra/needless deprecation warnings from airflow.contrib module (#15065)
    
    If you have a `from airflow.contrib.operators.emr_add_steps_operator
    import EmrAddStepsOperator` line in your DAG file, you get three
    warnings for that one import:
    
    ```
    /home/ash/airflow/dags/foo.py:3 DeprecationWarning: This module is deprecated.
    /home/ash/airflow/dags/foo.py:3 DeprecationWarning: This package is deprecated. Please use `airflow.operators` or `airflow.providers.*.operators`.
    /home/ash/airflow/dags/foo.py:3 DeprecationWarning: This module is deprecated. Please use `airflow.providers.amazon.aws.operators.emr_add_steps`.
    ```
    
    All but the last are unhelpful: only the final warning names the
    replacement module to use.
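
    A minimal standalone sketch of the end state this commit aims for:
    one import line, one actionable warning, with `stacklevel=2` pointing
    the warning at the user's code rather than at the shim module. (The
    helper below is illustrative, not Airflow code.)

    ```
    import warnings

    def deprecated_shim():
        # stacklevel=2 attributes the warning to the caller's line,
        # i.e. the import line in the user's DAG file.
        warnings.warn(
            "This module is deprecated. Please use "
            "`airflow.providers.amazon.aws.operators.emr_add_steps`.",
            DeprecationWarning,
            stacklevel=2,
        )

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        deprecated_shim()

    assert len(caught) == 1  # one import, one warning
    print(caught[0].message)
    ```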
---
 airflow/contrib/__init__.py           | 4 ----
 airflow/contrib/operators/__init__.py | 8 --------
 2 files changed, 12 deletions(-)

diff --git a/airflow/contrib/__init__.py b/airflow/contrib/__init__.py
index 3a89862..37bd67f 100644
--- a/airflow/contrib/__init__.py
+++ b/airflow/contrib/__init__.py
@@ -16,7 +16,3 @@
 # specific language governing permissions and limitations
 # under the License.
 """This package is deprecated."""
-
-import warnings
-
-warnings.warn("This module is deprecated.", DeprecationWarning, stacklevel=2)
diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py
index ad3fa4b..2041adb 100644
--- a/airflow/contrib/operators/__init__.py
+++ b/airflow/contrib/operators/__init__.py
@@ -17,11 +17,3 @@
 # under the License.
 #
 """This package is deprecated. Please use `airflow.operators` or `airflow.providers.*.operators`."""
-
-import warnings
-
-warnings.warn(
-    "This package is deprecated. Please use `airflow.operators` or `airflow.providers.*.operators`.",
-    DeprecationWarning,
-    stacklevel=2,
-)

[airflow] 02/02: Scheduler: Remove TIs from starved pools from the critical path. (#14476)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8a2a33ae548db60cff46f9a0618d5489c7a83509
Author: Benoit Person <be...@gmail.com>
AuthorDate: Mon Mar 29 11:13:27 2021 +0000

    Scheduler: Remove TIs from starved pools from the critical path. (#14476)
    
    Co-authored-by: Ash Berlin-Taylor <as...@apache.org>
---
 airflow/jobs/scheduler_job.py    |  6 +++-
 tests/jobs/test_scheduler_job.py | 75 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 3970df9..e380512 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -920,8 +920,12 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
             .options(selectinload('dag_model'))
-            .limit(max_tis)
         )
+        starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
+        if starved_pools:
+            query = query.filter(not_(TI.pool.in_(starved_pools)))
+
+        query = query.limit(max_tis)
 
         task_instances_to_examine: List[TI] = with_row_locks(
             query,
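
Net effect of the hunk above: pools with no open slots are filtered out of the
candidate query *before* the LIMIT is applied, so task instances stuck in a
starved pool can no longer fill the `max_tis` batch and crowd out runnable
ones. A hedged plain-Python sketch of that ordering (data shapes are
illustrative; the real code builds a SQLAlchemy query):

```
def executable_tis(candidates, pools, max_tis):
    # Pools with no open slots are "starved"; their TIs cannot run now.
    starved = {name for name, stats in pools.items() if stats["open"] <= 0}
    # Filter first, then truncate. The previous code truncated first,
    # so the whole batch could consist of un-runnable starved-pool TIs.
    runnable = [ti for ti in candidates if ti["pool"] not in starved]
    return runnable[:max_tis]

pools = {"p1": {"open": 0}, "p2": {"open": 10}}
candidates = [{"task": f"t{i}", "pool": "p1"} for i in range(5)] + [
    {"task": f"t{i}", "pool": "p2"} for i in range(5, 10)
]
print(executable_tis(candidates, pools, max_tis=2))  # both TIs from p2
```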
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index f01c139..9347fa4 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2670,6 +2670,81 @@ class TestSchedulerJob(unittest.TestCase):
         # As tasks require 2 slots, only 3 can fit into 6 available
         assert len(task_instances_list) == 3
 
+    def test_scheduler_keeps_scheduling_pool_full(self):
+        """
+        Test that TIs in a non-full pool keep getting scheduled even when another pool is full.
+        """
+        dag_d1 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d1', start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_pool_full_t1',
+            dag=dag_d1,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_pool_full_p1',
+            bash_command='echo hi',
+        )
+
+        dag_d2 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d2', start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_pool_full_t2',
+            dag=dag_d2,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_pool_full_p2',
+            bash_command='echo hi',
+        )
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
+        dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
+        dagbag.sync_to_db()
+
+        session = settings.Session()
+        pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p1', slots=1)
+        pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p2', slots=10)
+        session.add(pool_p1)
+        session.add(pool_p2)
+        session.commit()
+
+        dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+        scheduler.processor_agent = mock.MagicMock()
+
+        # Create 5 dagruns for each DAG.
+        # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all
+        # TIs from the first dag first.
+        date = DEFAULT_DATE
+        for _ in range(5):
+            dr = dag_d1.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                execution_date=date,
+                state=State.RUNNING,
+            )
+            scheduler._schedule_dag_run(dr, {}, session)
+            date = dag_d1.following_schedule(date)
+
+        date = DEFAULT_DATE
+        for _ in range(5):
+            dr = dag_d2.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                execution_date=date,
+                state=State.RUNNING,
+            )
+            scheduler._schedule_dag_run(dr, {}, session)
+            date = dag_d2.following_schedule(date)
+
+        scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
+        task_instances_list2 = scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
+
+        # Make sure we get TIs from a non-full pool in the 2nd list
+        assert len(task_instances_list2) > 0
+        assert all(
+            task_instance.pool != 'test_scheduler_keeps_scheduling_pool_full_p1'
+            for task_instance in task_instances_list2
+        )
+
     def test_scheduler_verify_priority_and_slots(self):
         """
         Test task instances with higher priority are not queued