You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ms...@apache.org on 2021/06/11 08:43:26 UTC

[airflow] branch main updated: Fix TI success/failure links (#16233)

This is an automated email from the ASF dual-hosted git repository.

msumit pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7432c4d  Fix TI success/failure links (#16233)
7432c4d is described below

commit 7432c4d7ea17ad95cc47c6e772c221d5d141f5e0
Author: Sumit Maheshwari <ms...@users.noreply.github.com>
AuthorDate: Fri Jun 11 14:12:55 2021 +0530

    Fix TI success/failure links (#16233)
    
    fixes issue #15234.
    
    As of now, the TI success & failure endpoints are POST only and behave differently depending on the "confirmed" flag. They either render a confirmation page or update the TI states on the basis of that flag, which is not a great design.
    
    Also, as these endpoints are POST only, they throw a 404 error when someone clicks on the link received via email.
    
    To fix the issue, this change extracts the rendering functionality into a different endpoint, "/confirm", and keeps these endpoints as pure POST endpoints.
---
 airflow/models/taskinstance.py             |   3 +-
 airflow/utils/strings.py                   |   2 +-
 airflow/www/templates/airflow/confirm.html |   8 +-
 airflow/www/templates/airflow/dag.html     |   8 +-
 airflow/www/views.py                       | 168 +++++++++++++++++++----------
 tests/www/views/test_views.py              |   1 -
 tests/www/views/test_views_acl.py          |   4 +-
 tests/www/views/test_views_tasks.py        |  60 +++++------
 8 files changed, 153 insertions(+), 101 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index b4e644e..cf066b3 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -533,12 +533,13 @@ class TaskInstance(Base, LoggingMixin):  # pylint: disable=R0902,R0904
         iso = quote(self.execution_date.isoformat())
         base_url = conf.get('webserver', 'BASE_URL')
         return base_url + (
-            "/success"
+            "/confirm"
             f"?task_id={self.task_id}"
             f"&dag_id={self.dag_id}"
             f"&execution_date={iso}"
             "&upstream=false"
             "&downstream=false"
+            "&state=success"
         )
 
     @provide_session
diff --git a/airflow/utils/strings.py b/airflow/utils/strings.py
index 8d73914..c1823ff 100644
--- a/airflow/utils/strings.py
+++ b/airflow/utils/strings.py
@@ -27,4 +27,4 @@ def get_random_string(length=8, choices=string.ascii_letters + string.digits):
 
 def to_boolean(astring):
     """Convert a string to a boolean"""
-    return astring.lower() in ['true', 't', 'y', 'yes', '1']
+    return False if astring is None else astring.lower() in ['true', 't', 'y', 'yes', '1']
diff --git a/airflow/www/templates/airflow/confirm.html b/airflow/www/templates/airflow/confirm.html
index ccf49cb..3bbc908 100644
--- a/airflow/www/templates/airflow/confirm.html
+++ b/airflow/www/templates/airflow/confirm.html
@@ -28,10 +28,14 @@
       <pre><code>{{ details }}</code></pre>
     {% endif %}
   </div>
-  <form method="POST">
+  {% if endpoint %}
+    <form method="POST" action="{{ endpoint }}">
+  {% else %}
+    <form method="POST">
+  {% endif %}
     <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
     <input type="hidden" name="confirmed" value="true">
-    {% for name,val in request.form.items() if name != "csrf_token" %}
+    {% for name,val in request.values.items() if name != "csrf_token" %}
       <input type="hidden" name="{{ name }}" value="{{ val }}">
     {% endfor %}
     <button type="submit" class="btn btn-primary">OK!</button>
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 171b4a9..5bab88d 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -284,12 +284,12 @@
             </div>
           </form>
           <hr style="margin-bottom: 8px;">
-          <form method="POST" data-action="{{ url_for('Airflow.failed') }}">
-            <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
+          <form method="GET" data-action="{{ url_for('Airflow.confirm') }}">
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="execution_date">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
+            <input type="hidden" name="state" value="failed">
             <div class="row">
               <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
                 <label class="btn btn-default">
@@ -317,12 +317,12 @@
             </div>
           </form>
           <hr style="margin-bottom: 8px;">
-          <form method="POST" data-action="{{ url_for('Airflow.success') }}">
-            <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
+          <form method="GET" data-action="{{ url_for('Airflow.confirm') }}">
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="execution_date">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
+            <input type="hidden" name="state" value="success">
             <div class="row">
               <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
                 <label class="btn btn-default">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index e7eeb3c..477b87d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -111,6 +111,7 @@ from airflow.utils.log import secrets_masker
 from airflow.utils.log.log_reader import TaskLogReader
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State
+from airflow.utils.strings import to_boolean
 from airflow.version import version
 from airflow.www import auth, utils as wwwutils
 from airflow.www.decorators import action_logging, gzipped
@@ -1578,6 +1579,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: disable=too-many-public-m
 
             response = self.render_template(
                 'airflow/confirm.html',
+                endpoint=None,
                 message="Here's the list of task instances you are about to clear:",
                 details=details,
             )
@@ -1794,7 +1796,6 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: disable=too-many-public-m
         task_id,
         origin,
         execution_date,
-        confirmed,
         upstream,
         downstream,
         future,
@@ -1807,54 +1808,104 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: disable=too-many-public-m
 
         latest_execution_date = dag.get_latest_execution_date()
         if not latest_execution_date:
-            flash(f"Cannot make {state}, seem that dag {dag_id} has never run", "error")
+            flash(f"Cannot mark tasks as {state}, seem that dag {dag_id} has never run", "error")
             return redirect(origin)
 
         execution_date = timezone.parse(execution_date)
 
         from airflow.api.common.experimental.mark_tasks import set_state
 
-        if confirmed:
-            with create_session() as session:
-                altered = set_state(
-                    tasks=[task],
-                    execution_date=execution_date,
-                    upstream=upstream,
-                    downstream=downstream,
-                    future=future,
-                    past=past,
-                    state=state,
-                    commit=True,
-                    session=session,
-                )
+        with create_session() as session:
+            altered = set_state(
+                tasks=[task],
+                execution_date=execution_date,
+                upstream=upstream,
+                downstream=downstream,
+                future=future,
+                past=past,
+                state=state,
+                commit=True,
+                session=session,
+            )
 
-                # Clear downstream tasks that are in failed/upstream_failed state to resume them.
-                # Flush the session so that the tasks marked success are reflected in the db.
-                session.flush()
-                subdag = dag.partial_subset(
-                    task_ids_or_regex={task_id},
-                    include_downstream=True,
-                    include_upstream=False,
-                )
+            # Clear downstream tasks that are in failed/upstream_failed state to resume them.
+            # Flush the session so that the tasks marked success are reflected in the db.
+            session.flush()
+            subdag = dag.partial_subset(
+                task_ids_or_regex={task_id},
+                include_downstream=True,
+                include_upstream=False,
+            )
 
-                end_date = execution_date if not future else None
-                start_date = execution_date if not past else None
-
-                subdag.clear(
-                    start_date=start_date,
-                    end_date=end_date,
-                    include_subdags=True,
-                    include_parentdag=True,
-                    only_failed=True,
-                    session=session,
-                    # Exclude the task itself from being cleared
-                    exclude_task_ids={task_id},
-                )
+            end_date = execution_date if not future else None
+            start_date = execution_date if not past else None
 
-                session.commit()
+            subdag.clear(
+                start_date=start_date,
+                end_date=end_date,
+                include_subdags=True,
+                include_parentdag=True,
+                only_failed=True,
+                session=session,
+                # Exclude the task itself from being cleared
+                exclude_task_ids={task_id},
+            )
 
-            flash(f"Marked {state} on {len(altered)} task instances")
-            return redirect(origin)
+            session.commit()
+
+        flash(f"Marked {state} on {len(altered)} task instances")
+        return redirect(origin)
+
+    @expose('/confirm', methods=['GET'])
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+        ]
+    )
+    @action_logging
+    def confirm(self):
+        """Show confirmation page for marking tasks as success or failed."""
+        args = request.args
+        dag_id = args.get('dag_id')
+        task_id = args.get('task_id')
+        execution_date = args.get('execution_date')
+        state = args.get('state')
+
+        upstream = to_boolean(args.get('failed_upstream'))
+        downstream = to_boolean(args.get('failed_downstream'))
+        future = to_boolean(args.get('failed_future'))
+        past = to_boolean(args.get('failed_past'))
+
+        try:
+            dag = current_app.dag_bag.get_dag(dag_id)
+        except airflow.exceptions.SerializedDagNotFound:
+            flash(f'DAG {dag_id} not found', "error")
+            return redirect(request.referrer or url_for('Airflow.index'))
+
+        try:
+            task = dag.get_task(task_id)
+        except airflow.exceptions.TaskNotFound:
+            flash(f"Task {task_id} not found", "error")
+            return redirect(request.referrer or url_for('Airflow.index'))
+
+        task.dag = dag
+
+        if state not in (
+            'success',
+            'failed',
+        ):
+            flash(f"Invalid state {state}, must be either 'success' or 'failed'", "error")
+            return redirect(request.referrer or url_for('Airflow.index'))
+
+        latest_execution_date = dag.get_latest_execution_date()
+        if not latest_execution_date:
+            flash(f"Cannot mark tasks as {state}, seem that dag {dag_id} has never run", "error")
+            return redirect(request.referrer or url_for('Airflow.index'))
+
+        execution_date = timezone.parse(execution_date)
+
+        from airflow.api.common.experimental.mark_tasks import set_state
 
         to_be_altered = set_state(
             tasks=[task],
@@ -1871,6 +1922,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: disable=too-many-public-m
 
         response = self.render_template(
             "airflow/confirm.html",
+            endpoint=url_for(f'Airflow.{state}'),
             message=f"Here's the list of task instances you are about to mark as {state}:",
             details=details,
         )
@@ -1887,23 +1939,22 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: disable=too-many-public-m
     @action_logging
     def failed(self):
         """Mark task as failed."""
-        dag_id = request.form.get('dag_id')
-        task_id = request.form.get('task_id')
-        origin = get_safe_url(request.form.get('origin'))
-        execution_date = request.form.get('execution_date')
+        args = request.form
+        dag_id = args.get('dag_id')
+        task_id = args.get('task_id')
+        origin = get_safe_url(args.get('origin'))
+        execution_date = args.get('execution_date')
 
-        confirmed = request.form.get('confirmed') == "true"
-        upstream = request.form.get('failed_upstream') == "true"
-        downstream = request.form.get('failed_downstream') == "true"
-        future = request.form.get('failed_future') == "true"
-        past = request.form.get('failed_past') == "true"
+        upstream = to_boolean(args.get('failed_upstream'))
+        downstream = to_boolean(args.get('failed_downstream'))
+        future = to_boolean(args.get('failed_future'))
+        past = to_boolean(args.get('failed_past'))
 
         return self._mark_task_instance_state(
             dag_id,
             task_id,
             origin,
             execution_date,
-            confirmed,
             upstream,
             downstream,
             future,
@@ -1921,23 +1972,22 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: disable=too-many-public-m
     @action_logging
     def success(self):
         """Mark task as success."""
-        dag_id = request.form.get('dag_id')
-        task_id = request.form.get('task_id')
-        origin = get_safe_url(request.form.get('origin'))
-        execution_date = request.form.get('execution_date')
+        args = request.form
+        dag_id = args.get('dag_id')
+        task_id = args.get('task_id')
+        origin = get_safe_url(args.get('origin'))
+        execution_date = args.get('execution_date')
 
-        confirmed = request.form.get('confirmed') == "true"
-        upstream = request.form.get('success_upstream') == "true"
-        downstream = request.form.get('success_downstream') == "true"
-        future = request.form.get('success_future') == "true"
-        past = request.form.get('success_past') == "true"
+        upstream = to_boolean(args.get('failed_upstream'))
+        downstream = to_boolean(args.get('failed_downstream'))
+        future = to_boolean(args.get('failed_future'))
+        past = to_boolean(args.get('failed_past'))
 
         return self._mark_task_instance_state(
             dag_id,
             task_id,
             origin,
             execution_date,
-            confirmed,
             upstream,
             downstream,
             future,
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index 4f7f5d0..e698de8 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -223,7 +223,6 @@ def test_mark_task_instance_state(test_app):
             task_id=task_1.task_id,
             origin="",
             execution_date=start_date.isoformat(),
-            confirmed=True,
             upstream=False,
             downstream=False,
             future=False,
diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py
index 5964081..e82e3e8 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -690,8 +690,8 @@ def test_failed_success(client_all_dags_edit_tis):
         future="false",
         past="false",
     )
-    resp = client_all_dags_edit_tis.post('failed', data=form)
-    check_content_in_response('example_bash_operator', resp)
+    resp = client_all_dags_edit_tis.post('failed', data=form, follow_redirects=True)
+    check_content_in_response('Marked failed on 1 task instances', resp)
 
 
 @pytest.mark.parametrize(
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 1250085..dc17c39 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -177,6 +177,34 @@ def init_dagruns(app, reset_dagruns):  # pylint: disable=unused-argument
             ["example_bash_operator"],
             id="existing-dagbag-dag-details",
         ),
+        pytest.param(
+            f'confirm?task_id=runme_0&dag_id=example_bash_operator&state=success'
+            f'&execution_date={DEFAULT_VAL}',
+            ['Wait a minute.'],
+            id="confirm-success",
+        ),
+        pytest.param(
+            f'confirm?task_id=runme_0&dag_id=example_bash_operator&state=failed&execution_date={DEFAULT_VAL}',
+            ['Wait a minute.'],
+            id="confirm-failed",
+        ),
+        pytest.param(
+            f'confirm?task_id=runme_0&dag_id=invalid_dag&state=failed&execution_date={DEFAULT_VAL}',
+            ['DAG invalid_dag not found'],
+            id="confirm-failed",
+        ),
+        pytest.param(
+            f'confirm?task_id=invalid_task&dag_id=example_bash_operator&state=failed'
+            f'&execution_date={DEFAULT_VAL}',
+            ['Task invalid_task not found'],
+            id="confirm-failed",
+        ),
+        pytest.param(
+            f'confirm?task_id=runme_0&dag_id=example_bash_operator&state=invalid'
+            f'&execution_date={DEFAULT_VAL}',
+            ["Invalid state invalid, must be either &#39;success&#39; or &#39;failed&#39;"],
+            id="confirm-invalid",
+        ),
     ],
 )
 def test_views_get(admin_client, url, contents):
@@ -332,20 +360,6 @@ def test_code_from_db_all_example_dags(admin_client):
                 downstream="false",
                 future="false",
                 past="false",
-            ),
-            "Wait a minute",
-        ),
-        (
-            "failed",
-            dict(
-                task_id="run_this_last",
-                dag_id="example_bash_operator",
-                execution_date=DEFAULT_DATE,
-                confirmed="true",
-                upstream="false",
-                downstream="false",
-                future="false",
-                past="false",
                 origin="/graph?dag_id=example_bash_operator",
             ),
             "Marked failed on 1 task instances",
@@ -360,20 +374,6 @@ def test_code_from_db_all_example_dags(admin_client):
                 downstream="false",
                 future="false",
                 past="false",
-            ),
-            'Wait a minute',
-        ),
-        (
-            "success",
-            dict(
-                task_id="run_this_last",
-                dag_id="example_bash_operator",
-                execution_date=DEFAULT_DATE,
-                confirmed="true",
-                upstream="false",
-                downstream="false",
-                future="false",
-                past="false",
                 origin="/graph?dag_id=example_bash_operator",
             ),
             "Marked success on 1 task instances",
@@ -406,9 +406,7 @@ def test_code_from_db_all_example_dags(admin_client):
     ],
     ids=[
         "paused",
-        "failed",
         "failed-flash-hint",
-        "success",
         "success-flash-hint",
         "clear",
         "run",
@@ -434,7 +432,7 @@ def test_dag_never_run(admin_client, url):
     )
     clear_db_runs()
     resp = admin_client.post(url, data=form, follow_redirects=True)
-    check_content_in_response(f"Cannot make {url}, seem that dag {dag_id} has never run", resp)
+    check_content_in_response(f"Cannot mark tasks as {url}, seem that dag {dag_id} has never run", resp)
 
 
 class _ForceHeartbeatCeleryExecutor(CeleryExecutor):