Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/14 23:30:54 UTC

[airflow] branch v1-10-test updated (3251f6b -> 6b77671)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 3251f6b  Get Airflow configs with sensitive data from Secret Backends (#9645)
     new 687b29b  Fix clear future recursive when ExternalTaskMarker is used (#9515)
     new 87986f3  Add ClusterPolicyViolation support to airflow local settings (#10282)
     new 667ef75  Remove duplicate line from 1.10.10 CHANGELOG (#10289)
     new 6b77671  Set language on code-block on docs/howto/email-config.rst (#10238)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG.txt                                      |  1 -
 airflow/exceptions.py                              |  4 ++
 airflow/models/dag.py                              |  2 +-
 airflow/models/dagbag.py                           |  7 +--
 docs/concepts.rst                                  | 36 ++++++++++++++
 docs/howto/email-config.rst                        |  4 +-
 tests/cluster_policies/__init__.py                 | 56 ++++++++++++++++++++++
 .../test_missing_owner.py}                         | 24 +++++-----
 .../dags/test_with_non_default_owner.py            | 24 +++++-----
 tests/models/test_dagbag.py                        | 32 +++++++++++++
 tests/sensors/test_external_task_sensor.py         | 28 ++++++++++-
 11 files changed, 183 insertions(+), 35 deletions(-)
 create mode 100644 tests/cluster_policies/__init__.py
 copy tests/{dags_with_system_exit/b_test_scheduler_dags.py => dags/test_missing_owner.py} (71%)
 copy airflow/example_dags/test_utils.py => tests/dags/test_with_non_default_owner.py (71%)


[airflow] 03/04: Remove duplicate line from 1.10.10 CHANGELOG (#10289)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 667ef759f462bd034d193320f22033aca832f154
Author: Vinnie Guimaraes <vg...@gmail.com>
AuthorDate: Tue Aug 11 13:49:44 2020 +0100

    Remove duplicate line from 1.10.10 CHANGELOG (#10289)
    
    (cherry picked from commit a9f7222a3fdc9bb55c697bdad17f4e60e8d9e70f)
---
 CHANGELOG.txt | 1 -
 1 file changed, 1 deletion(-)

diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 7b33921..ae1119d 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -317,7 +317,6 @@ Misc/Internal
 - Requirements now depend on python version (#7841)
 - Bring back reset db explicitly called at CI entry (#7798)
 - Fixes unclean installation of Airflow 1.10 (#7796)
-- Change name of the common environment initialization function (#7805)
 - [AIRFLOW-7029] Use separate docker image for running license check (#7678)
 - [AIRFLOW-5842] Swtch to Debian buster image as a base (#7647)
 - [AIRFLOW-5828] Move build logic out from hooks/build (#7618)


[airflow] 04/04: Set language on code-block on docs/howto/email-config.rst (#10238)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6b77671301b93a3b8e2250ded98bfbff230658f0
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Sat Aug 8 18:57:27 2020 +0200

    Set language on code-block on docs/howto/email-config.rst (#10238)
    
    * Set language on code-block on docs/howto/email-config.rst
    
    * fixup! Set language on code-block on docs/howto/email-config.rst
    
    (cherry picked from commit aba29bf4ec4c8e1d50ae28a5a4f8999759a496b4)
---
 docs/howto/email-config.rst | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/docs/howto/email-config.rst b/docs/howto/email-config.rst
index fe2fdc2..89e3ed8 100644
--- a/docs/howto/email-config.rst
+++ b/docs/howto/email-config.rst
@@ -22,12 +22,10 @@ You can configure the email that is being sent in your ``airflow.cfg``
 by setting a ``subject_template`` and/or a ``html_content_template``
 in the ``email`` section.
 
-.. code-block::
+.. code-block:: ini
 
   [email]
-
   email_backend = airflow.utils.email.send_email_smtp
-
   subject_template = /path/to/my_subject_template_file
   html_content_template = /path/to/my_html_content_template_file
 
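Because the block is now tagged ``ini``, one quick sanity check is that it is plain INI that Python's standard-library `configparser` can read, and that dropping the two blank lines does not change how the `[email]` section parses. This is a minimal standalone sketch; it is not how Airflow itself loads `airflow.cfg`, and the `email_section` helper is hypothetical.

```python
import configparser

# The [email] block from docs/howto/email-config.rst, before and after #10238.
BEFORE = """
[email]

email_backend = airflow.utils.email.send_email_smtp

subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
"""

AFTER = """
[email]
email_backend = airflow.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
"""


def email_section(text):
    """Parse an INI snippet and return its [email] section as a plain dict."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return dict(parser["email"])


print(email_section(AFTER)["email_backend"])
```

Blank lines between options are ignored by `configparser` (trailing empty lines are stripped from values), so both variants yield the same three options.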


[airflow] 02/04: Add ClusterPolicyViolation support to airflow local settings (#10282)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 87986f3f12109f7144fbc20a1c9dcde07f3c4f58
Author: Jacob Ferriero <jf...@google.com>
AuthorDate: Wed Aug 12 15:06:29 2020 -0700

    Add ClusterPolicyViolation support to airflow local settings (#10282)
    
    This change allows users to raise exceptions other than `AirflowDagCycleException` (namely `AirflowClusterPolicyViolation`) as part of Cluster Policies.
    
    This can be helpful for running checks on tasks / DAGs (e.g. asserting that a task has a non-default owner) and refusing to run tasks that aren't compliant with these checks.
    
    This is meant as a tool for Airflow admins to prevent user mistakes (especially in shared Airflow infrastructure with newcomers), rather than as a strong technical control for security/compliance posture.
    
    (cherry picked from commit 7f76b8b94241c57dc7de5b17657433841289744e)
---
 airflow/exceptions.py                     |  4 +++
 airflow/models/dagbag.py                  |  7 ++--
 docs/concepts.rst                         | 36 ++++++++++++++++++++
 tests/cluster_policies/__init__.py        | 56 +++++++++++++++++++++++++++++++
 tests/dags/test_missing_owner.py          | 32 ++++++++++++++++++
 tests/dags/test_with_non_default_owner.py | 32 ++++++++++++++++++
 tests/models/test_dagbag.py               | 32 ++++++++++++++++++
 7 files changed, 196 insertions(+), 3 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index badf156..51e3cb4 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -83,6 +83,10 @@ class AirflowDagCycleException(AirflowException):
     """Raise when there is a cycle in Dag definition"""
 
 
+class AirflowClusterPolicyViolation(AirflowException):
+    """Raise when there is a violation of a Cluster Policy in Dag definition"""
+
+
 class DagNotFound(AirflowNotFoundException):
     """Raise when a DAG is not available in the system"""
 
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 1b8be89..106dff0 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -36,7 +36,7 @@ import six
 from airflow import settings
 from airflow.configuration import conf
 from airflow.dag.base_dag import BaseDagBag
-from airflow.exceptions import AirflowDagCycleException
+from airflow.exceptions import AirflowClusterPolicyViolation, AirflowDagCycleException
 from airflow.executors import get_default_executor
 from airflow.settings import Stats
 from airflow.utils import timezone
@@ -317,9 +317,10 @@ class DagBag(BaseDagBag, LoggingMixin):
                             "Invalid Cron expression: " + str(cron_e)
                         self.file_last_changed[dag.full_filepath] = \
                             file_last_changed_on_disk
-                    except AirflowDagCycleException as cycle_exception:
+                    except (AirflowDagCycleException,
+                            AirflowClusterPolicyViolation) as exception:
                         self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
-                        self.import_errors[dag.full_filepath] = str(cycle_exception)
+                        self.import_errors[dag.full_filepath] = str(exception)
                         self.file_last_changed[dag.full_filepath] = \
                             file_last_changed_on_disk
 
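The dagbag.py hunk widens the `except` clause so that a cluster-policy violation is handled exactly like a DAG cycle: the file is not bagged and the message is stored in `import_errors` under the file path. The pattern can be sketched without Airflow as follows; `DagCycleError`, `PolicyViolationError`, `toy_bag_dag` and `bag_files` are hypothetical stand-ins, and the real `DagBag` also logs the failure and tracks file modification times, which this sketch omits.

```python
class DagCycleError(Exception):
    """Hypothetical stand-in for AirflowDagCycleException."""


class PolicyViolationError(Exception):
    """Hypothetical stand-in for AirflowClusterPolicyViolation."""


def toy_bag_dag(dag_id):
    """Hypothetical bag step that rejects two specially named DAGs."""
    if dag_id == "cyclic":
        raise DagCycleError("toy cycle in " + dag_id)
    if dag_id == "unowned":
        raise PolicyViolationError("toy policy violation in " + dag_id)


def bag_files(found_dags, bag_dag=toy_bag_dag):
    """Bag each (filepath, dag_id) pair, turning the two known failures into import errors."""
    bagged, import_errors = [], {}
    for filepath, dag_id in found_dags:
        try:
            bag_dag(dag_id)
            bagged.append(dag_id)
        except (DagCycleError, PolicyViolationError) as exception:
            # Both failure types are recorded the same way, keyed by file path.
            import_errors[filepath] = str(exception)
    return bagged, import_errors


bagged, errors = bag_files([("/dags/a.py", "ok"), ("/dags/b.py", "cyclic"), ("/dags/c.py", "unowned")])
print(bagged, errors)
```

Any other exception type raised by `bag_dag` is not caught by this tuple and propagates to the caller.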
diff --git a/docs/concepts.rst b/docs/concepts.rst
index dd48003..6908ab2 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -1066,7 +1066,11 @@ state.
 
 Cluster Policy
 ==============
+Cluster policies provide an interface for taking action on every Airflow task
+either at DAG load time or just before task execution.
 
+Cluster Policies for Task Mutation
+-----------------------------------
 In case you want to apply cluster-wide mutations to the Airflow tasks,
 you can either mutate the task right after the DAG is loaded or
 mutate the task instance before task execution.
@@ -1117,6 +1121,38 @@ queue during retries:
             ti.queue = 'retry_queue'
 
 
+Cluster Policies for Custom Task Checks
+-------------------------------------------
+You may also use Cluster Policies to apply cluster-wide checks on Airflow
+tasks. You can raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation`
+in a policy or task mutation hook (described above) to prevent a DAG from being
+imported or prevent a task from being executed if the task is not compliant with
+your check.
+
+These checks are intended to help teams using Airflow to protect against common
+beginner errors that may get past a code reviewer, rather than as technical
+security controls.
+
+For example, refuse to run tasks that do not have a non-default owner:
+
+.. literalinclude:: /../tests/cluster_policies/__init__.py
+      :language: python
+      :start-after: [START example_cluster_policy_rule]
+      :end-before: [END example_cluster_policy_rule]
+
+If you have multiple checks to apply, it is best practice to curate these rules
+in a separate Python module and have a single policy / task mutation hook that
+runs several of these custom checks and aggregates their error messages, so
+that a single ``AirflowClusterPolicyViolation`` is reported in the UI (and in
+the import errors table in the database).
+
+For example, in ``airflow_local_settings.py``:
+
+.. literalinclude:: /../tests/cluster_policies/__init__.py
+      :language: python
+      :start-after: [START example_list_of_cluster_policy_rules]
+      :end-before: [END example_list_of_cluster_policy_rules]
+
 Where to put ``airflow_local_settings.py``?
 -------------------------------------------
 
diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py
new file mode 100644
index 0000000..565b6f8
--- /dev/null
+++ b/tests/cluster_policies/__init__.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Callable, List
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowClusterPolicyViolation
+from airflow.models.baseoperator import BaseOperator
+
+
+# [START example_cluster_policy_rule]
+def task_must_have_owners(task):
+    if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'):
+        raise AirflowClusterPolicyViolation(
+            'Task must have non-None non-default owner. Current value: {}'.format(task.owner))
+# [END example_cluster_policy_rule]
+
+
+# [START example_list_of_cluster_policy_rules]
+TASK_RULES = [task_must_have_owners]   # type: List[Callable[[BaseOperator], None]]
+
+
+def _check_task_rules(current_task):
+    """Check task rules for given task."""
+    notices = []
+    for rule in TASK_RULES:
+        try:
+            rule(current_task)
+        except AirflowClusterPolicyViolation as ex:
+            notices.append(str(ex))
+    if notices:
+        notices_list = " * " + "\n * ".join(notices)
+        raise AirflowClusterPolicyViolation(
+            "DAG policy violation (DAG ID: {0}, Path: {1}):\n"
+            "Notices:\n"
+            "{2}".format(current_task.dag_id, current_task.dag.filepath, notices_list))
+
+
+def cluster_policy(task):
+    """Ensure Tasks have non-default owners."""
+    _check_task_rules(task)
+# [END example_list_of_cluster_policy_rules]
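To see what the aggregated message looks like without an Airflow installation, here is a minimal standalone sketch of the same rule-list pattern. `PolicyViolation`, `FakeTask`, `DEFAULT_OWNER`, the second rule `task_must_have_retries` and `violation_message` are all hypothetical stand-ins (the second rule exists only to show two notices being combined), and the DAG file path is left out of the message because there is no DAG file here.

```python
from collections import namedtuple


class PolicyViolation(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowClusterPolicyViolation."""


# Hypothetical minimal task record; real Airflow operators carry far more state.
FakeTask = namedtuple("FakeTask", ["dag_id", "task_id", "owner", "retries"])

DEFAULT_OWNER = "airflow"  # assumed value of [operators] default_owner


def task_must_have_owners(task):
    if not task.owner or task.owner.lower() == DEFAULT_OWNER:
        raise PolicyViolation(
            "Task must have non-None non-default owner. Current value: {}".format(task.owner))


def task_must_have_retries(task):
    # Hypothetical second rule, only here to show several notices being aggregated.
    if task.retries < 1:
        raise PolicyViolation(
            "Task must have at least one retry. Current value: {}".format(task.retries))


TASK_RULES = [task_must_have_owners, task_must_have_retries]


def check_task_rules(task):
    """Run every rule, collect all messages, then raise a single violation."""
    notices = []
    for rule in TASK_RULES:
        try:
            rule(task)
        except PolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        raise PolicyViolation(
            "DAG policy violation (DAG ID: {}):\nNotices:\n * {}".format(
                task.dag_id, "\n * ".join(notices)))


def violation_message(task):
    """Return the aggregated violation text, or None if the task is compliant."""
    try:
        check_task_rules(task)
    except PolicyViolation as violation:
        return str(violation)
    return None


print(violation_message(FakeTask("example_dag", "t1", "airflow", 0)))
```

Collecting every notice before raising means a DAG author sees all failed checks in one import error instead of fixing them one at a time.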
diff --git a/tests/dags/test_missing_owner.py b/tests/dags/test_missing_owner.py
new file mode 100644
index 0000000..16f715c
--- /dev/null
+++ b/tests/dags/test_missing_owner.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    dag_id="test_missing_owner",
+    schedule_interval="0 0 * * *",
+    start_date=days_ago(2),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=["example"],
+) as dag:
+    run_this_last = DummyOperator(task_id="test_task",)
diff --git a/tests/dags/test_with_non_default_owner.py b/tests/dags/test_with_non_default_owner.py
new file mode 100644
index 0000000..eebbb64
--- /dev/null
+++ b/tests/dags/test_with_non_default_owner.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    dag_id="test_with_non_default_owner",
+    schedule_interval="0 0 * * *",
+    start_date=days_ago(2),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=["example"],
+) as dag:
+    run_this_last = DummyOperator(task_id="test_task", owner="John",)
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index b9d18ac..1595cd4 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -38,6 +38,7 @@ from airflow.utils.dates import timezone as tz
 from airflow.utils.db import create_session
 from airflow.utils.state import State
 from airflow.utils.timezone import utc
+from tests import cluster_policies
 from tests.models import TEST_DAGS_FOLDER, DEFAULT_DATE
 from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.config import conf_vars
@@ -695,3 +696,34 @@ class DagBagTest(unittest.TestCase):
 
         six.assertCountEqual(self, updated_ser_dag_1.tags, ["example", "new_tag"])
         self.assertGreater(updated_ser_dag_1_update_time, ser_dag_1_update_time)
+
+    @patch("airflow.settings.policy", cluster_policies.cluster_policy)
+    def test_cluster_policy_violation(self):
+        """test that file processing results in import error when task does not
+        obey cluster policy.
+        """
+        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_missing_owner.py")
+
+        dagbag = DagBag(dag_folder=dag_file)
+        self.assertEqual(set(), set(dagbag.dag_ids))
+        expected_import_errors = {
+            dag_file: (
+                "DAG policy violation (DAG ID: test_missing_owner, Path: {}):\n"
+                "Notices:\n"
+                " * Task must have non-None non-default owner. Current value: airflow".format(dag_file)
+            )
+        }
+        self.maxDiff = None
+        self.assertEqual(expected_import_errors, dagbag.import_errors)
+
+    @patch("airflow.settings.policy", cluster_policies.cluster_policy)
+    def test_cluster_policy_obeyed(self):
+        """test that the dag is imported without import errors when its tasks
+        obey the cluster policy.
+        """
+        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_with_non_default_owner.py")
+
+        dagbag = DagBag(dag_folder=dag_file)
+        self.assertEqual({"test_with_non_default_owner"}, set(dagbag.dag_ids))
+
+        self.assertEqual({}, dagbag.import_errors)
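Both tests swap the policy in with `@patch("airflow.settings.policy", ...)`. That works on the assumption that `DagBag` calls the policy through the `settings` module at call time (the dagbag.py hunk shows `from airflow import settings`), so replacing the module attribute changes what gets called and the original is restored afterwards. A minimal standalone sketch of that mechanism with `unittest.mock.patch.object`; the `toy_settings` module, `ToyViolation`, `strict_policy`, `bag_tasks` and `bag_with_strict_policy` are hypothetical.

```python
import types
from unittest import mock

# Hypothetical stand-in for the airflow.settings module and its default policy.
settings = types.ModuleType("toy_settings")
settings.policy = lambda task: None  # default: accept every task


class ToyViolation(Exception):
    """Hypothetical stand-in for AirflowClusterPolicyViolation."""


def strict_policy(task):
    if task.get("owner", "airflow") == "airflow":
        raise ToyViolation("non-default owner required")


def bag_tasks(tasks):
    """Apply settings.policy to each task, looking the attribute up at call time."""
    for task in tasks:
        settings.policy(task)
    return len(tasks)


def bag_with_strict_policy(tasks):
    # Same idea as @patch("airflow.settings.policy", cluster_policies.cluster_policy).
    with mock.patch.object(settings, "policy", strict_policy):
        return bag_tasks(tasks)
```

Because `bag_tasks` resolves `settings.policy` on every call rather than binding it at import time, the patch takes effect without touching `bag_tasks` itself.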


[airflow] 01/04: Fix clear future recursive when ExternalTaskMarker is used (#9515)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 687b29baa28893e6156fa6fd7b89e02844ebfa0c
Author: yuqian90 <yu...@gmail.com>
AuthorDate: Sat Aug 15 06:39:57 2020 +0800

    Fix clear future recursive when ExternalTaskMarker is used (#9515)
    
    (cherry picked from commit 4454224b682e07a641f1a8878197170c167de03c)
---
 airflow/models/dag.py                      |  2 +-
 tests/sensors/test_external_task_sensor.py | 28 ++++++++++++++++++++++++++--
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 94c6d2e..de610e2 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1055,7 +1055,7 @@ class DAG(BaseDag, LoggingMixin):
             instances = tis.all()
             for ti in instances:
                 if ti.operator == ExternalTaskMarker.__name__:
-                    ti.task = self.get_task(ti.task_id)
+                    ti.task = copy.copy(self.get_task(ti.task_id))
 
                     if recursion_depth == 0:
                         # Maximum recursion depth allowed is the recursion_depth of the first
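The commit message does not spell out why `copy.copy` fixes clearing with "Future" selected. Our reading, stated as an assumption: with `end_date=None` the loop visits several instances of the same `ExternalTaskMarker` (one per execution date), and template rendering overwrites the task's templated attributes in place, so the shared task object keeps the first rendered date and every later instance resolves to the same target. The toy below reproduces that failure mode and the effect of a shallow copy; `ToyMarker` and `resolve_targets` are hypothetical and only mimic the in-place rendering.

```python
import copy
from datetime import date, timedelta


class ToyMarker:
    """Hypothetical stand-in for an ExternalTaskMarker with one templated field."""

    def __init__(self, execution_date_template):
        self.execution_date = execution_date_template

    def render(self, ds):
        # Rendering replaces the templated attribute in place (assumed to mirror Airflow).
        self.execution_date = self.execution_date.replace("{{ ds }}", ds)


def resolve_targets(task, run_dates, copy_first):
    """Render the marker once per run date and collect the rendered values."""
    rendered = []
    for run_date in run_dates:
        target = copy.copy(task) if copy_first else task
        target.render(run_date.isoformat())
        rendered.append(target.execution_date)
    return rendered


dates = [date(2020, 1, 1) + timedelta(days=i) for i in range(2)]
shared = resolve_targets(ToyMarker("{{ ds }}"), dates, copy_first=False)
copied = resolve_targets(ToyMarker("{{ ds }}"), dates, copy_first=True)
print(shared)  # ['2020-01-01', '2020-01-01']
print(copied)  # ['2020-01-01', '2020-01-02']
```

A shallow copy is enough in this toy because rendering rebinds the attribute on the copy rather than mutating the shared string, so the original template stays intact for the next date.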
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 0e5e960..e2a58ec 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -433,12 +433,12 @@ def assert_ti_state_equal(task_instance, state):
     assert task_instance.state == state
 
 
-def clear_tasks(dag_bag, dag, task):
+def clear_tasks(dag_bag, dag, task, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE):
     """
     Clear the task and its downstream tasks recursively for the dag in the given dagbag.
     """
     subdag = dag.sub_dag(task_regex="^{}$".format(task.task_id), include_downstream=True)
-    subdag.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, dag_bag=dag_bag)
+    subdag.clear(start_date=start_date, end_date=end_date, dag_bag=dag_bag)
 
 
 # pylint: disable=redefined-outer-name
@@ -456,6 +456,30 @@ def test_external_task_marker_transitive(dag_bag_ext):
     assert_ti_state_equal(ti_b_3, State.NONE)
 
 
+def test_external_task_marker_future(dag_bag_ext):
+    """
+    Test clearing tasks with no end_date. This is the case when users clear tasks with
+    Future, Downstream and Recursive selected.
+    """
+    date_0 = DEFAULT_DATE
+    date_1 = DEFAULT_DATE + timedelta(days=1)
+
+    tis_date_0 = run_tasks(dag_bag_ext, execution_date=date_0)
+    tis_date_1 = run_tasks(dag_bag_ext, execution_date=date_1)
+
+    dag_0 = dag_bag_ext.get_dag("dag_0")
+    task_a_0 = dag_0.get_task("task_a_0")
+    # This should clear all tasks on dag_0 to dag_3 on both date_0 and date_1
+    clear_tasks(dag_bag_ext, dag_0, task_a_0, end_date=None)
+
+    ti_a_0_date_0 = tis_date_0["task_a_0"]
+    ti_b_3_date_0 = tis_date_0["task_b_3"]
+    ti_b_3_date_1 = tis_date_1["task_b_3"]
+    assert_ti_state_equal(ti_a_0_date_0, State.NONE)
+    assert_ti_state_equal(ti_b_3_date_0, State.NONE)
+    assert_ti_state_equal(ti_b_3_date_1, State.NONE)
+
+
 def test_external_task_marker_exception(dag_bag_ext):
     """
     Clearing across multiple DAGs should raise AirflowException if more levels are being cleared