You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/12/09 14:02:03 UTC

[airflow] branch master updated: Rename airflow.operators.dagrun_operator to airflow.operators.trigger_dagrun (#12933)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bf386f  Rename airflow.operators.dagrun_operator to airflow.operators.trigger_dagrun (#12933)
0bf386f is described below

commit 0bf386fdf2a994c1b92b5c093cba9c3e7e09f134
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Dec 9 14:00:51 2020 +0000

    Rename airflow.operators.dagrun_operator to airflow.operators.trigger_dagrun (#12933)
    
    Part of AIP-21
    
    Co-authored-by: Kishore Vancheeshwaran <24...@users.noreply.github.com>
---
 UPDATING.md                                        |   3 +-
 .../example_dags/example_trigger_controller_dag.py |   2 +-
 airflow/operators/dagrun_operator.py               | 158 ++-------------------
 .../{dagrun_operator.py => trigger_dagrun.py}      |   0
 airflow/serialization/serialized_objects.py        |   2 +-
 .../refactor_provider_packages.py                  |   1 +
 docs/apache-airflow/operators-and-hooks-ref.rst    |   2 +-
 tests/deprecated_classes.py                        |   4 +
 ...t_dagrun_operator.py => test_trigger_dagrun.py} |   2 +-
 9 files changed, 20 insertions(+), 154 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 1088e41..6171371 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -277,6 +277,7 @@ The following table shows changes in import paths.
 | airflow.hooks.base_hook.BaseHook | airflow.hooks.base.BaseHook |
 | airflow.hooks.dbapi_hook.DbApiHook | airflow.hooks.dbapi.DbApiHook |
 | airflow.operators.dummy_operator.DummyOperator | airflow.operators.dummy.DummyOperator |
+| airflow.operators.dagrun_operator.TriggerDagRunOperator | airflow.operators.trigger_dagrun.TriggerDagRunOperator |
 | airflow.operators.branch_operator.BaseBranchOperator | airflow.operators.branch.BaseBranchOperator |
 | airflow.operators.subdag_operator.SubDagOperator | airflow.operators.subdag.SubDagOperator |
 | airflow.sensors.base_sensor_operator.BaseSensorOperator | airflow.sensors.base.BaseSensorOperator |
@@ -663,7 +664,7 @@ changes the previous response receiving `NULL` or `'0'`. Earlier `'0'` has been
 criteria. `NULL` has been treated depending on value of `allow_null`parameter.  But all the previous
 behaviour is still achievable setting param `success` to `lambda x: x is None or str(x) not in ('0', '')`.
 
-#### `airflow.operators.dagrun_operator.TriggerDagRunOperator`
+#### `airflow.operators.trigger_dagrun.TriggerDagRunOperator`
 
 The TriggerDagRunOperator now takes a `conf` argument to which a dict can be provided as conf for the DagRun.
 As a result, the `python_callable` argument was removed. PR: https://github.com/apache/airflow/pull/6317.
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index 39bc766..0f706c7 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -22,7 +22,7 @@ Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
 2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
 """
 from airflow import DAG
-from airflow.operators.dagrun_operator import TriggerDagRunOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.utils.dates import days_ago
 
 dag = DAG(
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 63d3361..f2c7bde 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -15,155 +15,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+"""This module is deprecated. Please use `airflow.operators.trigger_dagrun`."""
 
-import datetime
-import time
-from typing import Dict, List, Optional, Union
+import warnings
 
-from airflow.api.common.experimental.trigger_dag import trigger_dag
-from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
-from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
-from airflow.utils import timezone
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.helpers import build_airflow_url_with_query
-from airflow.utils.state import State
-from airflow.utils.types import DagRunType
+# pylint: disable=unused-import
+from airflow.operators.trigger_dagrun import TriggerDagRunLink, TriggerDagRunOperator  # noqa
 
-
-class TriggerDagRunLink(BaseOperatorLink):
-    """
-    Operator link for TriggerDagRunOperator. It allows users to access
-    DAG triggered by task using TriggerDagRunOperator.
-    """
-
-    name = 'Triggered DAG'
-
-    def get_link(self, operator, dttm):
-        query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()}
-        return build_airflow_url_with_query(query)
-
-
-class TriggerDagRunOperator(BaseOperator):
-    """
-    Triggers a DAG run for a specified ``dag_id``
-
-    :param trigger_dag_id: the dag_id to trigger (templated)
-    :type trigger_dag_id: str
-    :param conf: Configuration for the DAG run
-    :type conf: dict
-    :param execution_date: Execution date for the dag (templated)
-    :type execution_date: str or datetime.datetime
-    :param reset_dag_run: Whether or not clear existing dag run if already exists.
-        This is useful when backfill or rerun an existing dag run.
-        When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised.
-        When reset_dag_run=True and dag run exists, existing dag run will be cleared to rerun.
-    :type reset_dag_run: bool
-    :param wait_for_completion: Whether or not wait for dag run completion. (default: False)
-    :type wait_for_completion: bool
-    :param poke_interval: Poke interval to check dag run status when wait_for_completion=True.
-        (default: 60)
-    :type poke_interval: int
-    :param allowed_states: list of allowed states, default is ``['success']``
-    :type allowed_states: list
-    :param failed_states: list of failed or dis-allowed states, default is ``None``
-    :type failed_states: list
-    """
-
-    template_fields = ("trigger_dag_id", "execution_date", "conf")
-    ui_color = "#ffefeb"
-
-    @property
-    def operator_extra_links(self):
-        """Return operator extra links"""
-        return [TriggerDagRunLink()]
-
-    @apply_defaults
-    def __init__(
-        self,
-        *,
-        trigger_dag_id: str,
-        conf: Optional[Dict] = None,
-        execution_date: Optional[Union[str, datetime.datetime]] = None,
-        reset_dag_run: bool = False,
-        wait_for_completion: bool = False,
-        poke_interval: int = 60,
-        allowed_states: Optional[List] = None,
-        failed_states: Optional[List] = None,
-        **kwargs,
-    ) -> None:
-        super().__init__(**kwargs)
-        self.trigger_dag_id = trigger_dag_id
-        self.conf = conf
-        self.reset_dag_run = reset_dag_run
-        self.wait_for_completion = wait_for_completion
-        self.poke_interval = poke_interval
-        self.allowed_states = allowed_states or [State.SUCCESS]
-        self.failed_states = failed_states or [State.FAILED]
-
-        if not isinstance(execution_date, (str, datetime.datetime, type(None))):
-            raise TypeError(
-                "Expected str or datetime.datetime type for execution_date."
-                "Got {}".format(type(execution_date))
-            )
-
-        self.execution_date: Optional[datetime.datetime] = execution_date  # type: ignore
-
-    def execute(self, context: Dict):
-        if isinstance(self.execution_date, datetime.datetime):
-            execution_date = self.execution_date
-        elif isinstance(self.execution_date, str):
-            execution_date = timezone.parse(self.execution_date)
-            self.execution_date = execution_date
-        else:
-            execution_date = timezone.utcnow()
-
-        run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
-        try:
-            # Ignore MyPy type for self.execution_date
-            # because it doesn't pick up the timezone.parse() for strings
-            dag_run = trigger_dag(
-                dag_id=self.trigger_dag_id,
-                run_id=run_id,
-                conf=self.conf,
-                execution_date=self.execution_date,
-                replace_microseconds=False,
-            )
-
-        except DagRunAlreadyExists as e:
-            if self.reset_dag_run:
-                self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date)
-
-                # Get target dag object and call clear()
-
-                dag_model = DagModel.get_current(self.trigger_dag_id)
-                if dag_model is None:
-                    raise DagNotFound(f"Dag id {self.trigger_dag_id} not found in DagModel")
-
-                dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
-
-                dag = dag_bag.get_dag(self.trigger_dag_id)
-
-                dag.clear(start_date=self.execution_date, end_date=self.execution_date)
-
-                dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
-            else:
-                raise e
-
-        if self.wait_for_completion:
-            # wait for dag to complete
-            while True:
-                self.log.info(
-                    'Waiting for %s on %s to become allowed state %s ...',
-                    self.trigger_dag_id,
-                    dag_run.execution_date,
-                    self.allowed_states,
-                )
-                time.sleep(self.poke_interval)
-
-                dag_run.refresh_from_db()
-                state = dag_run.state
-                if state in self.failed_states:
-                    raise AirflowException(f"{self.trigger_dag_id} failed with failed states {state}")
-                if state in self.allowed_states:
-                    self.log.info("%s finished with allowed state %s", self.trigger_dag_id, state)
-                    return
+warnings.warn(
+    "This module is deprecated. Please use `airflow.operators.trigger_dagrun`.",
+    DeprecationWarning,
+    stacklevel=2,
+)
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/trigger_dagrun.py
similarity index 100%
copy from airflow/operators/dagrun_operator.py
copy to airflow/operators/trigger_dagrun.py
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 4f39114..1cdcb5c 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -65,7 +65,7 @@ if TYPE_CHECKING:
 log = logging.getLogger(__name__)
 
 _OPERATOR_EXTRA_LINKS: Set[str] = {
-    "airflow.operators.dagrun_operator.TriggerDagRunLink",
+    "airflow.operators.trigger_dagrun.TriggerDagRunLink",
     "airflow.sensors.external_task.ExternalTaskSensorLink",
 }
 
diff --git a/dev/provider_packages/refactor_provider_packages.py b/dev/provider_packages/refactor_provider_packages.py
index ea7b3fc..83240fe 100755
--- a/dev/provider_packages/refactor_provider_packages.py
+++ b/dev/provider_packages/refactor_provider_packages.py
@@ -145,6 +145,7 @@ class RefactorBackportPackages:
             ("airflow.operators.branch", "airflow.operators.branch_operator"),
             ("airflow.operators.dummy", "airflow.operators.dummy_operator"),
             ("airflow.operators.python", "airflow.operators.python_operator"),
+            ("airflow.operators.trigger_dagrun", "airflow.operators.dagrun_operator"),
             ("airflow.sensors.base", "airflow.sensors.base_sensor_operator"),
             ("airflow.sensors.date_time", "airflow.sensors.date_time_sensor"),
             ("airflow.sensors.external_task", "airflow.sensors.external_task_sensor"),
diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst
index 1d6d7f4..9ba3527 100644
--- a/docs/apache-airflow/operators-and-hooks-ref.rst
+++ b/docs/apache-airflow/operators-and-hooks-ref.rst
@@ -56,7 +56,7 @@ Airflow has many more integrations available for separate installation as a prov
    * - :mod:`airflow.operators.branch_operator`
      -
 
-   * - :mod:`airflow.operators.dagrun_operator`
+   * - :mod:`airflow.operators.trigger_dagrun`
      -
 
    * - :mod:`airflow.operators.dummy`
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py
index 132986a..1dd76ab 100644
--- a/tests/deprecated_classes.py
+++ b/tests/deprecated_classes.py
@@ -1333,6 +1333,10 @@ OPERATORS = [
         "airflow.operators.latest_only_operator.LatestOnlyOperator",
     ),
     (
+        "airflow.operators.trigger_dagrun.TriggerDagRunOperator",
+        "airflow.operators.dagrun_operator.TriggerDagRunOperator",
+    ),
+    (
         "airflow.operators.subdag.SubDagOperator",
         "airflow.operators.subdag_operator.SubDagOperator",
     ),
diff --git a/tests/operators/test_dagrun_operator.py b/tests/operators/test_trigger_dagrun.py
similarity index 99%
rename from tests/operators/test_dagrun_operator.py
rename to tests/operators/test_trigger_dagrun.py
index ed6f3ed..42bc2a2 100644
--- a/tests/operators/test_dagrun_operator.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -24,7 +24,7 @@ from unittest import TestCase
 from airflow.exceptions import AirflowException, DagRunAlreadyExists
 from airflow.models import DAG, DagBag, DagModel, DagRun, Log, TaskInstance
 from airflow.models.serialized_dag import SerializedDagModel
-from airflow.operators.dagrun_operator import TriggerDagRunOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State