You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/30 12:31:53 UTC

[GitHub] bolkedebruin closed pull request #3761: Subdag inherit runid *do not merge*

bolkedebruin closed pull request #3761: Subdag inherit runid *do not merge*
URL: https://github.com/apache/incubator-airflow/pull/3761
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/airflow/jobs.py b/airflow/jobs.py
index e7fff3114f..4c82e722cb 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1922,6 +1922,7 @@ def __init__(
             ignore_task_deps=False,
             pool=None,
             delay_on_limit_secs=1.0,
+            run_id_template=None,
             *args, **kwargs):
         self.dag = dag
         self.dag_id = dag.dag_id
@@ -1934,6 +1935,9 @@ def __init__(
         self.ignore_task_deps = ignore_task_deps
         self.pool = pool
         self.delay_on_limit_secs = delay_on_limit_secs
+        self.run_id_template = BackfillJob.ID_FORMAT_PREFIX
+        if run_id_template:
+            self.run_id_template = run_id_template
         super(BackfillJob, self).__init__(*args, **kwargs)
 
     def _update_counters(self, ti_status):
@@ -2023,7 +2027,7 @@ def _get_dag_run(self, run_date, session=None):
         :type session: Session
         :return: a DagRun in state RUNNING or None
         """
-        run_id = BackfillJob.ID_FORMAT_PREFIX.format(run_date.isoformat())
+        run_id = self.run_id_template.format(run_date.isoformat())
 
         # consider max_active_runs but ignore when running subdags
         respect_dag_max_active_limit = (True
diff --git a/airflow/models.py b/airflow/models.py
old mode 100755
new mode 100644
index 3e296eb58b..90546f5940
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3675,7 +3675,9 @@ def run(
             ignore_task_deps=False,
             ignore_first_depends_on_past=False,
             pool=None,
-            delay_on_limit_secs=1.0):
+            delay_on_limit_secs=1.0,
+            run_id_template=None
+    ):
         """
         Runs the DAG.
 
@@ -3703,6 +3705,8 @@ def run(
         :param delay_on_limit_secs: Time in seconds to wait before next attempt to run
             dag run when max_active_runs limit has been reached
         :type delay_on_limit_secs: float
+        :param run_id_template: Template for the run_id, to be formatted with the execution date
+        :type run_id_template: string
         """
         from airflow.jobs import BackfillJob
         if not executor and local:
@@ -3720,7 +3724,9 @@ def run(
             ignore_task_deps=ignore_task_deps,
             ignore_first_depends_on_past=ignore_first_depends_on_past,
             pool=pool,
-            delay_on_limit_secs=delay_on_limit_secs)
+            delay_on_limit_secs=delay_on_limit_secs,
+            run_id_template=run_id_template
+        )
         job.run()
 
     def cli(self):
diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index 9445c4c96d..369c645ed7 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -87,6 +87,11 @@ def __init__(
 
     def execute(self, context):
         ed = context['execution_date']
+        # Use the parent's run id as a template for the subdag dag run's run_id
+        run_id = context['run_id']
+        run_id_template = run_id + '.{0}'
         self.subdag.run(
             start_date=ed, end_date=ed, donot_pickle=True,
-            executor=self.executor)
+            executor=self.executor,
+            run_id_template=run_id_template
+        )


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services