You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/31 06:31:13 UTC

[GitHub] feng-tao closed pull request #4385: [AIRFLOW-3581] Fix next_ds/prev_ds semantics for manual runs

feng-tao closed pull request #4385: [AIRFLOW-3581] Fix next_ds/prev_ds semantics for manual runs
URL: https://github.com/apache/incubator-airflow/pull/4385
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/UPDATING.md b/UPDATING.md
index 851c299db1..7409a35b5f 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -49,6 +49,10 @@ controlled by the new `dag_processor_manager_log_location` config option in core
 The new `sync_parallelism` config option will control how many processes CeleryExecutor will use to
 fetch celery task state in parallel. Default value is max(1, number of cores - 1)
 
+### Semantics of next_ds/prev_ds changed for manually triggered runs
+
+For manually triggered DAG runs (for example, runs triggered in the UI), next_ds/prev_ds now map to execution_date instead of the next/previous schedule-aligned execution date.
+
 ### Rename of BashTaskRunner to StandardTaskRunner
 
 BashTaskRunner has been renamed to StandardTaskRunner. It is the default task runner
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index cbcf586ac2..3e612a9535 100755
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -1579,13 +1579,38 @@ def get_template_context(self, session=None):
         if 'tables' in task.params:
             tables = task.params['tables']
 
+        params = {}
+        run_id = ''
+        dag_run = None
+        if hasattr(task, 'dag'):
+            if task.dag.params:
+                params.update(task.dag.params)
+            dag_run = (
+                session.query(DagRun)
+                .filter_by(
+                    dag_id=task.dag.dag_id,
+                    execution_date=self.execution_date)
+                .first()
+            )
+            run_id = dag_run.run_id if dag_run else None
+            session.expunge_all()
+            session.commit()
+
         ds = self.execution_date.strftime('%Y-%m-%d')
         ts = self.execution_date.isoformat()
         yesterday_ds = (self.execution_date - timedelta(1)).strftime('%Y-%m-%d')
         tomorrow_ds = (self.execution_date + timedelta(1)).strftime('%Y-%m-%d')
 
-        prev_execution_date = task.dag.previous_schedule(self.execution_date)
-        next_execution_date = task.dag.following_schedule(self.execution_date)
+        # For manually triggered dagruns that aren't run on a schedule, next/previous
+        # schedule dates don't make sense, and should be set to execution date for
+        # consistency with how execution_date is set for manually triggered tasks, i.e.
+        # triggered_date == execution_date.
+        if dag_run and dag_run.external_trigger:
+            prev_execution_date = self.execution_date
+            next_execution_date = self.execution_date
+        else:
+            prev_execution_date = task.dag.previous_schedule(self.execution_date)
+            next_execution_date = task.dag.following_schedule(self.execution_date)
 
         next_ds = None
         next_ds_nodash = None
@@ -1608,23 +1633,6 @@ def get_template_context(self, session=None):
         ti_key_str = "{task.dag_id}__{task.task_id}__{ds_nodash}"
         ti_key_str = ti_key_str.format(**locals())
 
-        params = {}
-        run_id = ''
-        dag_run = None
-        if hasattr(task, 'dag'):
-            if task.dag.params:
-                params.update(task.dag.params)
-            dag_run = (
-                session.query(DagRun)
-                .filter_by(
-                    dag_id=task.dag.dag_id,
-                    execution_date=self.execution_date)
-                .first()
-            )
-            run_id = dag_run.run_id if dag_run else None
-            session.expunge_all()
-            session.commit()
-
         if task.params:
             params.update(task.params)
 
diff --git a/tests/core.py b/tests/core.py
index efae7f4b1e..fe131a9c56 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1061,6 +1061,38 @@ def payload_generator(context, object):
         dag_run = dag_runs[0]
         self.assertEquals(dag_run.execution_date, utc_now)
 
+    def test_externally_triggered_dagrun(self):
+        TI = models.TaskInstance
+
+        # Create the dagrun between two "scheduled" execution dates of the DAG
+        EXECUTION_DATE = DEFAULT_DATE + timedelta(days=2)
+        EXECUTION_DS = EXECUTION_DATE.strftime('%Y-%m-%d')
+        EXECUTION_DS_NODASH = EXECUTION_DS.replace('-', '')
+
+        dag = DAG(
+            TEST_DAG_ID,
+            default_args=self.args,
+            schedule_interval=timedelta(weeks=1),
+            start_date=DEFAULT_DATE)
+        task = DummyOperator(task_id='test_externally_triggered_dag_context',
+                             dag=dag)
+        dag.create_dagrun(run_id=models.DagRun.id_for_date(EXECUTION_DATE),
+                          execution_date=EXECUTION_DATE,
+                          state=State.RUNNING,
+                          external_trigger=True)
+        task.run(
+            start_date=EXECUTION_DATE, end_date=EXECUTION_DATE)
+
+        ti = TI(task=task, execution_date=EXECUTION_DATE)
+        context = ti.get_template_context()
+
+        # next_ds/prev_ds should be the execution date for manually triggered runs
+        self.assertEquals(context['next_ds'], EXECUTION_DS)
+        self.assertEquals(context['next_ds_nodash'], EXECUTION_DS_NODASH)
+
+        self.assertEquals(context['prev_ds'], EXECUTION_DS)
+        self.assertEquals(context['prev_ds_nodash'], EXECUTION_DS_NODASH)
+
 
 class CliTests(unittest.TestCase):
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services