You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/14 23:30:56 UTC
[airflow] 02/04: Add ClusterPolicyViolation support to airflow
local settings (#10282)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 87986f3f12109f7144fbc20a1c9dcde07f3c4f58
Author: Jacob Ferriero <jf...@google.com>
AuthorDate: Wed Aug 12 15:06:29 2020 -0700
Add ClusterPolicyViolation support to airflow local settings (#10282)
This change allows users to raise exceptions other than `AirflowDagCycleException` (namely `AirflowClusterPolicyViolation`) as part of Cluster Policies.
This can be helpful for running checks on tasks / DAGs (e.g. asserting that a task has a non-airflow owner) and refusing to run tasks that aren't compliant with these checks.
This is meant more as a tool for Airflow admins to prevent user mistakes (especially in shared Airflow infrastructure with newbies) than as a strong technical control for security/compliance posture.
(cherry picked from commit 7f76b8b94241c57dc7de5b17657433841289744e)
---
airflow/exceptions.py | 4 +++
airflow/models/dagbag.py | 7 ++--
docs/concepts.rst | 36 ++++++++++++++++++++
tests/cluster_policies/__init__.py | 56 +++++++++++++++++++++++++++++++
tests/dags/test_missing_owner.py | 32 ++++++++++++++++++
tests/dags/test_with_non_default_owner.py | 32 ++++++++++++++++++
tests/models/test_dagbag.py | 32 ++++++++++++++++++
7 files changed, 196 insertions(+), 3 deletions(-)
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index badf156..51e3cb4 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -83,6 +83,10 @@ class AirflowDagCycleException(AirflowException):
"""Raise when there is a cycle in Dag definition"""
+class AirflowClusterPolicyViolation(AirflowException):
+ """Raise when there is a violation of a Cluster Policy in Dag definition"""
+
+
class DagNotFound(AirflowNotFoundException):
"""Raise when a DAG is not available in the system"""
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 1b8be89..106dff0 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -36,7 +36,7 @@ import six
from airflow import settings
from airflow.configuration import conf
from airflow.dag.base_dag import BaseDagBag
-from airflow.exceptions import AirflowDagCycleException
+from airflow.exceptions import AirflowClusterPolicyViolation, AirflowDagCycleException
from airflow.executors import get_default_executor
from airflow.settings import Stats
from airflow.utils import timezone
@@ -317,9 +317,10 @@ class DagBag(BaseDagBag, LoggingMixin):
"Invalid Cron expression: " + str(cron_e)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
- except AirflowDagCycleException as cycle_exception:
+ except (AirflowDagCycleException,
+ AirflowClusterPolicyViolation) as exception:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
- self.import_errors[dag.full_filepath] = str(cycle_exception)
+ self.import_errors[dag.full_filepath] = str(exception)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
diff --git a/docs/concepts.rst b/docs/concepts.rst
index dd48003..6908ab2 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -1066,7 +1066,11 @@ state.
Cluster Policy
==============
+Cluster policies provide an interface for taking action on every Airflow task
+either at DAG load time or just before task execution.
+Cluster Policies for Task Mutation
+-----------------------------------
In case you want to apply cluster-wide mutations to the Airflow tasks,
you can either mutate the task right after the DAG is loaded or
mutate the task instance before task execution.
@@ -1117,6 +1121,38 @@ queue during retries:
ti.queue = 'retry_queue'
+Cluster Policies for Custom Task Checks
+-------------------------------------------
+You may also use Cluster Policies to apply cluster-wide checks on Airflow
+tasks. You can raise :class:`~airflow.exceptions.AirflowClusterPolicyViolation`
+in a policy or task mutation hook (described above) to prevent a DAG from being
+imported or prevent a task from being executed if the task is not compliant with
+your check.
+
+These checks are intended to help teams using Airflow to protect against common
+beginner errors that may get past a code reviewer, rather than as technical
+security controls.
+
+For example, don't run tasks that have no owner or still use the default owner:
+
+.. literalinclude:: /../tests/cluster_policies/__init__.py
+ :language: python
+ :start-after: [START example_cluster_policy_rule]
+ :end-before: [END example_cluster_policy_rule]
+
+If you have multiple checks to apply, it is best practice to curate these rules
+in a separate python module and have a single policy / task mutation hook that
+performs multiple of these custom checks and aggregates the various error
+messages so that a single ``AirflowClusterPolicyViolation`` can be reported in
+the UI (and import errors table in the database).
+
+For example, in ``airflow_local_settings.py``:
+
+.. literalinclude:: /../tests/cluster_policies/__init__.py
+ :language: python
+ :start-after: [START example_list_of_cluster_policy_rules]
+ :end-before: [END example_list_of_cluster_policy_rules]
+
Where to put ``airflow_local_settings.py``?
-------------------------------------------
diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py
new file mode 100644
index 0000000..565b6f8
--- /dev/null
+++ b/tests/cluster_policies/__init__.py
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Callable, List
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowClusterPolicyViolation
+from airflow.models.baseoperator import BaseOperator
+
+
+# [START example_cluster_policy_rule]
+def task_must_have_owners(task):
+ if not task.owner or task.owner.lower() == conf.get('operators', 'default_owner'):
+ raise AirflowClusterPolicyViolation(
+ 'Task must have non-None non-default owner. Current value: {}'.format(task.owner))
+# [END example_cluster_policy_rule]
+
+
+# [START example_list_of_cluster_policy_rules]
+TASK_RULES = [task_must_have_owners] # type: List[Callable[[BaseOperator], None]]
+
+
+def _check_task_rules(current_task):
+ """Check task rules for given task."""
+ notices = []
+ for rule in TASK_RULES:
+ try:
+ rule(current_task)
+ except AirflowClusterPolicyViolation as ex:
+ notices.append(str(ex))
+ if notices:
+ notices_list = " * " + "\n * ".join(notices)
+ raise AirflowClusterPolicyViolation(
+ "DAG policy violation (DAG ID: {0}, Path: {1}):\n"
+ "Notices:\n"
+ "{2}".format(current_task.dag_id, current_task.dag.filepath, notices_list))
+
+
+def cluster_policy(task):
+ """Ensure Tasks have non-default owners."""
+ _check_task_rules(task)
+# [END example_list_of_cluster_policy_rules]
diff --git a/tests/dags/test_missing_owner.py b/tests/dags/test_missing_owner.py
new file mode 100644
index 0000000..16f715c
--- /dev/null
+++ b/tests/dags/test_missing_owner.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+ dag_id="test_missing_owner",
+ schedule_interval="0 0 * * *",
+ start_date=days_ago(2),
+ dagrun_timeout=timedelta(minutes=60),
+ tags=["example"],
+) as dag:
+ run_this_last = DummyOperator(task_id="test_task",)
diff --git a/tests/dags/test_with_non_default_owner.py b/tests/dags/test_with_non_default_owner.py
new file mode 100644
index 0000000..eebbb64
--- /dev/null
+++ b/tests/dags/test_with_non_default_owner.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+
+with DAG(
+ dag_id="test_with_non_default_owner",
+ schedule_interval="0 0 * * *",
+ start_date=days_ago(2),
+ dagrun_timeout=timedelta(minutes=60),
+ tags=["example"],
+) as dag:
+ run_this_last = DummyOperator(task_id="test_task", owner="John",)
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index b9d18ac..1595cd4 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -38,6 +38,7 @@ from airflow.utils.dates import timezone as tz
from airflow.utils.db import create_session
from airflow.utils.state import State
from airflow.utils.timezone import utc
+from tests import cluster_policies
from tests.models import TEST_DAGS_FOLDER, DEFAULT_DATE
from tests.test_utils.asserts import assert_queries_count
from tests.test_utils.config import conf_vars
@@ -695,3 +696,34 @@ class DagBagTest(unittest.TestCase):
six.assertCountEqual(self, updated_ser_dag_1.tags, ["example", "new_tag"])
self.assertGreater(updated_ser_dag_1_update_time, ser_dag_1_update_time)
+
+ @patch("airflow.settings.policy", cluster_policies.cluster_policy)
+ def test_cluster_policy_violation(self):
+ """test that file processing results in import error when task does not
+ obey cluster policy.
+ """
+ dag_file = os.path.join(TEST_DAGS_FOLDER, "test_missing_owner.py")
+
+ dagbag = DagBag(dag_folder=dag_file)
+ self.assertEqual(set(), set(dagbag.dag_ids))
+ expected_import_errors = {
+ dag_file: (
+ "DAG policy violation (DAG ID: test_missing_owner, Path: {}):\n"
+ "Notices:\n"
+ " * Task must have non-None non-default owner. Current value: airflow".format(dag_file)
+ )
+ }
+ self.maxDiff = None
+ self.assertEqual(expected_import_errors, dagbag.import_errors)
+
+ @patch("airflow.settings.policy", cluster_policies.cluster_policy)
+ def test_cluster_policy_obeyed(self):
+ """test that dag successfully imported without import errors when tasks
+ obey cluster policy.
+ """
+ dag_file = os.path.join(TEST_DAGS_FOLDER, "test_with_non_default_owner.py")
+
+ dagbag = DagBag(dag_folder=dag_file)
+ self.assertEqual({"test_with_non_default_owner"}, set(dagbag.dag_ids))
+
+ self.assertEqual({}, dagbag.import_errors)