You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ms...@apache.org on 2021/06/11 08:43:26 UTC
[airflow] branch main updated: Fix TI success/failure links (#16233)
This is an automated email from the ASF dual-hosted git repository.
msumit pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7432c4d Fix TI success/failure links (#16233)
7432c4d is described below
commit 7432c4d7ea17ad95cc47c6e772c221d5d141f5e0
Author: Sumit Maheshwari <ms...@users.noreply.github.com>
AuthorDate: Fri Jun 11 14:12:55 2021 +0530
Fix TI success/failure links (#16233)
fixes issue #15234.
As of now, the TI success & failure endpoints are POST only and behave differently depending on the "confirmed" flag: they either render a confirmation page or update the TI states on the basis of that flag, which is not a great design.
Also, as these endpoints are POST only, they throw a 404 error when someone clicks on the link received via email.
To fix the issue, this change extracts the rendering functionality into a different endpoint, "/confirm", and keeps these endpoints as pure POST endpoints.
---
airflow/models/taskinstance.py | 3 +-
airflow/utils/strings.py | 2 +-
airflow/www/templates/airflow/confirm.html | 8 +-
airflow/www/templates/airflow/dag.html | 8 +-
airflow/www/views.py | 168 +++++++++++++++++++----------
tests/www/views/test_views.py | 1 -
tests/www/views/test_views_acl.py | 4 +-
tests/www/views/test_views_tasks.py | 60 +++++------
8 files changed, 153 insertions(+), 101 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index b4e644e..cf066b3 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -533,12 +533,13 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
iso = quote(self.execution_date.isoformat())
base_url = conf.get('webserver', 'BASE_URL')
return base_url + (
- "/success"
+ "/confirm"
f"?task_id={self.task_id}"
f"&dag_id={self.dag_id}"
f"&execution_date={iso}"
"&upstream=false"
"&downstream=false"
+ "&state=success"
)
@provide_session
diff --git a/airflow/utils/strings.py b/airflow/utils/strings.py
index 8d73914..c1823ff 100644
--- a/airflow/utils/strings.py
+++ b/airflow/utils/strings.py
@@ -27,4 +27,4 @@ def get_random_string(length=8, choices=string.ascii_letters + string.digits):
def to_boolean(astring):
"""Convert a string to a boolean"""
- return astring.lower() in ['true', 't', 'y', 'yes', '1']
+ return False if astring is None else astring.lower() in ['true', 't', 'y', 'yes', '1']
diff --git a/airflow/www/templates/airflow/confirm.html b/airflow/www/templates/airflow/confirm.html
index ccf49cb..3bbc908 100644
--- a/airflow/www/templates/airflow/confirm.html
+++ b/airflow/www/templates/airflow/confirm.html
@@ -28,10 +28,14 @@
<pre><code>{{ details }}</code></pre>
{% endif %}
</div>
- <form method="POST">
+ {% if endpoint %}
+ <form method="POST" action="{{ endpoint }}">
+ {% else %}
+ <form method="POST">
+ {% endif %}
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<input type="hidden" name="confirmed" value="true">
- {% for name,val in request.form.items() if name != "csrf_token" %}
+ {% for name,val in request.values.items() if name != "csrf_token" %}
<input type="hidden" name="{{ name }}" value="{{ val }}">
{% endfor %}
<button type="submit" class="btn btn-primary">OK!</button>
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 171b4a9..5bab88d 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -284,12 +284,12 @@
</div>
</form>
<hr style="margin-bottom: 8px;">
- <form method="POST" data-action="{{ url_for('Airflow.failed') }}">
- <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
+ <form method="GET" data-action="{{ url_for('Airflow.confirm') }}">
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
<input type="hidden" name="task_id">
<input type="hidden" name="execution_date">
<input type="hidden" name="origin" value="{{ request.base_url }}">
+ <input type="hidden" name="state" value="failed">
<div class="row">
<span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
<label class="btn btn-default">
@@ -317,12 +317,12 @@
</div>
</form>
<hr style="margin-bottom: 8px;">
- <form method="POST" data-action="{{ url_for('Airflow.success') }}">
- <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
+ <form method="GET" data-action="{{ url_for('Airflow.confirm') }}">
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
<input type="hidden" name="task_id">
<input type="hidden" name="execution_date">
<input type="hidden" name="origin" value="{{ request.base_url }}">
+ <input type="hidden" name="state" value="success">
<div class="row">
<span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
<label class="btn btn-default">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index e7eeb3c..477b87d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -111,6 +111,7 @@ from airflow.utils.log import secrets_masker
from airflow.utils.log.log_reader import TaskLogReader
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State
+from airflow.utils.strings import to_boolean
from airflow.version import version
from airflow.www import auth, utils as wwwutils
from airflow.www.decorators import action_logging, gzipped
@@ -1578,6 +1579,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
response = self.render_template(
'airflow/confirm.html',
+ endpoint=None,
message="Here's the list of task instances you are about to clear:",
details=details,
)
@@ -1794,7 +1796,6 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
task_id,
origin,
execution_date,
- confirmed,
upstream,
downstream,
future,
@@ -1807,54 +1808,104 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
latest_execution_date = dag.get_latest_execution_date()
if not latest_execution_date:
- flash(f"Cannot make {state}, seem that dag {dag_id} has never run", "error")
+ flash(f"Cannot mark tasks as {state}, seem that dag {dag_id} has never run", "error")
return redirect(origin)
execution_date = timezone.parse(execution_date)
from airflow.api.common.experimental.mark_tasks import set_state
- if confirmed:
- with create_session() as session:
- altered = set_state(
- tasks=[task],
- execution_date=execution_date,
- upstream=upstream,
- downstream=downstream,
- future=future,
- past=past,
- state=state,
- commit=True,
- session=session,
- )
+ with create_session() as session:
+ altered = set_state(
+ tasks=[task],
+ execution_date=execution_date,
+ upstream=upstream,
+ downstream=downstream,
+ future=future,
+ past=past,
+ state=state,
+ commit=True,
+ session=session,
+ )
- # Clear downstream tasks that are in failed/upstream_failed state to resume them.
- # Flush the session so that the tasks marked success are reflected in the db.
- session.flush()
- subdag = dag.partial_subset(
- task_ids_or_regex={task_id},
- include_downstream=True,
- include_upstream=False,
- )
+ # Clear downstream tasks that are in failed/upstream_failed state to resume them.
+ # Flush the session so that the tasks marked success are reflected in the db.
+ session.flush()
+ subdag = dag.partial_subset(
+ task_ids_or_regex={task_id},
+ include_downstream=True,
+ include_upstream=False,
+ )
- end_date = execution_date if not future else None
- start_date = execution_date if not past else None
-
- subdag.clear(
- start_date=start_date,
- end_date=end_date,
- include_subdags=True,
- include_parentdag=True,
- only_failed=True,
- session=session,
- # Exclude the task itself from being cleared
- exclude_task_ids={task_id},
- )
+ end_date = execution_date if not future else None
+ start_date = execution_date if not past else None
- session.commit()
+ subdag.clear(
+ start_date=start_date,
+ end_date=end_date,
+ include_subdags=True,
+ include_parentdag=True,
+ only_failed=True,
+ session=session,
+ # Exclude the task itself from being cleared
+ exclude_task_ids={task_id},
+ )
- flash(f"Marked {state} on {len(altered)} task instances")
- return redirect(origin)
+ session.commit()
+
+ flash(f"Marked {state} on {len(altered)} task instances")
+ return redirect(origin)
+
+ @expose('/confirm', methods=['GET'])
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+ ]
+ )
+ @action_logging
+ def confirm(self):
+ """Show confirmation page for marking tasks as success or failed."""
+ args = request.args
+ dag_id = args.get('dag_id')
+ task_id = args.get('task_id')
+ execution_date = args.get('execution_date')
+ state = args.get('state')
+
+ upstream = to_boolean(args.get('failed_upstream'))
+ downstream = to_boolean(args.get('failed_downstream'))
+ future = to_boolean(args.get('failed_future'))
+ past = to_boolean(args.get('failed_past'))
+
+ try:
+ dag = current_app.dag_bag.get_dag(dag_id)
+ except airflow.exceptions.SerializedDagNotFound:
+ flash(f'DAG {dag_id} not found', "error")
+ return redirect(request.referrer or url_for('Airflow.index'))
+
+ try:
+ task = dag.get_task(task_id)
+ except airflow.exceptions.TaskNotFound:
+ flash(f"Task {task_id} not found", "error")
+ return redirect(request.referrer or url_for('Airflow.index'))
+
+ task.dag = dag
+
+ if state not in (
+ 'success',
+ 'failed',
+ ):
+ flash(f"Invalid state {state}, must be either 'success' or 'failed'", "error")
+ return redirect(request.referrer or url_for('Airflow.index'))
+
+ latest_execution_date = dag.get_latest_execution_date()
+ if not latest_execution_date:
+ flash(f"Cannot mark tasks as {state}, seem that dag {dag_id} has never run", "error")
+ return redirect(request.referrer or url_for('Airflow.index'))
+
+ execution_date = timezone.parse(execution_date)
+
+ from airflow.api.common.experimental.mark_tasks import set_state
to_be_altered = set_state(
tasks=[task],
@@ -1871,6 +1922,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
response = self.render_template(
"airflow/confirm.html",
+ endpoint=url_for(f'Airflow.{state}'),
message=f"Here's the list of task instances you are about to mark as {state}:",
details=details,
)
@@ -1887,23 +1939,22 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
@action_logging
def failed(self):
"""Mark task as failed."""
- dag_id = request.form.get('dag_id')
- task_id = request.form.get('task_id')
- origin = get_safe_url(request.form.get('origin'))
- execution_date = request.form.get('execution_date')
+ args = request.form
+ dag_id = args.get('dag_id')
+ task_id = args.get('task_id')
+ origin = get_safe_url(args.get('origin'))
+ execution_date = args.get('execution_date')
- confirmed = request.form.get('confirmed') == "true"
- upstream = request.form.get('failed_upstream') == "true"
- downstream = request.form.get('failed_downstream') == "true"
- future = request.form.get('failed_future') == "true"
- past = request.form.get('failed_past') == "true"
+ upstream = to_boolean(args.get('failed_upstream'))
+ downstream = to_boolean(args.get('failed_downstream'))
+ future = to_boolean(args.get('failed_future'))
+ past = to_boolean(args.get('failed_past'))
return self._mark_task_instance_state(
dag_id,
task_id,
origin,
execution_date,
- confirmed,
upstream,
downstream,
future,
@@ -1921,23 +1972,22 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
@action_logging
def success(self):
"""Mark task as success."""
- dag_id = request.form.get('dag_id')
- task_id = request.form.get('task_id')
- origin = get_safe_url(request.form.get('origin'))
- execution_date = request.form.get('execution_date')
+ args = request.form
+ dag_id = args.get('dag_id')
+ task_id = args.get('task_id')
+ origin = get_safe_url(args.get('origin'))
+ execution_date = args.get('execution_date')
- confirmed = request.form.get('confirmed') == "true"
- upstream = request.form.get('success_upstream') == "true"
- downstream = request.form.get('success_downstream') == "true"
- future = request.form.get('success_future') == "true"
- past = request.form.get('success_past') == "true"
+ upstream = to_boolean(args.get('failed_upstream'))
+ downstream = to_boolean(args.get('failed_downstream'))
+ future = to_boolean(args.get('failed_future'))
+ past = to_boolean(args.get('failed_past'))
return self._mark_task_instance_state(
dag_id,
task_id,
origin,
execution_date,
- confirmed,
upstream,
downstream,
future,
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index 4f7f5d0..e698de8 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -223,7 +223,6 @@ def test_mark_task_instance_state(test_app):
task_id=task_1.task_id,
origin="",
execution_date=start_date.isoformat(),
- confirmed=True,
upstream=False,
downstream=False,
future=False,
diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py
index 5964081..e82e3e8 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -690,8 +690,8 @@ def test_failed_success(client_all_dags_edit_tis):
future="false",
past="false",
)
- resp = client_all_dags_edit_tis.post('failed', data=form)
- check_content_in_response('example_bash_operator', resp)
+ resp = client_all_dags_edit_tis.post('failed', data=form, follow_redirects=True)
+ check_content_in_response('Marked failed on 1 task instances', resp)
@pytest.mark.parametrize(
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 1250085..dc17c39 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -177,6 +177,34 @@ def init_dagruns(app, reset_dagruns): # pylint: disable=unused-argument
["example_bash_operator"],
id="existing-dagbag-dag-details",
),
+ pytest.param(
+ f'confirm?task_id=runme_0&dag_id=example_bash_operator&state=success'
+ f'&execution_date={DEFAULT_VAL}',
+ ['Wait a minute.'],
+ id="confirm-success",
+ ),
+ pytest.param(
+ f'confirm?task_id=runme_0&dag_id=example_bash_operator&state=failed&execution_date={DEFAULT_VAL}',
+ ['Wait a minute.'],
+ id="confirm-failed",
+ ),
+ pytest.param(
+ f'confirm?task_id=runme_0&dag_id=invalid_dag&state=failed&execution_date={DEFAULT_VAL}',
+ ['DAG invalid_dag not found'],
+ id="confirm-failed",
+ ),
+ pytest.param(
+ f'confirm?task_id=invalid_task&dag_id=example_bash_operator&state=failed'
+ f'&execution_date={DEFAULT_VAL}',
+ ['Task invalid_task not found'],
+ id="confirm-failed",
+ ),
+ pytest.param(
+ f'confirm?task_id=runme_0&dag_id=example_bash_operator&state=invalid'
+ f'&execution_date={DEFAULT_VAL}',
+ ["Invalid state invalid, must be either 'success' or 'failed'"],
+ id="confirm-invalid",
+ ),
],
)
def test_views_get(admin_client, url, contents):
@@ -332,20 +360,6 @@ def test_code_from_db_all_example_dags(admin_client):
downstream="false",
future="false",
past="false",
- ),
- "Wait a minute",
- ),
- (
- "failed",
- dict(
- task_id="run_this_last",
- dag_id="example_bash_operator",
- execution_date=DEFAULT_DATE,
- confirmed="true",
- upstream="false",
- downstream="false",
- future="false",
- past="false",
origin="/graph?dag_id=example_bash_operator",
),
"Marked failed on 1 task instances",
@@ -360,20 +374,6 @@ def test_code_from_db_all_example_dags(admin_client):
downstream="false",
future="false",
past="false",
- ),
- 'Wait a minute',
- ),
- (
- "success",
- dict(
- task_id="run_this_last",
- dag_id="example_bash_operator",
- execution_date=DEFAULT_DATE,
- confirmed="true",
- upstream="false",
- downstream="false",
- future="false",
- past="false",
origin="/graph?dag_id=example_bash_operator",
),
"Marked success on 1 task instances",
@@ -406,9 +406,7 @@ def test_code_from_db_all_example_dags(admin_client):
],
ids=[
"paused",
- "failed",
"failed-flash-hint",
- "success",
"success-flash-hint",
"clear",
"run",
@@ -434,7 +432,7 @@ def test_dag_never_run(admin_client, url):
)
clear_db_runs()
resp = admin_client.post(url, data=form, follow_redirects=True)
- check_content_in_response(f"Cannot make {url}, seem that dag {dag_id} has never run", resp)
+ check_content_in_response(f"Cannot mark tasks as {url}, seem that dag {dag_id} has never run", resp)
class _ForceHeartbeatCeleryExecutor(CeleryExecutor):