You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/31 11:46:08 UTC

[GitHub] Fokko closed pull request #4407: [AIRFLOW-3600] Remove dagbag from trigger

Fokko closed pull request #4407: [AIRFLOW-3600] Remove dagbag from trigger
URL: https://github.com/apache/incubator-airflow/pull/4407
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index 5b24bde81c..50ac1e54fa 100755
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -2992,6 +2992,44 @@ def get_default_view(self):
         else:
             return self.default_view
 
+    def get_dag(self):
+        return DagBag(dag_folder=self.fileloc).get_dag(self.dag_id)
+
+    @provide_session
+    def create_dagrun(self,
+                      run_id,
+                      state,
+                      execution_date,
+                      start_date=None,
+                      external_trigger=False,
+                      conf=None,
+                      session=None):
+        """
+        Creates a dag run from this dag including the tasks associated with this dag.
+        Returns the dag run.
+
+        :param run_id: defines the the run id for this dag run
+        :type run_id: str
+        :param execution_date: the execution date of this dag run
+        :type execution_date: datetime
+        :param state: the state of the dag run
+        :type state: State
+        :param start_date: the date this dag run should be evaluated
+        :type start_date: datetime
+        :param external_trigger: whether this dag run is externally triggered
+        :type external_trigger: bool
+        :param session: database session
+        :type session: Session
+        """
+
+        return self.get_dag().create_dagrun(run_id=run_id,
+                                            state=state,
+                                            execution_date=execution_date,
+                                            start_date=start_date,
+                                            external_trigger=external_trigger,
+                                            conf=conf,
+                                            session=session)
+
 
 @functools.total_ordering
 class DAG(BaseDag, LoggingMixin):
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5bde8d3ee8..7746fb6de5 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1069,11 +1069,11 @@ def delete(self):
     @login_required
     @wwwutils.action_logging
     @wwwutils.notify_owner
-    def trigger(self):
+    @provide_session
+    def trigger(self, session=None):
         dag_id = request.args.get('dag_id')
         origin = request.args.get('origin') or "/admin/"
-        dag = dagbag.get_dag(dag_id)
-
+        dag = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first()
         if not dag:
             flash("Cannot find dag {}".format(dag_id))
             return redirect(origin)
@@ -1592,7 +1592,7 @@ class GraphForm(DateTimeWithNumRunsWithDagRunsForm):
             task_instances=json.dumps(task_instances, indent=2),
             tasks=json.dumps(tasks, indent=2),
             nodes=json.dumps(nodes, indent=2),
-            edges=json.dumps(edges, indent=2), )
+            edges=json.dumps(edges, indent=2))
 
     @expose('/duration')
     @login_required
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 5c4476a667..17efcdfe7c 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -798,11 +798,11 @@ def delete(self):
     @has_dag_access(can_dag_edit=True)
     @has_access
     @action_logging
-    def trigger(self):
+    @provide_session
+    def trigger(self, session=None):
         dag_id = request.args.get('dag_id')
         origin = request.args.get('origin') or "/"
-        dag = dagbag.get_dag(dag_id)
-
+        dag = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first()
         if not dag:
             flash("Cannot find dag {}".format(dag_id))
             return redirect(origin)
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index a0d86ac54a..aeaca96333 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -30,6 +30,7 @@
 
 from urllib.parse import quote_plus
 from werkzeug.test import Client
+from sqlalchemy import func
 
 from airflow import models, configuration
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
@@ -821,5 +822,35 @@ def test_delete_dag_button_for_dag_on_scheduler_only(self):
         session.commit()
 
 
+class TestTriggerDag(unittest.TestCase):
+
+    def setUp(self):
+        conf.load_test_config()
+        app = application.create_app(testing=True)
+        app.config['WTF_CSRF_METHODS'] = []
+        self.app = app.test_client()
+        self.session = Session()
+        models.DagBag().get_dag("example_bash_operator").sync_to_db()
+
+    def test_trigger_dag_button_normal_exist(self):
+        resp = self.app.get('/', follow_redirects=True)
+        self.assertIn('/trigger?dag_id=example_bash_operator', resp.data.decode('utf-8'))
+        self.assertIn("return confirmDeleteDag('example_bash_operator')", resp.data.decode('utf-8'))
+
+    def test_trigger_dag_button(self):
+
+        test_dag_id = "example_bash_operator"
+
+        DR = models.DagRun
+        self.session.query(DR).delete()
+        self.session.commit()
+
+        self.app.get('/admin/airflow/trigger?dag_id={}'.format(test_dag_id))
+
+        run = self.session.query(DR).filter(DR.dag_id == test_dag_id).first()
+        self.assertIsNotNone(run)
+        self.assertIn("manual__", run.run_id)
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index 500fcf3d99..0f0de56cea 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -1428,5 +1428,32 @@ def test_start_date_filter(self):
         pass
 
 
+class TestTriggerDag(TestBase):
+
+    def setUp(self):
+        super(TestTriggerDag, self).setUp()
+        self.session = Session()
+        models.DagBag().get_dag("example_bash_operator").sync_to_db(session=self.session)
+
+    def test_trigger_dag_button_normal_exist(self):
+        resp = self.client.get('/', follow_redirects=True)
+        self.assertIn('/trigger?dag_id=example_bash_operator', resp.data.decode('utf-8'))
+        self.assertIn("return confirmDeleteDag('example_bash_operator')", resp.data.decode('utf-8'))
+
+    def test_trigger_dag_button(self):
+
+        test_dag_id = "example_bash_operator"
+
+        DR = models.DagRun
+        self.session.query(DR).delete()
+        self.session.commit()
+
+        resp = self.client.get('trigger?dag_id={}'.format(test_dag_id))
+
+        run = self.session.query(DR).filter(DR.dag_id == test_dag_id).first()
+        self.assertIsNotNone(run)
+        self.assertIn("manual__", run.run_id)
+
+
 if __name__ == '__main__':
     unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services