Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/12 22:07:24 UTC

[airflow] branch master updated: Add ClusterPolicyViolation support to airflow local settings (#10282)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f76b8b  Add ClusterPolicyViolation support to airflow local settings (#10282)
7f76b8b is described below

commit 7f76b8b94241c57dc7de5b17657433841289744e
Author: Jacob Ferriero <jf...@google.com>
AuthorDate: Wed Aug 12 15:06:29 2020 -0700

    Add ClusterPolicyViolation support to airflow local settings (#10282)
    
    This change allows users to raise exceptions other than `AirflowDagCycleException` (namely `AirflowClusterPolicyViolation`) as part of Cluster Policies.
    
    This can be helpful for running checks on tasks / DAGs (e.g. asserting that a task has a non-airflow owner) and failing to run tasks that aren't compliant with these checks.
    
    This is meant more as a tool for Airflow admins to prevent user mistakes (especially in shared Airflow infrastructure with newcomers) than as a strong technical control for security/compliance posture.
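
    As an illustrative sketch (not part of this commit; the hard-coded "airflow" default is a simplification), an admin could hook such a check into `airflow_local_settings.py` via the `policy` function that the tests below patch (`airflow.settings.policy`):

        from airflow.exceptions import AirflowClusterPolicyViolation
        from airflow.models.baseoperator import BaseOperator

        def policy(task: BaseOperator):
            """Reject tasks that are missing an owner or still use the default owner."""
            # "airflow" is the stock default_owner; the helper added in this commit
            # reads the configured value from conf instead of hard-coding it.
            if not task.owner or task.owner.lower() == "airflow":
                raise AirflowClusterPolicyViolation(
                    f"Task {task.task_id} must have a non-default owner. Current value: {task.owner}"
                )

    A DAG containing a non-compliant task then surfaces as an import error instead of being scheduled.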
---
 airflow/exceptions.py                     |  4 +++
 airflow/models/dagbag.py                  |  7 ++--
 docs/concepts.rst                         | 36 +++++++++++++++++++
 tests/cluster_policies/__init__.py        | 59 +++++++++++++++++++++++++++++++
 tests/dags/test_missing_owner.py          | 32 +++++++++++++++++
 tests/dags/test_with_non_default_owner.py | 32 +++++++++++++++++
 tests/models/test_dagbag.py               | 33 ++++++++++++++++-
 7 files changed, 199 insertions(+), 4 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 3038bb6..22d20c5 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -87,6 +87,10 @@ class AirflowDagCycleException(AirflowException):
     """Raise when there is a cycle in Dag definition"""
 
 
+class AirflowClusterPolicyViolation(AirflowException):
+    """Raise when there is a violation of a Cluster Policy in Dag definition"""
+
+
 class DagNotFound(AirflowNotFoundException):
     """Raise when a DAG is not available in the system"""
 
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 89afbda..5f78215 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -34,7 +34,7 @@ from tabulate import tabulate
 from airflow import settings
 from airflow.configuration import conf
 from airflow.dag.base_dag import BaseDagBag
-from airflow.exceptions import AirflowDagCycleException
+from airflow.exceptions import AirflowClusterPolicyViolation, AirflowDagCycleException
 from airflow.plugins_manager import integrate_dag_plugins
 from airflow.stats import Stats
 from airflow.utils import timezone
@@ -343,9 +343,10 @@ class DagBag(BaseDagBag, LoggingMixin):
                 self.import_errors[dag.full_filepath] = f"Invalid Cron expression: {cron_e}"
                 self.file_last_changed[dag.full_filepath] = \
                     file_last_changed_on_disk
-            except AirflowDagCycleException as cycle_exception:
+            except (AirflowDagCycleException,
+                    AirflowClusterPolicyViolation) as exception:
                 self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
-                self.import_errors[dag.full_filepath] = str(cycle_exception)
+                self.import_errors[dag.full_filepath] = str(exception)
                 self.file_last_changed[dag.full_filepath] = file_last_changed_on_disk
         return found_dags
 
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 740953b..faa3c2d 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -1104,7 +1104,11 @@ state.
 
 Cluster Policy
 ==============
+Cluster policies provide an interface for taking action on every Airflow task,
+either at DAG load time or just before task execution.
 
+Cluster Policies for Task Mutation
+-----------------------------------
 In case you want to apply cluster-wide mutations to the Airflow tasks,
 you can either mutate the task right after the DAG is loaded or
 mutate the task instance before task execution.
@@ -1155,6 +1159,38 @@ queue during retries:
             ti.queue = 'retry_queue'
 
 
+Cluster Policies for Custom Task Checks
+-------------------------------------------
+You may also use Cluster Policies to apply cluster-wide checks on Airflow
+tasks. You can raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation`
+in a policy or task mutation hook (described above) to prevent a DAG from being
+imported or prevent a task from being executed if the task is not compliant with
+your check.
+
+These checks are intended to help teams using Airflow protect against common
+beginner errors that may slip past a code reviewer, rather than to serve as
+technical security controls.
+
+For example, don't run tasks that are missing an owner or use the default owner:
+
+.. literalinclude:: /../tests/cluster_policies/__init__.py
+      :language: python
+      :start-after: [START example_cluster_policy_rule]
+      :end-before: [END example_cluster_policy_rule]
+
+If you have multiple checks to apply, it is best practice to curate these rules
+in a separate Python module and have a single policy / task mutation hook that
+applies all of these custom checks and aggregates the various error
+messages so that a single ``AirflowClusterPolicyViolation`` can be reported in
+the UI (and in the import errors table in the database).
+
+For example, in ``airflow_local_settings.py``:
+
+.. literalinclude:: /../tests/cluster_policies/__init__.py
+      :language: python
+      :start-after: [START example_list_of_cluster_policy_rules]
+      :end-before: [END example_list_of_cluster_policy_rules]
+
 Where to put ``airflow_local_settings.py``?
 -------------------------------------------
 
diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py
new file mode 100644
index 0000000..4d9c4b3
--- /dev/null
+++ b/tests/cluster_policies/__init__.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Callable, List
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowClusterPolicyViolation
+from airflow.models.baseoperator import BaseOperator
+
+
+# [START example_cluster_policy_rule]
+def task_must_have_owners(task: BaseOperator):
+    if not task.owner or task.owner.lower() == conf.get('operators',
+                                                        'default_owner'):
+        raise AirflowClusterPolicyViolation(
+            f'''Task must have non-None non-default owner. Current value: {task.owner}''')
+# [END example_cluster_policy_rule]
+
+
+# [START example_list_of_cluster_policy_rules]
+TASK_RULES: List[Callable[[BaseOperator], None]] = [
+    task_must_have_owners,
+]
+
+
+def _check_task_rules(current_task: BaseOperator):
+    """Check task rules for given task."""
+    notices = []
+    for rule in TASK_RULES:
+        try:
+            rule(current_task)
+        except AirflowClusterPolicyViolation as ex:
+            notices.append(str(ex))
+    if notices:
+        notices_list = " * " + "\n * ".join(notices)
+        raise AirflowClusterPolicyViolation(
+            f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.filepath}):\n"
+            f"Notices:\n"
+            f"{notices_list}")
+
+
+def cluster_policy(task: BaseOperator):
+    """Ensure Tasks have non-default owners."""
+    _check_task_rules(task)
+# [END example_list_of_cluster_policy_rules]
diff --git a/tests/dags/test_missing_owner.py b/tests/dags/test_missing_owner.py
new file mode 100644
index 0000000..16f715c
--- /dev/null
+++ b/tests/dags/test_missing_owner.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    dag_id="test_missing_owner",
+    schedule_interval="0 0 * * *",
+    start_date=days_ago(2),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=["example"],
+) as dag:
+    run_this_last = DummyOperator(task_id="test_task",)
diff --git a/tests/dags/test_with_non_default_owner.py b/tests/dags/test_with_non_default_owner.py
new file mode 100644
index 0000000..eebbb64
--- /dev/null
+++ b/tests/dags/test_with_non_default_owner.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    dag_id="test_with_non_default_owner",
+    schedule_interval="0 0 * * *",
+    start_date=days_ago(2),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=["example"],
+) as dag:
+    run_this_last = DummyOperator(task_id="test_task", owner="John",)
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 5e1bb89..f895638 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -33,6 +32,7 @@ from airflow.models import DagBag, DagModel
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.dates import timezone as tz
 from airflow.utils.session import create_session
+from tests import cluster_policies
 from tests.models import TEST_DAGS_FOLDER
 from tests.test_utils import db
 from tests.test_utils.asserts import assert_queries_count
@@ -700,3 +700,34 @@ class TestDagBag(unittest.TestCase):
 
         self.assertCountEqual(updated_ser_dag_1.tags, ["example", "new_tag"])
         self.assertGreater(updated_ser_dag_1_update_time, ser_dag_1_update_time)
+
+    @patch("airflow.settings.policy", cluster_policies.cluster_policy)
+    def test_cluster_policy_violation(self):
+        """test that file processing results in import error when task does not
+        obey cluster policy.
+        """
+        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_missing_owner.py")
+
+        dagbag = DagBag(dag_folder=dag_file)
+        self.assertEqual(set(), set(dagbag.dag_ids))
+        expected_import_errors = {
+            dag_file: (
+                f"""DAG policy violation (DAG ID: test_missing_owner, Path: {dag_file}):\n"""
+                """Notices:\n"""
+                """ * Task must have non-None non-default owner. Current value: airflow"""
+            )
+        }
+        self.assertEqual(expected_import_errors, dagbag.import_errors)
+
+    @patch("airflow.settings.policy", cluster_policies.cluster_policy)
+    def test_cluster_policy_obeyed(self):
+        """test that dag successfully imported without import errors when tasks
+        obey cluster policy.
+        """
+        dag_file = os.path.join(TEST_DAGS_FOLDER,
+                                "test_with_non_default_owner.py")
+
+        dagbag = DagBag(dag_folder=dag_file)
+        self.assertEqual({"test_with_non_default_owner"}, set(dagbag.dag_ids))
+
+        self.assertEqual({}, dagbag.import_errors)
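
A possible extension of the pattern above (a sketch, not part of this commit): additional rules can be appended to `TASK_RULES`, and `_check_task_rules` will aggregate their messages into a single `AirflowClusterPolicyViolation`. For instance, a hypothetical rule requiring retries might look like:

    from airflow.exceptions import AirflowClusterPolicyViolation
    from airflow.models.baseoperator import BaseOperator

    def task_must_have_retries(task: BaseOperator):
        """Reject tasks that do not configure at least one retry."""
        if not task.retries:
            raise AirflowClusterPolicyViolation(
                f"Task {task.task_id} must set retries > 0. Current value: {task.retries}"
            )

    # Registering the hypothetical rule alongside task_must_have_owners:
    # TASK_RULES.append(task_must_have_retries)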