You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/22 08:58:47 UTC

[airflow] branch master updated: Add Type Annotations & Docstrings to airflow/models/dagrun.py (#10466)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 90b9e7e  Add Type Annotations & Docstrings to airflow/models/dagrun.py (#10466)
90b9e7e is described below

commit 90b9e7e3c7a60a2c66fa309cf2da485bdafa884c
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Aug 22 09:58:14 2020 +0100

    Add Type Annotations & Docstrings to airflow/models/dagrun.py (#10466)
---
 airflow/models/dagrun.py | 32 ++++++++++++++++++++++----------
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 773d164..736da96 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -111,11 +111,12 @@ class DagRun(Base, LoggingMixin):
         return synonym('_state', descriptor=property(self.get_state, self.set_state))
 
     @provide_session
-    def refresh_from_db(self, session=None):
+    def refresh_from_db(self, session: Session = None):
         """
         Reloads the current dagrun from the database
 
         :param session: database session
+        :type session: Session
         """
         DR = DagRun
 
@@ -203,6 +204,7 @@ class DagRun(Base, LoggingMixin):
 
     @staticmethod
     def generate_run_id(run_type: DagRunType, execution_date: datetime) -> str:
+        """Generate Run ID based on Run Type and Execution Date"""
         return f"{run_type.value}__{execution_date.isoformat()}"
 
     @provide_session
@@ -237,11 +239,14 @@ class DagRun(Base, LoggingMixin):
         return tis.all()
 
     @provide_session
-    def get_task_instance(self, task_id, session=None):
+    def get_task_instance(self, task_id: str, session: Session = None):
         """
         Returns the task instance specified by task_id for this dag run
 
         :param task_id: the task id
+        :type task_id: str
+        :param session: Sqlalchemy ORM Session
+        :type session: Session
         """
         ti = session.query(TI).filter(
             TI.dag_id == self.dag_id,
@@ -258,8 +263,7 @@ class DagRun(Base, LoggingMixin):
         :return: DAG
         """
         if not self.dag:
-            raise AirflowException("The DAG (.dag) for {} needs to be set"
-                                   .format(self))
+            raise AirflowException("The DAG (.dag) for {} needs to be set".format(self))
 
         return self.dag
 
@@ -280,7 +284,7 @@ class DagRun(Base, LoggingMixin):
         ).first()
 
     @provide_session
-    def get_previous_scheduled_dagrun(self, session=None):
+    def get_previous_scheduled_dagrun(self, session: Session = None):
         """The previous, SCHEDULED DagRun, if there is one"""
         dag = self.get_dag()
 
@@ -290,11 +294,13 @@ class DagRun(Base, LoggingMixin):
         ).first()
 
     @provide_session
-    def update_state(self, session=None) -> List[TI]:
+    def update_state(self, session: Session = None) -> List[TI]:
         """
         Determines the overall state of the DagRun based on the state
         of its TaskInstances.
 
+        :param session: Sqlalchemy ORM Session
+        :type session: Session
         :return: ready_tis: the tis that can be scheduled in the current loop
         :rtype ready_tis: list[airflow.models.TaskInstance]
         """
@@ -336,8 +342,7 @@ class DagRun(Base, LoggingMixin):
         ):
             self.log.error('Marking run %s failed', self)
             self.set_state(State.FAILED)
-            dag.handle_callback(self, success=False, reason='task_failure',
-                                session=session)
+            dag.handle_callback(self, success=False, reason='task_failure', session=session)
 
         # if all leafs succeeded and no unfinished tasks, the run succeeded
         elif not unfinished_tasks and all(
@@ -430,10 +435,13 @@ class DagRun(Base, LoggingMixin):
             Stats.timing('dagrun.duration.failed.{}'.format(self.dag_id), duration)
 
     @provide_session
-    def verify_integrity(self, session=None):
+    def verify_integrity(self, session: Session = None):
         """
         Verifies the DagRun by checking for removed tasks or tasks that are not in the
         database yet. It will set state to removed or add the task if required.
+
+        :param session: Sqlalchemy ORM Session
+        :type session: Session
         """
         dag = self.get_dag()
         tis = self.get_task_instances(session=session)
@@ -487,8 +495,12 @@ class DagRun(Base, LoggingMixin):
             session.rollback()
 
     @staticmethod
-    def get_run(session, dag_id, execution_date):
+    def get_run(session: Session, dag_id: str, execution_date: datetime):
         """
+        Get a single DAG Run
+
+        :param session: Sqlalchemy ORM Session
+        :type session: Session
         :param dag_id: DAG ID
         :type dag_id: unicode
         :param execution_date: execution date