You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/05 17:05:21 UTC

[airflow] branch v2-1-test updated (ac832a2 -> 4678cf54)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from ac832a2  Updates to FlaskAppBuilder 3.3.2+ (#17208)
     new 673d78a  fix(smart_sensor): Unbound variable errors (#14774)
     new 012321b  Fail tasks in scheduler when executor reports they failed (#15929)
     new 8e8b58a  Fix CLI 'kubernetes cleanup-pods' which fails on invalid label key (#17298)
     new b558205  Fix calculating duration in tree view (#16695)
     new 4678cf54 Validate type of `priority_weight` during parsing (#16765)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/cli/commands/kubernetes_command.py    | 12 ++----------
 airflow/jobs/scheduler_job.py                 |  4 +++-
 airflow/models/baseoperator.py                |  7 ++++++-
 airflow/sensors/smart_sensor.py               |  1 +
 airflow/www/static/js/tree.js                 |  6 +++++-
 tests/cli/commands/test_kubernetes_command.py |  8 ++------
 tests/jobs/test_scheduler_job.py              |  2 +-
 tests/models/test_baseoperator.py             |  5 +++++
 8 files changed, 25 insertions(+), 20 deletions(-)

[airflow] 03/05: Fix CLI 'kubernetes cleanup-pods' which fails on invalid label key (#17298)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8e8b58aa0a96164c7142d795a9c919d50e2a9aa1
Author: Damir Lampa <60...@users.noreply.github.com>
AuthorDate: Thu Jul 29 14:17:51 2021 -0600

    Fix CLI 'kubernetes cleanup-pods' which fails on invalid label key (#17298)
    
    Fix for #16013 - CLI 'kubernetes cleanup-pods' fails on invalid label key
    
    (cherry picked from commit 36bdfe8d0ef7e5fc428434f8313cf390ee9acc8f)
---
 airflow/cli/commands/kubernetes_command.py    | 12 ++----------
 tests/cli/commands/test_kubernetes_command.py |  8 ++------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py
index 3c3c8e6..2660dae 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -96,16 +96,8 @@ def cleanup_pods(args):
         'try_number',
         'airflow_version',
     ]
-    list_kwargs = {
-        "namespace": namespace,
-        "limit": 500,
-        "label_selector": client.V1LabelSelector(
-            match_expressions=[
-                client.V1LabelSelectorRequirement(key=label, operator="Exists")
-                for label in airflow_pod_labels
-            ]
-        ),
-    }
+    list_kwargs = {"namespace": namespace, "limit": 500, "label_selector": ','.join(airflow_pod_labels)}
+
     while True:
         pod_list = kube_client.list_namespaced_pod(**list_kwargs)
         for pod in pod_list.items:
diff --git a/tests/cli/commands/test_kubernetes_command.py b/tests/cli/commands/test_kubernetes_command.py
index f2a8605..490c7fa 100644
--- a/tests/cli/commands/test_kubernetes_command.py
+++ b/tests/cli/commands/test_kubernetes_command.py
@@ -55,12 +55,8 @@ class TestGenerateDagYamlCommand(unittest.TestCase):
 
 
 class TestCleanUpPodsCommand(unittest.TestCase):
-    label_selector = kubernetes.client.V1LabelSelector(
-        match_expressions=[
-            kubernetes.client.V1LabelSelectorRequirement(key=label, operator="Exists")
-            for label in ['dag_id', 'task_id', 'execution_date', 'try_number', 'airflow_version']
-        ]
-    )
+
+    label_selector = ','.join(['dag_id', 'task_id', 'execution_date', 'try_number', 'airflow_version'])
 
     @classmethod
     def setUpClass(cls):

[airflow] 05/05: Validate type of `priority_weight` during parsing (#16765)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4678cf54b5cd651e0232a42746f9be80db43a609
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jul 2 01:52:50 2021 +0100

    Validate type of `priority_weight` during parsing (#16765)
    
    closes https://github.com/apache/airflow/issues/16762
    
    Without this the scheduler crashes as validation does not happen at DAG Parsing time.
    
    (cherry picked from commit 9d170279a60d9d4ed513bae1c35999926f042662)
---
 airflow/models/baseoperator.py    | 7 ++++++-
 tests/models/test_baseoperator.py | 5 +++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 10e8bfd..1fec8cf 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -586,10 +586,15 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
             if isinstance(max_retry_delay, timedelta):
                 self.max_retry_delay = max_retry_delay
             else:
-                self.log.debug("Max_retry_delay isn't timedelta object, assuming secs")
+                self.log.debug("max_retry_delay isn't a timedelta object, assuming secs")
                 self.max_retry_delay = timedelta(seconds=max_retry_delay)
 
         self.params = params or {}  # Available in templates!
+        if priority_weight is not None and not isinstance(priority_weight, int):
+            raise AirflowException(
+                f"`priority_weight` for task '{self.task_id}' only accepts integers, "
+                f"received '{type(priority_weight)}'."
+            )
         self.priority_weight = priority_weight
         if not WeightRule.is_valid(weight_rule):
             raise AirflowException(
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index fa02b4e..04d3f54 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -109,6 +109,11 @@ class TestBaseOperator(unittest.TestCase):
         with pytest.raises(AirflowException, match='Argument.*test_param.*required'):
             DummyClass(default_args=default_args)
 
+    def test_incorrect_priority_weight(self):
+        error_msg = "`priority_weight` for task 'test_op' only accepts integers, received '<class 'str'>'."
+        with pytest.raises(AirflowException, match=error_msg):
+            DummyOperator(task_id="test_op", priority_weight="2")
+
     @parameterized.expand(
         [
             ("{{ foo }}", {"foo": "bar"}, "bar"),

[airflow] 04/05: Fix calculating duration in tree view (#16695)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b558205e817f168df7e5c37c9f006a6af31d576e
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Mon Jun 28 11:23:19 2021 -0400

    Fix calculating duration in tree view (#16695)
    
    Make sure moment doesn't default the end_date to now and show the wrong duration
    
    (cherry picked from commit f0b3345ddc489627d73d190a1401804e7b0d9c4e)
---
 airflow/www/static/js/tree.js | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/www/static/js/tree.js b/airflow/www/static/js/tree.js
index 07daf0e..4bf366a 100644
--- a/airflow/www/static/js/tree.js
+++ b/airflow/www/static/js/tree.js
@@ -305,7 +305,11 @@ document.addEventListener('DOMContentLoaded', () => {
       .style('stroke-opacity', (d) => (d.external_trigger ? '0' : '1'))
       .on('mouseover', function (d) {
         // Calculate duration if it doesn't exist
-        const tt = tiTooltip({ ...d, duration: d.duration || moment(d.end_date).diff(d.start_date, 'seconds') });
+        const tt = tiTooltip({
+          ...d,
+          // if end_date is undefined then moment will default to now instead of null
+          duration: d.duration || d.end_date ? moment(d.end_date).diff(d.start_date, 'seconds') : null,
+        });
         taskTip.direction('n');
         taskTip.show(tt, this);
         d3.select(this).transition().duration(duration)

[airflow] 01/05: fix(smart_sensor): Unbound variable errors (#14774)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 673d78a6cc78038121d3f5e99caa6ded488d654a
Author: Shivansh Saini <sh...@gmail.com>
AuthorDate: Thu Jun 24 03:52:27 2021 +0530

    fix(smart_sensor): Unbound variable errors (#14774)
    
    Signed-off-by: Shivansh Saini <sh...@gmail.com>
    
    Closes #14770
    
    (cherry picked from commit 4aec25a80e3803238cf658c416c8e6d3975a30f6)
---
 airflow/sensors/smart_sensor.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py
index c8c5ba7..8755eb5 100644
--- a/airflow/sensors/smart_sensor.py
+++ b/airflow/sensors/smart_sensor.py
@@ -435,6 +435,7 @@ class SmartSensorOperator(BaseOperator, SkipMixin):
         TI = TaskInstance
 
         count_marked = 0
+        query_result = []
         try:
             query_result = (
                 session.query(TI, SI)

[airflow] 02/05: Fail tasks in scheduler when executor reports they failed (#15929)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 012321b1325c8d810ae60ad7006ab9f22dfaf95e
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu May 20 11:22:01 2021 +0100

    Fail tasks in scheduler when executor reports they failed (#15929)
    
    When a task fails in executor while still queued in scheduler, the executor reports
    this failure but scheduler doesn't change the task state resulting in the task
    being queued until the scheduler is restarted. This commit fixes it by ensuring
    that when a task is reported to have failed in the executor, the task is failed
    in scheduler
    
    (cherry picked from commit deececcabc080844ca89272a2e4ab1183cd51e3f)
---
 airflow/jobs/scheduler_job.py    | 4 +++-
 tests/jobs/test_scheduler_job.py | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b99f4b2..1758ae1 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1252,12 +1252,14 @@ class SchedulerJob(BaseJob):
                     "task says its %s. (Info: %s) Was the task killed externally?"
                 )
                 self.log.error(msg, ti, state, ti.state, info)
+
                 request = TaskCallbackRequest(
                     full_filepath=ti.dag_model.fileloc,
                     simple_task_instance=SimpleTaskInstance(ti),
                     msg=msg % (ti, state, ti.state, info),
                 )
-
+                self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
+                ti.set_state(state)
                 self.processor_agent.send_callback_to_execute(request)
 
         return len(event_buffer)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 0d1f530..37ae65b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -907,7 +907,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.QUEUED
+        assert ti1.state == State.FAILED
         mock_task_callback.assert_called_once_with(
             full_filepath='/test_path1/',
             simple_task_instance=mock.ANY,