Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/14 23:30:56 UTC

[airflow] 02/04: Add ClusterPolicyViolation support to airflow local settings (#10282)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 87986f3f12109f7144fbc20a1c9dcde07f3c4f58
Author: Jacob Ferriero <jf...@google.com>
AuthorDate: Wed Aug 12 15:06:29 2020 -0700

    Add ClusterPolicyViolation support to airflow local settings (#10282)
    
    This change allows users to raise exceptions other than `DagCycleException` (namely `AirflowClusterPolicyViolation`) as part of Cluster Policies.

    This can be helpful for running checks on tasks / DAGs (e.g. asserting that a task has a non-airflow owner) and refusing to run tasks that are not compliant with these checks.

    This is meant as a tool for Airflow admins to prevent user mistakes (especially in shared Airflow infrastructure with newbies) rather than as a strong technical control for security/compliance posture.
    
    (cherry picked from commit 7f76b8b94241c57dc7de5b17657433841289744e)
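
For illustration only (not part of this commit), a minimal sketch of how an admin could wire such a check into `airflow_local_settings.py`. The `policy(task)` hook name is the one the tests below patch (`airflow.settings.policy`), and the owner check mirrors the committed `task_must_have_owners` rule:

    from airflow.configuration import conf
    from airflow.exceptions import AirflowClusterPolicyViolation


    def policy(task):
        """Fail DAG import for tasks that have no owner or keep the default one."""
        if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'):
            raise AirflowClusterPolicyViolation(
                'Task {} must have a non-default owner. Current value: {}'.format(
                    task.task_id, task.owner))

Any DAG file containing a non-compliant task then surfaces as an import error instead of being loaded.
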
---
 airflow/exceptions.py                     |  4 +++
 airflow/models/dagbag.py                  |  7 ++--
 docs/concepts.rst                         | 36 ++++++++++++++++++++
 tests/cluster_policies/__init__.py        | 56 +++++++++++++++++++++++++++++++
 tests/dags/test_missing_owner.py          | 32 ++++++++++++++++++
 tests/dags/test_with_non_default_owner.py | 32 ++++++++++++++++++
 tests/models/test_dagbag.py               | 32 ++++++++++++++++++
 7 files changed, 196 insertions(+), 3 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index badf156..51e3cb4 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -83,6 +83,10 @@ class AirflowDagCycleException(AirflowException):
     """Raise when there is a cycle in Dag definition"""
 
 
+class AirflowClusterPolicyViolation(AirflowException):
+    """Raise when there is a violation of a Cluster Policy in Dag definition"""
+
+
 class DagNotFound(AirflowNotFoundException):
     """Raise when a DAG is not available in the system"""
 
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 1b8be89..106dff0 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -36,7 +36,7 @@ import six
 from airflow import settings
 from airflow.configuration import conf
 from airflow.dag.base_dag import BaseDagBag
-from airflow.exceptions import AirflowDagCycleException
+from airflow.exceptions import AirflowClusterPolicyViolation, AirflowDagCycleException
 from airflow.executors import get_default_executor
 from airflow.settings import Stats
 from airflow.utils import timezone
@@ -317,9 +317,10 @@ class DagBag(BaseDagBag, LoggingMixin):
                             "Invalid Cron expression: " + str(cron_e)
                         self.file_last_changed[dag.full_filepath] = \
                             file_last_changed_on_disk
-                    except AirflowDagCycleException as cycle_exception:
+                    except (AirflowDagCycleException,
+                            AirflowClusterPolicyViolation) as exception:
                         self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
-                        self.import_errors[dag.full_filepath] = str(cycle_exception)
+                        self.import_errors[dag.full_filepath] = str(exception)
                         self.file_last_changed[dag.full_filepath] = \
                             file_last_changed_on_disk
 
diff --git a/docs/concepts.rst b/docs/concepts.rst
index dd48003..6908ab2 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -1066,7 +1066,11 @@ state.
 
 Cluster Policy
 ==============
+Cluster policies provide an interface for taking action on every Airflow task
+either at DAG load time or just before task execution.
 
+Cluster Policies for Task Mutation
+-----------------------------------
 In case you want to apply cluster-wide mutations to the Airflow tasks,
 you can either mutate the task right after the DAG is loaded or
 mutate the task instance before task execution.
@@ -1117,6 +1121,38 @@ queue during retries:
             ti.queue = 'retry_queue'
 
 
+Cluster Policies for Custom Task Checks
+-------------------------------------------
+You may also use Cluster Policies to apply cluster-wide checks on Airflow
+tasks. You can raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation`
+in a policy or task mutation hook (described above) to prevent a DAG from being
+imported or prevent a task from being executed if the task is not compliant with
+your check.
+
+These checks are intended to help teams using Airflow to protect against common
+beginner errors that may get past a code reviewer, rather than as technical
+security controls.
+
+For example, to reject tasks that do not set an explicit, non-default owner:
+
+.. literalinclude:: /../tests/cluster_policies/__init__.py
+      :language: python
+      :start-after: [START example_cluster_policy_rule]
+      :end-before: [END example_cluster_policy_rule]
+
+If you have multiple checks to apply, it is best practice to curate these rules
+in a separate Python module and have a single policy / task mutation hook that
+applies each of these custom checks and aggregates the various error
+messages so that a single ``AirflowClusterPolicyViolation`` can be reported in
+the UI (and the import errors table in the database).
+
+For example, in ``airflow_local_settings.py``:
+
+.. literalinclude:: /../tests/cluster_policies/__init__.py
+      :language: python
+      :start-after: [START example_list_of_cluster_policy_rules]
+      :end-before: [END example_list_of_cluster_policy_rules]
+
 Where to put ``airflow_local_settings.py``?
 -------------------------------------------
 
diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py
new file mode 100644
index 0000000..565b6f8
--- /dev/null
+++ b/tests/cluster_policies/__init__.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Callable, List
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowClusterPolicyViolation
+from airflow.models.baseoperator import BaseOperator
+
+
+# [START example_cluster_policy_rule]
+def task_must_have_owners(task):
+    if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'):
+        raise AirflowClusterPolicyViolation(
+            'Task must have non-None non-default owner. Current value: {}'.format(task.owner))
+# [END example_cluster_policy_rule]
+
+
+# [START example_list_of_cluster_policy_rules]
+TASK_RULES = [task_must_have_owners]   # type: List[Callable[[BaseOperator], None]]
+
+
+def _check_task_rules(current_task):
+    """Check task rules for given task."""
+    notices = []
+    for rule in TASK_RULES:
+        try:
+            rule(current_task)
+        except AirflowClusterPolicyViolation as ex:
+            notices.append(str(ex))
+    if notices:
+        notices_list = " * " + "\n * ".join(notices)
+        raise AirflowClusterPolicyViolation(
+            "DAG policy violation (DAG ID: {0}, Path: {1}):\n"
+            "Notices:\n"
+            "{2}".format(current_task.dag_id, current_task.dag.filepath, notices_list))
+
+
+def cluster_policy(task):
+    """Ensure Tasks have non-default owners."""
+    _check_task_rules(task)
+# [END example_list_of_cluster_policy_rules]
diff --git a/tests/dags/test_missing_owner.py b/tests/dags/test_missing_owner.py
new file mode 100644
index 0000000..16f715c
--- /dev/null
+++ b/tests/dags/test_missing_owner.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    dag_id="test_missing_owner",
+    schedule_interval="0 0 * * *",
+    start_date=days_ago(2),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=["example"],
+) as dag:
+    run_this_last = DummyOperator(task_id="test_task",)
diff --git a/tests/dags/test_with_non_default_owner.py b/tests/dags/test_with_non_default_owner.py
new file mode 100644
index 0000000..eebbb64
--- /dev/null
+++ b/tests/dags/test_with_non_default_owner.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+    dag_id="test_with_non_default_owner",
+    schedule_interval="0 0 * * *",
+    start_date=days_ago(2),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=["example"],
+) as dag:
+    run_this_last = DummyOperator(task_id="test_task", owner="John",)
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index b9d18ac..1595cd4 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -38,6 +38,7 @@ from airflow.utils.dates import timezone as tz
 from airflow.utils.db import create_session
 from airflow.utils.state import State
 from airflow.utils.timezone import utc
+from tests import cluster_policies
 from tests.models import TEST_DAGS_FOLDER, DEFAULT_DATE
 from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.config import conf_vars
@@ -695,3 +696,34 @@ class DagBagTest(unittest.TestCase):
 
         six.assertCountEqual(self, updated_ser_dag_1.tags, ["example", "new_tag"])
         self.assertGreater(updated_ser_dag_1_update_time, ser_dag_1_update_time)
+
+    @patch("airflow.settings.policy", cluster_policies.cluster_policy)
+    def test_cluster_policy_violation(self):
+        """test that file processing results in import error when task does not
+        obey cluster policy.
+        """
+        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_missing_owner.py")
+
+        dagbag = DagBag(dag_folder=dag_file)
+        self.assertEqual(set(), set(dagbag.dag_ids))
+        expected_import_errors = {
+            dag_file: (
+                "DAG policy violation (DAG ID: test_missing_owner, Path: {}):\n"
+                "Notices:\n"
+                " * Task must have non-None non-default owner. Current value: airflow".format(dag_file)
+            )
+        }
+        self.maxDiff = None
+        self.assertEqual(expected_import_errors, dagbag.import_errors)
+
+    @patch("airflow.settings.policy", cluster_policies.cluster_policy)
+    def test_cluster_policy_obeyed(self):
+        """test that dag successfully imported without import errors when tasks
+        obey cluster policy.
+        """
+        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_with_non_default_owner.py")
+
+        dagbag = DagBag(dag_folder=dag_file)
+        self.assertEqual({"test_with_non_default_owner"}, set(dagbag.dag_ids))
+
+        self.assertEqual({}, dagbag.import_errors)
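
For a quick manual check of the same behavior outside the test suite (a sketch assuming a policy like the one above is installed via `airflow_local_settings.py`, and that `dags/` is your DAG folder), violations surface as import errors rather than loaded DAGs:

    from airflow.models.dagbag import DagBag

    dagbag = DagBag(dag_folder="dags/")  # parsing the folder applies the cluster policy
    print(dagbag.dag_ids)                # DAGs with non-compliant tasks are not loaded
    print(dagbag.import_errors)          # their policy violations are listed here, keyed by file path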