You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/12/09 14:02:03 UTC
[airflow] branch master updated: Rename
airflow.operators.dagrun_operator to airflow.operators.trigger_dagrun
(#12933)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 0bf386f Rename airflow.operators.dagrun_operator to airflow.operators.trigger_dagrun (#12933)
0bf386f is described below
commit 0bf386fdf2a994c1b92b5c093cba9c3e7e09f134
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Dec 9 14:00:51 2020 +0000
Rename airflow.operators.dagrun_operator to airflow.operators.trigger_dagrun (#12933)
Part of AIP-21
Co-authored-by: Kishore Vancheeshwaran <24...@users.noreply.github.com>
---
UPDATING.md | 3 +-
.../example_dags/example_trigger_controller_dag.py | 2 +-
airflow/operators/dagrun_operator.py | 158 ++-------------------
.../{dagrun_operator.py => trigger_dagrun.py} | 0
airflow/serialization/serialized_objects.py | 2 +-
.../refactor_provider_packages.py | 1 +
docs/apache-airflow/operators-and-hooks-ref.rst | 2 +-
tests/deprecated_classes.py | 4 +
...t_dagrun_operator.py => test_trigger_dagrun.py} | 2 +-
9 files changed, 20 insertions(+), 154 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 1088e41..6171371 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -277,6 +277,7 @@ The following table shows changes in import paths.
| airflow.hooks.base_hook.BaseHook | airflow.hooks.base.BaseHook |
| airflow.hooks.dbapi_hook.DbApiHook | airflow.hooks.dbapi.DbApiHook |
| airflow.operators.dummy_operator.DummyOperator | airflow.operators.dummy.DummyOperator |
+| airflow.operators.dagrun_operator.TriggerDagRunOperator | airflow.operators.trigger_dagrun.TriggerDagRunOperator |
| airflow.operators.branch_operator.BaseBranchOperator | airflow.operators.branch.BaseBranchOperator |
| airflow.operators.subdag_operator.SubDagOperator | airflow.operators.subdag.SubDagOperator |
| airflow.sensors.base_sensor_operator.BaseSensorOperator | airflow.sensors.base.BaseSensorOperator |
@@ -663,7 +664,7 @@ changes the previous response receiving `NULL` or `'0'`. Earlier `'0'` has been
criteria. `NULL` has been treated depending on value of `allow_null`parameter. But all the previous
behaviour is still achievable setting param `success` to `lambda x: x is None or str(x) not in ('0', '')`.
-#### `airflow.operators.dagrun_operator.TriggerDagRunOperator`
+#### `airflow.operators.trigger_dagrun.TriggerDagRunOperator`
The TriggerDagRunOperator now takes a `conf` argument to which a dict can be provided as conf for the DagRun.
As a result, the `python_callable` argument was removed. PR: https://github.com/apache/airflow/pull/6317.
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index 39bc766..0f706c7 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -22,7 +22,7 @@ Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
"""
from airflow import DAG
-from airflow.operators.dagrun_operator import TriggerDagRunOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
dag = DAG(
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 63d3361..f2c7bde 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -15,155 +15,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""This module is deprecated. Please use `airflow.operators.trigger_dagrun`."""
-import datetime
-import time
-from typing import Dict, List, Optional, Union
+import warnings
-from airflow.api.common.experimental.trigger_dag import trigger_dag
-from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
-from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun
-from airflow.utils import timezone
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.helpers import build_airflow_url_with_query
-from airflow.utils.state import State
-from airflow.utils.types import DagRunType
+# pylint: disable=unused-import
+from airflow.operators.trigger_dagrun import TriggerDagRunLink, TriggerDagRunOperator # noqa
-
-class TriggerDagRunLink(BaseOperatorLink):
- """
- Operator link for TriggerDagRunOperator. It allows users to access
- DAG triggered by task using TriggerDagRunOperator.
- """
-
- name = 'Triggered DAG'
-
- def get_link(self, operator, dttm):
- query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()}
- return build_airflow_url_with_query(query)
-
-
-class TriggerDagRunOperator(BaseOperator):
- """
- Triggers a DAG run for a specified ``dag_id``
-
- :param trigger_dag_id: the dag_id to trigger (templated)
- :type trigger_dag_id: str
- :param conf: Configuration for the DAG run
- :type conf: dict
- :param execution_date: Execution date for the dag (templated)
- :type execution_date: str or datetime.datetime
- :param reset_dag_run: Whether or not clear existing dag run if already exists.
- This is useful when backfill or rerun an existing dag run.
- When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised.
- When reset_dag_run=True and dag run exists, existing dag run will be cleared to rerun.
- :type reset_dag_run: bool
- :param wait_for_completion: Whether or not wait for dag run completion. (default: False)
- :type wait_for_completion: bool
- :param poke_interval: Poke interval to check dag run status when wait_for_completion=True.
- (default: 60)
- :type poke_interval: int
- :param allowed_states: list of allowed states, default is ``['success']``
- :type allowed_states: list
- :param failed_states: list of failed or dis-allowed states, default is ``None``
- :type failed_states: list
- """
-
- template_fields = ("trigger_dag_id", "execution_date", "conf")
- ui_color = "#ffefeb"
-
- @property
- def operator_extra_links(self):
- """Return operator extra links"""
- return [TriggerDagRunLink()]
-
- @apply_defaults
- def __init__(
- self,
- *,
- trigger_dag_id: str,
- conf: Optional[Dict] = None,
- execution_date: Optional[Union[str, datetime.datetime]] = None,
- reset_dag_run: bool = False,
- wait_for_completion: bool = False,
- poke_interval: int = 60,
- allowed_states: Optional[List] = None,
- failed_states: Optional[List] = None,
- **kwargs,
- ) -> None:
- super().__init__(**kwargs)
- self.trigger_dag_id = trigger_dag_id
- self.conf = conf
- self.reset_dag_run = reset_dag_run
- self.wait_for_completion = wait_for_completion
- self.poke_interval = poke_interval
- self.allowed_states = allowed_states or [State.SUCCESS]
- self.failed_states = failed_states or [State.FAILED]
-
- if not isinstance(execution_date, (str, datetime.datetime, type(None))):
- raise TypeError(
- "Expected str or datetime.datetime type for execution_date."
- "Got {}".format(type(execution_date))
- )
-
- self.execution_date: Optional[datetime.datetime] = execution_date # type: ignore
-
- def execute(self, context: Dict):
- if isinstance(self.execution_date, datetime.datetime):
- execution_date = self.execution_date
- elif isinstance(self.execution_date, str):
- execution_date = timezone.parse(self.execution_date)
- self.execution_date = execution_date
- else:
- execution_date = timezone.utcnow()
-
- run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date)
- try:
- # Ignore MyPy type for self.execution_date
- # because it doesn't pick up the timezone.parse() for strings
- dag_run = trigger_dag(
- dag_id=self.trigger_dag_id,
- run_id=run_id,
- conf=self.conf,
- execution_date=self.execution_date,
- replace_microseconds=False,
- )
-
- except DagRunAlreadyExists as e:
- if self.reset_dag_run:
- self.log.info("Clearing %s on %s", self.trigger_dag_id, self.execution_date)
-
- # Get target dag object and call clear()
-
- dag_model = DagModel.get_current(self.trigger_dag_id)
- if dag_model is None:
- raise DagNotFound(f"Dag id {self.trigger_dag_id} not found in DagModel")
-
- dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
-
- dag = dag_bag.get_dag(self.trigger_dag_id)
-
- dag.clear(start_date=self.execution_date, end_date=self.execution_date)
-
- dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
- else:
- raise e
-
- if self.wait_for_completion:
- # wait for dag to complete
- while True:
- self.log.info(
- 'Waiting for %s on %s to become allowed state %s ...',
- self.trigger_dag_id,
- dag_run.execution_date,
- self.allowed_states,
- )
- time.sleep(self.poke_interval)
-
- dag_run.refresh_from_db()
- state = dag_run.state
- if state in self.failed_states:
- raise AirflowException(f"{self.trigger_dag_id} failed with failed states {state}")
- if state in self.allowed_states:
- self.log.info("%s finished with allowed state %s", self.trigger_dag_id, state)
- return
+warnings.warn(
+ "This module is deprecated. Please use `airflow.operators.trigger_dagrun`.",
+ DeprecationWarning,
+ stacklevel=2,
+)
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/trigger_dagrun.py
similarity index 100%
copy from airflow/operators/dagrun_operator.py
copy to airflow/operators/trigger_dagrun.py
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 4f39114..1cdcb5c 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -65,7 +65,7 @@ if TYPE_CHECKING:
log = logging.getLogger(__name__)
_OPERATOR_EXTRA_LINKS: Set[str] = {
- "airflow.operators.dagrun_operator.TriggerDagRunLink",
+ "airflow.operators.trigger_dagrun.TriggerDagRunLink",
"airflow.sensors.external_task.ExternalTaskSensorLink",
}
diff --git a/dev/provider_packages/refactor_provider_packages.py b/dev/provider_packages/refactor_provider_packages.py
index ea7b3fc..83240fe 100755
--- a/dev/provider_packages/refactor_provider_packages.py
+++ b/dev/provider_packages/refactor_provider_packages.py
@@ -145,6 +145,7 @@ class RefactorBackportPackages:
("airflow.operators.branch", "airflow.operators.branch_operator"),
("airflow.operators.dummy", "airflow.operators.dummy_operator"),
("airflow.operators.python", "airflow.operators.python_operator"),
+ ("airflow.operators.trigger_dagrun", "airflow.operators.dagrun_operator"),
("airflow.sensors.base", "airflow.sensors.base_sensor_operator"),
("airflow.sensors.date_time", "airflow.sensors.date_time_sensor"),
("airflow.sensors.external_task", "airflow.sensors.external_task_sensor"),
diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst
index 1d6d7f4..9ba3527 100644
--- a/docs/apache-airflow/operators-and-hooks-ref.rst
+++ b/docs/apache-airflow/operators-and-hooks-ref.rst
@@ -56,7 +56,7 @@ Airflow has many more integrations available for separate installation as a prov
* - :mod:`airflow.operators.branch_operator`
-
- * - :mod:`airflow.operators.dagrun_operator`
+ * - :mod:`airflow.operators.trigger_dagrun`
-
* - :mod:`airflow.operators.dummy`
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py
index 132986a..1dd76ab 100644
--- a/tests/deprecated_classes.py
+++ b/tests/deprecated_classes.py
@@ -1333,6 +1333,10 @@ OPERATORS = [
"airflow.operators.latest_only_operator.LatestOnlyOperator",
),
(
+ "airflow.operators.trigger_dagrun.TriggerDagRunOperator",
+ "airflow.operators.dagrun_operator.TriggerDagRunOperator",
+ ),
+ (
"airflow.operators.subdag.SubDagOperator",
"airflow.operators.subdag_operator.SubDagOperator",
),
diff --git a/tests/operators/test_dagrun_operator.py b/tests/operators/test_trigger_dagrun.py
similarity index 99%
rename from tests/operators/test_dagrun_operator.py
rename to tests/operators/test_trigger_dagrun.py
index ed6f3ed..42bc2a2 100644
--- a/tests/operators/test_dagrun_operator.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -24,7 +24,7 @@ from unittest import TestCase
from airflow.exceptions import AirflowException, DagRunAlreadyExists
from airflow.models import DAG, DagBag, DagModel, DagRun, Log, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
-from airflow.operators.dagrun_operator import TriggerDagRunOperator
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State