You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "ashb (via GitHub)" <gi...@apache.org> on 2023/02/11 17:57:55 UTC

[GitHub] [airflow] ashb commented on a diff in pull request #28558: Make the policy functions pluggable

ashb commented on code in PR #28558:
URL: https://github.com/apache/airflow/pull/28558#discussion_r1103671445


##########
airflow/policies.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pluggy
+
+# Marker for the hook specifications declared below. Hookspecs and hookimpls are tied together by
+# sharing the same pluggy project name, "airflow.policy".
+local_settings_hookspec = pluggy.HookspecMarker("airflow.policy")
+# Marker for implementations of the policy hooks (used by ``DefaultPolicy`` below).
+hookimpl = pluggy.HookimplMarker("airflow.policy")
+
+# Only the implementation marker is listed as a public export of this module.
+__all__: list[str] = ["hookimpl"]
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.models.dag import DAG
+    from airflow.models.taskinstance import TaskInstance
+
+
+@local_settings_hookspec
+def task_policy(task: BaseOperator) -> None:
+    """
+    This policy setting allows altering tasks after they are loaded in the DagBag.
+
+    It allows administrators to rewire some of a task's parameters.  Alternatively you can raise an
+    ``AirflowClusterPolicyViolation`` exception to stop the DAG from being executed.
+
+    Here are a few examples of how this can be useful:
+
+    * You could enforce a specific queue (say the ``spark`` queue) for tasks using the ``SparkOperator`` to
+      make sure that these tasks get wired to the right workers
+    * You could enforce a task timeout policy, making sure that no tasks run for more than 48 hours
+
+    The hook is declared to return ``None``; implementations should mutate ``task`` in place.
+
+    :param task: task to be mutated
+    """
+
+
+@local_settings_hookspec
+def dag_policy(dag: DAG) -> None:
+    """
+    This policy setting allows altering DAGs after they are loaded in the DagBag.
+
+    It allows administrators to rewire some of a DAG's parameters.
+    Alternatively you can raise an ``AirflowClusterPolicyViolation`` exception
+    to stop the DAG from being executed.
+
+    Here are a few examples of how this can be useful:
+
+    * You could enforce a default user for DAGs
+    * You could check that every DAG has tags configured
+
+    The hook is declared to return ``None``; implementations should mutate ``dag`` in place.
+
+    :param dag: dag to be mutated
+    """
+
+
+@local_settings_hookspec
+def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
+    """
+    This setting allows altering task instances before they are queued by the Airflow scheduler.
+
+    This could be used, for instance, to modify the task instance during retries.
+
+    The hook is declared to return ``None``; implementations should mutate ``task_instance`` in place.
+
+    :param task_instance: task instance to be mutated
+    """
+
+
+@local_settings_hookspec
+def pod_mutation_hook(pod) -> None:
+    """
+    Mutate pod before scheduling.
+
+    This setting allows altering ``kubernetes.client.models.V1Pod`` objects before they are passed to the
+    Kubernetes client for scheduling.
+
+    This could be used, for instance, to add sidecar or init containers to every worker pod launched by
+    KubernetesExecutor or KubernetesPodOperator.
+
+    :param pod: ``kubernetes.client.models.V1Pod`` to be mutated in place
+    """
+
+
+@local_settings_hookspec(firstresult=True)
+def get_airflow_context_vars(context) -> dict[str, str]:
+    """
+    This setting allows getting the airflow context vars, which are key value pairs.  They are then injected
+    into the default airflow context vars, which in the end are available as environment variables when
+    running tasks.  ``dag_id``, ``task_id``, ``execution_date``, ``dag_run_id`` and ``try_number`` are
+    reserved keys.
+
+    The hook is declared with ``firstresult=True``, so a hook call stops at the first non-``None`` result
+    returned by an implementation.
+
+    :param context: The context for the task_instance of interest.
+    :return: mapping of extra context variable names to string values
+    """
+    ...
+
+
+@local_settings_hookspec(firstresult=True)
+def get_dagbag_import_timeout(dag_file_path: str) -> int | float:
+    """
+    This setting allows for dynamic control of the DAG file parsing timeout based on the DAG file path.
+
+    It is useful when there are a few DAG files requiring longer parsing times, while others do not.
+    You can control them separately instead of having one value for all DAG files.
+
+    If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
+
+    The hook is declared with ``firstresult=True``, so a hook call stops at the first non-``None`` result
+    returned by an implementation.
+
+    :param dag_file_path: path of the DAG file the timeout applies to
+    :return: the parsing timeout for that file (NOTE(review): unit not stated here; presumably seconds,
+        like the ``[core] DAGBAG_IMPORT_TIMEOUT`` option - confirm)
+    """
+    ...
+
+
+class DefaultPolicy:
+    """
+    Default implementations of the policy hooks that need a fallback value.
+
+    :meta private:
+    """
+
+    # Only hooks that must produce a value have a default here; the mutation-style hooks
+    # (task_policy, dag_policy, task_instance_mutation_hook, pod_mutation_hook) have none.
+
+    @staticmethod
+    @hookimpl
+    def get_dagbag_import_timeout(dag_file_path: str) -> float:
+        # Same timeout for every file: the ``[core] DAGBAG_IMPORT_TIMEOUT`` config value.
+        # ``dag_file_path`` is unused but required to match the hookspec signature.
+        # NOTE(review): ``conf`` is imported locally, presumably to avoid an import cycle - confirm.
+        from airflow.configuration import conf
+
+        return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")
+
+    @staticmethod
+    @hookimpl
+    def get_airflow_context_vars(context) -> dict[str, str]:
+        # No extra context variables by default.
+        return {}
+
+
+def make_plugin_from_local_settings(pm: pluggy.PluginManager, module, names: list[str]):
+    """
+    Turn the functions from airflow_local_settings module into a custom/local plugin, so that
+    plugin-registered functions can co-operate with pluggy/setuptool entrypoint plugins of the same methods.
+
+    Airflow local settings will "win" as they are the last plugin registered.

Review Comment:
   ```suggestion
       Airflow local settings will "win" (i.e. they have the final say) as they are the last plugin registered.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org