Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/14 18:52:51 UTC

[airflow] branch v1-10-test updated (762e37b -> a758af0)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 762e37b  Fix airflow-webserver startup errors when using Kerberos Auth (#10047)
 discard 1c38ef3  [AIRFLOW-5897] Allow setting -1 as pool slots value in webserver (#6550)
    omit 491899f  Remove chart from exported sources
    omit b66600f  Fixes treatment of open slots in scheduler (#9316) (#9505)
     new c5a941b  Fixes treatment of open slots in scheduler (#9316) (#9505)
     new 440bbfe  Remove chart from exported sources
     new 84e52e8  [AIRFLOW-5897] Allow setting -1 as pool slots value in webserver (#6550)
     new a758af0  Fix airflow-webserver startup errors when using Kerberos Auth (#10047)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (762e37b)
            \
             N -- N -- N   refs/heads/v1-10-test (a758af0)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[airflow] 01/04: Fixes treatment of open slots in scheduler (#9316) (#9505)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c5a941b9b590eaa528e0b41efa62b63027327664
Author: pulsar314 <pu...@gmail.com>
AuthorDate: Thu Jun 25 22:42:03 2020 +0300

    Fixes treatment of open slots in scheduler (#9316) (#9505)
    
    Makes the scheduler account for the number of slots required by each task.
    If there are fewer open slots than a task requires, the task is not queued.
    
    (cherry picked from commit 0e31f186d38b776710080ba07be50eedf42c48a7)
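
The substance of the change is a small accounting loop: rather than decrementing a pool's open-slot count by one per queued task, the scheduler now subtracts each task's pool_slots and counts as starving any task that needs more slots than remain. A minimal standalone sketch of that loop (names like Task and select_executable are illustrative stand-ins; the real logic in the diff below also enforces DAG and task concurrency limits):

    # Minimal sketch of the new pool-slot accounting; Task stands in
    # for a task instance with pool_slots and priority_weight.
    from typing import List, NamedTuple, Tuple

    class Task(NamedTuple):
        name: str
        pool_slots: int
        priority_weight: int

    def select_executable(tasks: List[Task],
                          open_slots: int) -> Tuple[List[Task], int]:
        executable, starving = [], 0
        # Highest priority first, as the scheduler sorts before queuing.
        ordered = sorted(tasks, key=lambda t: -t.priority_weight)
        for index, task in enumerate(ordered):
            if open_slots <= 0:
                # No room at all: everything left starves.
                starving += len(ordered) - index
                break
            if task.pool_slots > open_slots:
                # Needs more slots than remain, but a lower-priority
                # task that does fit may still be picked up below.
                starving += 1
                continue
            executable.append(task)
            open_slots -= task.pool_slots
        return executable, starving

Run on the scenario from test_scheduler_verify_priority_and_slots further down, select_executable([Task('t0', 2, 2), Task('t1', 1, 3), Task('t2', 1, 1)], open_slots=2) picks t1 and t2 and counts t0 as starving: the two one-slot tasks fit around the two-slot task that does not.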
---
 airflow/jobs/scheduler_job.py    |  29 +++++---
 tests/jobs/test_scheduler_job.py | 144 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 165 insertions(+), 8 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 97f3929..685b57f 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -969,6 +969,9 @@ class SchedulerJob(BaseJob):
         dag_concurrency_map, task_concurrency_map = self.__get_concurrency_maps(
             states=STATES_TO_COUNT_AS_RUNNING, session=session)
 
+        num_tasks_in_executor = 0
+        num_starving_tasks_total = 0
+
         # Go through each pool, and queue up a task for execution if there are
         # any open slots in the pool.
         for pool, task_instances in pool_to_task_instances.items():
@@ -992,9 +995,7 @@ class SchedulerJob(BaseJob):
             priority_sorted_task_instances = sorted(
                 task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date))
 
-            # Number of tasks that cannot be scheduled because of no open slot in pool
             num_starving_tasks = 0
-            num_tasks_in_executor = 0
             for current_index, task_instance in enumerate(priority_sorted_task_instances):
                 if open_slots <= 0:
                     self.log.info(
@@ -1002,7 +1003,9 @@ class SchedulerJob(BaseJob):
                         open_slots, pool
                     )
                     # Can't schedule any more since there are no more open slots.
-                    num_starving_tasks = len(priority_sorted_task_instances) - current_index
+                    num_unhandled = len(priority_sorted_task_instances) - current_index
+                    num_starving_tasks += num_unhandled
+                    num_starving_tasks_total += num_unhandled
                     break
 
                 # Check to make sure that the task concurrency of the DAG hasn't been
@@ -1045,8 +1048,17 @@ class SchedulerJob(BaseJob):
                     num_tasks_in_executor += 1
                     continue
 
+                if task_instance.pool_slots > open_slots:
+                    self.log.info("Not executing %s since it requires %s slots "
+                                  "but there are %s open slots in the pool %s.",
+                                  task_instance, task_instance.pool_slots, open_slots, pool)
+                    num_starving_tasks += 1
+                    num_starving_tasks_total += 1
+                    # Though we can execute tasks with lower priority if there's enough room
+                    continue
+
                 executable_tis.append(task_instance)
-                open_slots -= 1
+                open_slots -= task_instance.pool_slots
                 dag_concurrency_map[dag_id] += 1
                 task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
 
@@ -1056,10 +1068,11 @@ class SchedulerJob(BaseJob):
                         pools[pool_name].open_slots())
             Stats.gauge('pool.used_slots.{pool_name}'.format(pool_name=pool_name),
                         pools[pool_name].occupied_slots())
-            Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
-            Stats.gauge('scheduler.tasks.running', num_tasks_in_executor)
-            Stats.gauge('scheduler.tasks.starving', num_starving_tasks)
-            Stats.gauge('scheduler.tasks.executable', len(executable_tis))
+
+        Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
+        Stats.gauge('scheduler.tasks.running', num_tasks_in_executor)
+        Stats.gauge('scheduler.tasks.starving', num_starving_tasks_total)
+        Stats.gauge('scheduler.tasks.executable', len(executable_tis))
 
         task_instance_str = "\n\t".join(
             [repr(x) for x in executable_tis])
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 161e479..2188d8b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -48,6 +48,7 @@ from airflow.models import DAG, DagBag, DagModel, DagRun, Pool, SlaMiss, \
     TaskInstance as TI, errors
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils import timezone
 from airflow.utils.dag_processing import SimpleDag, SimpleDagBag, list_py_file_paths
 from airflow.utils.dates import days_ago
@@ -1901,6 +1902,149 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertIsNotNone(dr)
         self.assertEqual(dr.execution_date, timezone.datetime(2016, 1, 1, 10, 10))
 
+    def test_scheduler_verify_pool_full_2_slots_per_task(self):
+        """
+        Test task instances not queued when pool is full.
+
+        Variation with non-default pool_slots
+        """
+        dag = DAG(
+            dag_id='test_scheduler_verify_pool_full_2_slots_per_task',
+            start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow',
+            pool='test_scheduler_verify_pool_full_2_slots_per_task',
+            pool_slots=2,
+        )
+
+        session = settings.Session()
+        pool = Pool(pool='test_scheduler_verify_pool_full_2_slots_per_task', slots=6)
+        session.add(pool)
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+
+        # Create 5 dagruns, which will create 5 task instances.
+        for _ in range(5):
+            scheduler.create_dag_run(dag)
+        task_instances_list = []
+        scheduler._process_task_instances(dag, task_instances_list=task_instances_list)
+        self.assertEqual(len(task_instances_list), 5)
+        dagbag = self._make_simple_dag_bag([dag])
+
+        # Recreated part of the scheduler here, to kick off tasks -> executor
+        for ti_key in task_instances_list:
+            task = dag.get_task(ti_key[1])
+            ti = TaskInstance(task, ti_key[2])
+            # Task starts out in the scheduled state. All tasks in the
+            # scheduled state will be sent to the executor
+            ti.state = State.SCHEDULED
+
+            # Also save this task instance to the DB.
+            session.merge(ti)
+        session.commit()
+
+        self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition")
+        scheduler._execute_task_instances(dagbag, (State.SCHEDULED,), session=session)
+
+        # As tasks require 2 slots, only 3 can fit into 6 available
+        self.assertEqual(len(scheduler.executor.queued_tasks), 3)
+
+    def test_scheduler_verify_priority_and_slots(self):
+        """
+        Test task instances with higher priority are not queued
+        when pool does not have enough slots.
+
+        Though tasks with lower priority might be executed.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_verify_priority_and_slots',
+            start_date=DEFAULT_DATE)
+
+        # Medium priority, not enough slots
+        DummyOperator(
+            task_id='test_scheduler_verify_priority_and_slots_t0',
+            dag=dag,
+            owner='airflow',
+            pool='test_scheduler_verify_priority_and_slots',
+            pool_slots=2,
+            priority_weight=2,
+        )
+        # High priority, occupies first slot
+        DummyOperator(
+            task_id='test_scheduler_verify_priority_and_slots_t1',
+            dag=dag,
+            owner='airflow',
+            pool='test_scheduler_verify_priority_and_slots',
+            pool_slots=1,
+            priority_weight=3,
+        )
+        # Low priority, occupies second slot
+        DummyOperator(
+            task_id='test_scheduler_verify_priority_and_slots_t2',
+            dag=dag,
+            owner='airflow',
+            pool='test_scheduler_verify_priority_and_slots',
+            pool_slots=1,
+            priority_weight=1,
+        )
+
+        session = settings.Session()
+        pool = Pool(pool='test_scheduler_verify_priority_and_slots', slots=2)
+        session.add(pool)
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+
+        scheduler.create_dag_run(dag)
+        task_instances_list = []
+        scheduler._process_task_instances(dag, task_instances_list=task_instances_list)
+        self.assertEqual(len(task_instances_list), 3)
+        dagbag = self._make_simple_dag_bag([dag])
+
+        # Recreated part of the scheduler here, to kick off tasks -> executor
+        for ti_key in task_instances_list:
+            task = dag.get_task(ti_key[1])
+            ti = TaskInstance(task, ti_key[2])
+            # Task starts out in the scheduled state. All tasks in the
+            # scheduled state will be sent to the executor
+            ti.state = State.SCHEDULED
+
+            # Also save this task instance to the DB.
+            session.merge(ti)
+        session.commit()
+
+        self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition")
+        scheduler._execute_task_instances(dagbag, (State.SCHEDULED, ), session=session)
+
+        # Only second and third
+        self.assertEqual(len(scheduler.executor.queued_tasks), 2)
+
+        ti0 = session.query(TaskInstance)\
+            .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t0').first()
+        self.assertEqual(ti0.state, State.SCHEDULED)
+
+        ti1 = session.query(TaskInstance)\
+            .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t1').first()
+        self.assertEqual(ti1.state, State.QUEUED)
+
+        ti2 = session.query(TaskInstance)\
+            .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t2').first()
+        self.assertEqual(ti2.state, State.QUEUED)
+
     def test_scheduler_reschedule(self):
         """
         Checks if tasks that are not taken up by the executor


[airflow] 03/04: [AIRFLOW-5897] Allow setting -1 as pool slots value in webserver (#6550)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 84e52e84484b8f8f9a1e6e8c3925d474fae7c897
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Fri Aug 14 16:13:34 2020 +0200

    [AIRFLOW-5897] Allow setting -1 as pool slots value in webserver (#6550)
    
    This is a follow-up to 5d9216201b061cb35c9e349da0c4bb4c22c774e0.
    
    The original fix only applied to the www UI, not the www_rbac one.
    
    (cherry picked from commit 03a46ddb0919d5327471a9442f961cf0636c1cb1)
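
WTForms' NumberRange validator rejects values below its min, so loosening the bound from 0 to -1 is exactly what lets -1 (the value Airflow uses for an unlimited pool) through while still rejecting anything smaller. A minimal sketch of the behaviour with plain WTForms (PoolForm here is a hypothetical stand-in, not Airflow's actual PoolModelView form):

    # Hypothetical stand-in form; only the validator mirrors the change.
    from wtforms import Form, IntegerField, validators

    class PoolForm(Form):
        slots = IntegerField('slots', [validators.NumberRange(min=-1)])

    for value in (-2, -1, 0, 128):
        form = PoolForm(data={'slots': value})
        print(value, form.validate())  # only -2 fails validation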
---
 airflow/www_rbac/views.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 96d4079..f098b25 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -2382,7 +2382,7 @@ class PoolModelView(AirflowModelView):
 
     validators_columns = {
         'pool': [validators.DataRequired()],
-        'slots': [validators.NumberRange(min=0)]
+        'slots': [validators.NumberRange(min=-1)]
     }
 
 


[airflow] 04/04: Fix airflow-webserver startup errors when using Kerberos Auth (#10047)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a758af00958b06f38721143f9404b8a0c5250cf1
Author: Kurganov <ku...@gmail.com>
AuthorDate: Wed Jul 29 17:17:06 2020 +0300

    Fix airflow-webserver startup errors when using Kerberos Auth (#10047)
    
    Fix airflow-webserver startup errors if airflow.api.auth.backend.kerberos_auth is enabled
    
    (cherry picked from commit 87b495b8cfc28c9887eb5dc52ad50eccb71579b6)
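
The underlying issue is the module's import surface: as the added comment in the diff notes, downstream code expects to find login_required and logout_user on this backend module, not just current_user, so the module re-exports them from flask_login even though it never calls them itself (hence the flake8 noqa). A minimal sketch of that kind of lookup, with the exact failure mode stated as an assumption (running it needs an Airflow 1.10 environment with flask_login installed):

    # Resolve the decorators off the backend module, as downstream
    # code presumably does; before this fix the last two attribute
    # accesses would fail during webserver startup.
    import importlib

    backend = importlib.import_module(
        'airflow.contrib.auth.backends.kerberos_auth')
    current_user = backend.current_user
    login_required = backend.login_required
    logout_user = backend.logout_user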
---
 airflow/contrib/auth/backends/kerberos_auth.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index e84a0b2..63cea8d 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -20,7 +20,9 @@
 import logging
 import flask_login
 from airflow.exceptions import AirflowConfigException
-from flask_login import current_user
+# Need to expose these downstream
+# flake8: noqa: F401
+from flask_login import current_user, login_required, logout_user
 from flask import flash
 from wtforms import Form, PasswordField, StringField
 from wtforms.validators import InputRequired


[airflow] 02/04: Remove chart from exported sources

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 440bbfe016c74e37db7b1bc50e209de7cdf927bb
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Fri Aug 14 16:08:17 2020 +0200

    Remove chart from exported sources
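
The one-line .gitattributes entry relies on git's export-ignore attribute: paths carrying it are omitted from archives produced by git archive, which is typically how source release tarballs are cut. A minimal Python sketch that checks the effect (assumes it runs inside a checkout of the repository):

    # Verify that chart/ is excluded from an exported source archive.
    import io
    import subprocess
    import tarfile

    tar_bytes = subprocess.check_output(
        ['git', 'archive', '--format=tar', 'HEAD'])
    with tarfile.open(fileobj=io.BytesIO(tar_bytes)) as archive:
        assert not any(
            name.startswith('chart/') for name in archive.getnames())

The companion .gitignore hunk below simply stops ignoring .gitattributes, since the file is now tracked.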
---
 .gitattributes | 1 +
 .gitignore     | 2 --
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..d872017
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1 @@
+/chart export-ignore
diff --git a/.gitignore b/.gitignore
index 593cc53..4f6b451 100644
--- a/.gitignore
+++ b/.gitignore
@@ -149,8 +149,6 @@ tramp
 # Spark
 rat-results.txt
 
-# Git stuff
-.gitattributes
 # Kubernetes generated templated files
 *.generated
 *.tar.gz