Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/15 20:51:58 UTC

[airflow] branch v2-2-test updated (7c93cf7 -> a520845)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 7c93cf7  Fix postgres hook import pipeline tutorial (#21491)
     new f27357f  Use compat data interval shim in log handlers (#21289)
     new 57abbac  Show task status only for running dags or only for the last finished dag (#21352)
     new a0ea47e  Add a session backend to store session data in the database (#21478)
     new 94c6134  Simplify trigger cancel button (#21591)
     new a520845  update tutorial_etl_dag notes (#21503)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/config_templates/config.yml                |  7 +++
 airflow/config_templates/default_airflow.cfg       |  4 ++
 airflow/example_dags/tutorial_etl_dag.py           |  4 +-
 ....py => c381b21cb7e4_add_session_table_to_db.py} | 34 ++++++-----
 .../providers/elasticsearch/log/es_task_handler.py | 27 ++++++---
 airflow/utils/db.py                                |  3 +
 airflow/utils/log/file_task_handler.py             | 35 +++++++++---
 airflow/www/app.py                                 |  3 +-
 airflow/www/extensions/init_session.py             | 63 ++++++++++++---------
 .../www/{extensions/init_session.py => session.py} | 29 ++++------
 airflow/www/templates/airflow/trigger.html         |  2 +-
 airflow/www/views.py                               | 64 ++++++++++++++++-----
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 docs/spelling_wordlist.txt                         |  1 +
 setup.cfg                                          |  3 +
 tests/api_connexion/conftest.py                    |  7 ++-
 tests/api_connexion/test_security.py               |  4 ++
 tests/test_utils/decorators.py                     |  2 +-
 tests/utils/test_db.py                             |  3 +
 tests/www/views/conftest.py                        |  1 +
 tests/www/views/test_session.py                    | 65 ++++++++++++++++++++++
 tests/www/views/test_views.py                      | 35 +++++++++++-
 tests/www/views/test_views_trigger_dag.py          | 11 ++--
 23 files changed, 308 insertions(+), 103 deletions(-)
 copy airflow/migrations/versions/{f2ca10b85618_add_dag_stats_table.py => c381b21cb7e4_add_session_table_to_db.py} (61%)
 copy airflow/www/{extensions/init_session.py => session.py} (59%)
 create mode 100644 tests/www/views/test_session.py

[airflow] 03/05: Add a session backend to store session data in the database (#21478)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a0ea47e4ebabcbae399b9063b38c44e1f26bcc4c
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 15 10:57:46 2022 -0700

    Add a session backend to store session data in the database (#21478)
    
    Co-authored-by: Jed Cunningham <je...@apache.org>
    (cherry picked from commit da9d0863c7ff121c111a455708163b026943bdf1)
---
 airflow/config_templates/config.yml                |  7 +++
 airflow/config_templates/default_airflow.cfg       |  4 ++
 .../c381b21cb7e4_add_session_table_to_db.py        | 54 ++++++++++++++++++
 airflow/utils/db.py                                |  3 +
 airflow/www/app.py                                 |  3 +-
 airflow/www/extensions/init_session.py             | 63 ++++++++++++---------
 .../www/{extensions/init_session.py => session.py} | 29 ++++------
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 docs/spelling_wordlist.txt                         |  1 +
 setup.cfg                                          |  3 +
 tests/api_connexion/conftest.py                    |  7 ++-
 tests/api_connexion/test_security.py               |  4 ++
 tests/test_utils/decorators.py                     |  2 +-
 tests/utils/test_db.py                             |  3 +
 tests/www/views/conftest.py                        |  1 +
 tests/www/views/test_session.py                    | 65 ++++++++++++++++++++++
 16 files changed, 205 insertions(+), 48 deletions(-)

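The user-facing piece of this change is the new [webserver] session_backend option (see the config.yml and default_airflow.cfg hunks below). A deployment would normally export AIRFLOW__WEBSERVER__SESSION_BACKEND in the webserver's environment; the minimal sketch below, assuming the option name and section shown in the diff, only demonstrates how Airflow's standard environment-variable override maps onto the new option.

    import os

    # Airflow resolves AIRFLOW__<SECTION>__<OPTION> environment variables at
    # lookup time, so the backend can be chosen without editing airflow.cfg.
    # (Assumes the option lands in [webserver] as added by this commit.)
    os.environ["AIRFLOW__WEBSERVER__SESSION_BACKEND"] = "securecookie"

    from airflow.configuration import conf

    assert conf.get("webserver", "session_backend") == "securecookie"
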
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 6941f03..1e77041 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -999,6 +999,13 @@
       type: string
       example: ~
       default: ""
+    - name: session_backend
+      description: |
+        The type of backend used to store web session data, can be 'database' or 'securecookie'
+      version_added: 2.2.4
+      type: string
+      example: securecookie
+      default: database
     - name: web_server_master_timeout
       description: |
         Number of seconds the webserver waits before killing gunicorn master that doesn't respond
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 6a5449b..826eaf4 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -516,6 +516,10 @@ web_server_ssl_cert =
 # provided SSL will be enabled. This does not change the web server port.
 web_server_ssl_key =
 
+# The type of backend used to store web session data, can be 'database' or 'securecookie'
+# Example: session_backend = securecookie
+session_backend = database
+
 # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
 web_server_master_timeout = 120
 
diff --git a/airflow/migrations/versions/c381b21cb7e4_add_session_table_to_db.py b/airflow/migrations/versions/c381b21cb7e4_add_session_table_to_db.py
new file mode 100644
index 0000000..cc6b9ab
--- /dev/null
+++ b/airflow/migrations/versions/c381b21cb7e4_add_session_table_to_db.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add session table to db
+
+Revision ID: c381b21cb7e4
+Revises: be2bfac3da23
+Create Date: 2022-01-25 13:56:35.069429
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = 'c381b21cb7e4'
+down_revision = 'be2bfac3da23'
+branch_labels = None
+depends_on = None
+
+TABLE_NAME = 'session'
+
+
+def upgrade():
+    """Apply add session table to db"""
+    op.create_table(
+        TABLE_NAME,
+        sa.Column('id', sa.Integer()),
+        sa.Column('session_id', sa.String(255)),
+        sa.Column('data', sa.LargeBinary()),
+        sa.Column('expiry', sa.DateTime()),
+        sa.PrimaryKeyConstraint('id'),
+        sa.UniqueConstraint('session_id'),
+    )
+
+
+def downgrade():
+    """Unapply add session table to db"""
+    op.drop_table(TABLE_NAME)
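
The new revision chains directly off be2bfac3da23, so it is picked up by the normal upgrade path. A minimal sketch, assuming an initialized Airflow configuration pointing at the metadata database, of applying it programmatically rather than via the CLI:

    # Sketch: applies all pending Alembic revisions, including c381b21cb7e4,
    # against the configured sql_alchemy_conn. This is the same code path the
    # `airflow db upgrade` command goes through.
    from airflow.utils import db

    db.upgradedb()
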
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 023f482..c038d66 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -954,9 +954,12 @@ def drop_airflow_models(connection):
     users.drop(settings.engine, checkfirst=True)
     dag_stats = Table('dag_stats', Base.metadata)
     dag_stats.drop(settings.engine, checkfirst=True)
+    session = Table('session', Base.metadata)
+    session.drop(settings.engine, checkfirst=True)
 
     Base.metadata.drop_all(connection)
     # we remove the Tables here so that if resetdb is run metadata does not keep the old tables.
+    Base.metadata.remove(session)
     Base.metadata.remove(dag_stats)
     Base.metadata.remove(users)
     Base.metadata.remove(user)
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 2de041b..16780cb 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -36,7 +36,7 @@ from airflow.www.extensions.init_jinja_globals import init_jinja_globals
 from airflow.www.extensions.init_manifest_files import configure_manifest_files
 from airflow.www.extensions.init_robots import init_robots
 from airflow.www.extensions.init_security import init_api_experimental_auth, init_xframe_protection
-from airflow.www.extensions.init_session import init_airflow_session_interface, init_permanent_session
+from airflow.www.extensions.init_session import init_airflow_session_interface
 from airflow.www.extensions.init_views import (
     init_api_connexion,
     init_api_experimental,
@@ -135,7 +135,6 @@ def create_app(config=None, testing=False):
 
         init_jinja_globals(flask_app)
         init_xframe_protection(flask_app)
-        init_permanent_session(flask_app)
         init_airflow_session_interface(flask_app)
     return flask_app
 
diff --git a/airflow/www/extensions/init_session.py b/airflow/www/extensions/init_session.py
index 06e0ba5..7a09de7 100644
--- a/airflow/www/extensions/init_session.py
+++ b/airflow/www/extensions/init_session.py
@@ -15,33 +15,46 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from flask import request, session as flask_session
-from flask.sessions import SecureCookieSessionInterface
+from flask import session as builtin_flask_session
 
-
-class AirflowSessionInterface(SecureCookieSessionInterface):
-    """
-    Airflow cookie session interface.
-    Modifications of sessions should be done here because
-    the change here is global.
-    """
-
-    def save_session(self, *args, **kwargs):
-        """Prevent creating session from REST API requests."""
-        if request.blueprint == '/api/v1':
-            return None
-        return super().save_session(*args, **kwargs)
-
-
-def init_permanent_session(app):
-    """Make session permanent to allows us to store data"""
-
-    def make_session_permanent():
-        flask_session.permanent = True
-
-    app.before_request(make_session_permanent)
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException
+from airflow.www.session import AirflowDatabaseSessionInterface, AirflowSecureCookieSessionInterface
 
 
 def init_airflow_session_interface(app):
     """Set airflow session interface"""
-    app.session_interface = AirflowSessionInterface()
+    config = app.config.copy()
+    selected_backend = conf.get('webserver', 'SESSION_BACKEND')
+    # A bit of a misnomer - normally cookies expire whenever the browser is closed
+    # or when they hit their expiry datetime, whichever comes first. "Permanent"
+    # cookies only expire when they hit their expiry datetime, and can outlive
+    # the browser being closed.
+    permanent_cookie = config.get('SESSION_PERMANENT', True)
+
+    if selected_backend == 'securecookie':
+        app.session_interface = AirflowSecureCookieSessionInterface()
+        if permanent_cookie:
+
+            def make_session_permanent():
+                builtin_flask_session.permanent = True
+
+            app.before_request(make_session_permanent)
+    elif selected_backend == 'database':
+        app.session_interface = AirflowDatabaseSessionInterface(
+            app=app,
+            db=None,
+            permanent=permanent_cookie,
+            # Typically these would be configurable with Flask-Session,
+            # but we will set them explicitly instead as they don't make
+            # sense to have configurable in Airflow's use case
+            table='session',
+            key_prefix='',
+            use_signer=True,
+        )
+    else:
+        raise AirflowConfigException(
+            "Unrecognized session backend specified in "
+            f"web_server_session_backend: '{selected_backend}'. Please set "
+            "this to either 'database' or 'securecookie'."
+        )
diff --git a/airflow/www/extensions/init_session.py b/airflow/www/session.py
similarity index 59%
copy from airflow/www/extensions/init_session.py
copy to airflow/www/session.py
index 06e0ba5..4092565 100644
--- a/airflow/www/extensions/init_session.py
+++ b/airflow/www/session.py
@@ -15,33 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from flask import request, session as flask_session
+from flask import request
 from flask.sessions import SecureCookieSessionInterface
+from flask_session.sessions import SqlAlchemySessionInterface
 
 
-class AirflowSessionInterface(SecureCookieSessionInterface):
-    """
-    Airflow cookie session interface.
-    Modifications of sessions should be done here because
-    the change here is global.
-    """
+class SesssionExemptMixin:
+    """Exempt certain blueprints/paths from autogenerated sessions"""
 
     def save_session(self, *args, **kwargs):
-        """Prevent creating session from REST API requests."""
+        """Prevent creating session from REST API and health requests."""
         if request.blueprint == '/api/v1':
             return None
+        if request.path == '/health':
+            return None
         return super().save_session(*args, **kwargs)
 
 
-def init_permanent_session(app):
-    """Make session permanent to allows us to store data"""
-
-    def make_session_permanent():
-        flask_session.permanent = True
-
-    app.before_request(make_session_permanent)
+class AirflowDatabaseSessionInterface(SesssionExemptMixin, SqlAlchemySessionInterface):
+    """Session interface that exempts some routes and stores session data in the database"""
 
 
-def init_airflow_session_interface(app):
-    """Set airflow session interface"""
-    app.session_interface = AirflowSessionInterface()
+class AirflowSecureCookieSessionInterface(SesssionExemptMixin, SecureCookieSessionInterface):
+    """Session interface that exempts some routes and stores session data in a signed cookie"""
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 016c624..8dc1a55 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``be2bfac3da23`` (head)        | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |
+| ``c381b21cb7e4`` (head)        | ``be2bfac3da23`` | ``2.2.4``       | Create a ``session`` table to store web session data                                  |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``be2bfac3da23``               | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``7b2661a43ba3``               | ``142555e44c17`` | ``2.2.0``       | Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id.  |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 5d77e29..ed114b6 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1222,6 +1222,7 @@ sdk
 secretRef
 secretRefs
 securable
+securecookie
 securityManager
 seealso
 seedlist
diff --git a/setup.cfg b/setup.cfg
index 7ab5c77..8e36d06 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -107,6 +107,9 @@ install_requires =
     flask-appbuilder>=3.3.4, <4.0.0
     flask-caching>=1.5.0, <2.0.0
     flask-login>=0.3, <0.5
+    # Strict upper-bound on the latest release of flask-session,
+    # as any schema changes will require a migration.
+    flask-session>=0.3.1, <=0.4.0
     flask-wtf>=0.14.3, <0.15
     graphviz>=0.12
     gunicorn>=20.1.0
diff --git a/tests/api_connexion/conftest.py b/tests/api_connexion/conftest.py
index cc92733..9b37b52 100644
--- a/tests/api_connexion/conftest.py
+++ b/tests/api_connexion/conftest.py
@@ -25,7 +25,12 @@ from tests.test_utils.decorators import dont_initialize_flask_app_submodules
 @pytest.fixture(scope="session")
 def minimal_app_for_api():
     @dont_initialize_flask_app_submodules(
-        skip_all_except=["init_appbuilder", "init_api_experimental_auth", "init_api_connexion"]
+        skip_all_except=[
+            "init_appbuilder",
+            "init_api_experimental_auth",
+            "init_api_connexion",
+            "init_airflow_session_interface",
+        ]
     )
     def factory():
         with conf_vars({("api", "auth_backend"): "tests.test_utils.remote_user_api_auth_backend"}):
diff --git a/tests/api_connexion/test_security.py b/tests/api_connexion/test_security.py
index 244a8a2..68f6d31 100644
--- a/tests/api_connexion/test_security.py
+++ b/tests/api_connexion/test_security.py
@@ -45,3 +45,7 @@ class TestSession:
     def test_session_not_created_on_api_request(self):
         self.client.get("api/v1/dags", environ_overrides={'REMOTE_USER': "test"})
         assert all(cookie.name != "session" for cookie in self.client.cookie_jar)
+
+    def test_session_not_created_on_health_endpoint_request(self):
+        self.client.get("health")
+        assert all(cookie.name != "session" for cookie in self.client.cookie_jar)
diff --git a/tests/test_utils/decorators.py b/tests/test_utils/decorators.py
index d08d159..949df63 100644
--- a/tests/test_utils/decorators.py
+++ b/tests/test_utils/decorators.py
@@ -42,7 +42,7 @@ def dont_initialize_flask_app_submodules(_func=None, *, skip_all_except=None):
             "sync_appbuilder_roles",
             "init_jinja_globals",
             "init_xframe_protection",
-            "init_permanent_session",
+            "init_airflow_session_interface",
             "init_appbuilder",
         ]
 
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 601dc6f..27fa67b 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -74,6 +74,9 @@ class TestDb(unittest.TestCase):
             lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_usg'),
             lambda t: (t[0] == 'remove_table' and t[1].name == 'MSreplication_options'),
             lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_dev'),
+            # Ignore flask-session table/index
+            lambda t: (t[0] == 'remove_table' and t[1].name == 'session'),
+            lambda t: (t[0] == 'remove_index' and t[1].name == 'session_id'),
         ]
         for ignore in ignores:
             diff = [d for d in diff if not ignore(d)]
diff --git a/tests/www/views/conftest.py b/tests/www/views/conftest.py
index 05fe1e4..f95a814 100644
--- a/tests/www/views/conftest.py
+++ b/tests/www/views/conftest.py
@@ -55,6 +55,7 @@ def app(examples_dag_bag):
             "init_flash_views",
             "init_jinja_globals",
             "init_plugins",
+            "init_airflow_session_interface",
         ]
     )
     def factory():
diff --git a/tests/www/views/test_session.py b/tests/www/views/test_session.py
new file mode 100644
index 0000000..9fb6f36
--- /dev/null
+++ b/tests/www/views/test_session.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from airflow.exceptions import AirflowConfigException
+from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.decorators import dont_initialize_flask_app_submodules
+
+
+def test_session_cookie_created_on_login(user_client):
+    assert any(cookie.name == 'session' for cookie in user_client.cookie_jar)
+
+
+def test_session_inaccessible_after_logout(user_client):
+    session_cookie = next((cookie for cookie in user_client.cookie_jar if cookie.name == 'session'), None)
+    assert session_cookie is not None
+
+    resp = user_client.get('/logout/')
+    assert resp.status_code == 302
+
+    # Try to access /home with the session cookie from earlier
+    user_client.set_cookie('session', session_cookie.value)
+    user_client.get('/home/')
+    assert resp.status_code == 302
+
+
+def test_invalid_session_backend_option():
+    @dont_initialize_flask_app_submodules(
+        skip_all_except=[
+            "init_api_connexion",
+            "init_appbuilder",
+            "init_appbuilder_links",
+            "init_appbuilder_views",
+            "init_flash_views",
+            "init_jinja_globals",
+            "init_plugins",
+            "init_airflow_session_interface",
+        ]
+    )
+    def poorly_configured_app_factory():
+        with conf_vars({("webserver", "session_backend"): "invalid_value_for_session_backend"}):
+            return app.create_app(testing=True)
+
+    expected_exc_regex = (
+        "^Unrecognized session backend specified in web_server_session_backend: "
+        r"'invalid_value_for_session_backend'\. Please set this to .+\.$"
+    )
+    with pytest.raises(AirflowConfigException, match=expected_exc_regex):
+        poorly_configured_app_factory()
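
With the database backend active, each logged-in browser session becomes a row in the new session table (a session_id, the serialized session data, and an expiry timestamp, per the migration above). A rough sketch, assuming the c381b21cb7e4 migration has been applied and airflow.settings is initialized against the metadata database, of inspecting what AirflowDatabaseSessionInterface stores:

    from sqlalchemy import text

    from airflow import settings

    session = settings.Session()
    try:
        count = session.execute(text("SELECT COUNT(*) FROM session")).scalar()
        print(f"web sessions currently stored in the database: {count}")
    finally:
        session.close()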

[airflow] 01/05: Use compat data interval shim in log handlers (#21289)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f27357fa415ff23021073a3a8c218fe9b99a143b
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Sat Feb 12 11:40:29 2022 +0800

    Use compat data interval shim in log handlers (#21289)
    
    (cherry picked from commit 44bd211b19dcb75eeb53ced5bea2cf0c80654b1a)
---
 .../providers/elasticsearch/log/es_task_handler.py | 27 ++++++++++++-----
 airflow/utils/log/file_task_handler.py             | 35 +++++++++++++++++-----
 2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cd08971..b591aef 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -101,15 +101,25 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         self.context_set = False
 
     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
-        dag_run = ti.dag_run
+        dag_run = ti.get_dagrun()
+        try:
+            data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+        except AttributeError:  # ti.task is not always set.
+            data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
 
         if self.json_format:
-            data_interval_start = self._clean_date(dag_run.data_interval_start)
-            data_interval_end = self._clean_date(dag_run.data_interval_end)
+            data_interval_start = self._clean_date(data_interval[0])
+            data_interval_end = self._clean_date(data_interval[1])
             execution_date = self._clean_date(dag_run.execution_date)
         else:
-            data_interval_start = dag_run.data_interval_start.isoformat()
-            data_interval_end = dag_run.data_interval_end.isoformat()
+            if data_interval[0]:
+                data_interval_start = data_interval[0].isoformat()
+            else:
+                data_interval_start = ""
+            if data_interval[1]:
+                data_interval_end = data_interval[1].isoformat()
+            else:
+                data_interval_end = ""
             execution_date = dag_run.execution_date.isoformat()
 
         return self.log_id_template.format(
@@ -123,14 +133,15 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         )
 
     @staticmethod
-    def _clean_date(value: datetime) -> str:
+    def _clean_date(value: Optional[datetime]) -> str:
         """
         Clean up a date value so that it is safe to query in elasticsearch
         by removing reserved characters.
-        # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
 
-        :param execution_date: execution date of the dag run.
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
         """
+        if value is None:
+            return ""
         return value.strftime("%Y_%m_%dT%H_%M_%S_%f")
 
     def _group_logs_by_host(self, logs):
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6e57c67..e13b8d4 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -18,8 +18,9 @@
 """File logging handler for tasks."""
 import logging
 import os
+from datetime import datetime
 from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Tuple
 
 import httpx
 from itsdangerous import TimedJSONWebSignatureSerializer
@@ -82,13 +83,31 @@ class FileTaskHandler(logging.Handler):
                 context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
             context["try_number"] = try_number
             return render_template_to_string(self.filename_jinja_template, context)
-
-        return self.filename_template.format(
-            dag_id=ti.dag_id,
-            task_id=ti.task_id,
-            execution_date=ti.get_dagrun().logical_date.isoformat(),
-            try_number=try_number,
-        )
+        elif self.filename_template:
+            dag_run = ti.get_dagrun()
+            try:
+                data_interval: Tuple[datetime, datetime] = ti.task.dag.get_run_data_interval(dag_run)
+            except AttributeError:  # ti.task is not always set.
+                data_interval = (dag_run.data_interval_start, dag_run.data_interval_end)
+            if data_interval[0]:
+                data_interval_start = data_interval[0].isoformat()
+            else:
+                data_interval_start = ""
+            if data_interval[1]:
+                data_interval_end = data_interval[1].isoformat()
+            else:
+                data_interval_end = ""
+            return self.filename_template.format(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                run_id=ti.run_id,
+                data_interval_start=data_interval_start,
+                data_interval_end=data_interval_end,
+                execution_date=ti.get_dagrun().logical_date.isoformat(),
+                try_number=try_number,
+            )
+        else:
+            raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
 
     def _read_grouped_logs(self):
         return False
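
Both handlers now resolve the interval the same way: prefer the DAG's get_run_data_interval(), fall back to the columns stored on the DagRun when ti.task is not set, and tolerate None values on either end. A condensed sketch of that shim as a hypothetical standalone helper (not part of this commit), using the same Airflow 2.2 APIs as the diff:

    from datetime import datetime
    from typing import Optional, Tuple

    from airflow.models import TaskInstance


    def compat_data_interval(ti: TaskInstance) -> Tuple[Optional[datetime], Optional[datetime]]:
        """Hypothetical helper mirroring the fallback used in both log handlers."""
        dag_run = ti.get_dagrun()
        try:
            # Preferred: ask the DAG's timetable for this run's data interval.
            return ti.task.dag.get_run_data_interval(dag_run)
        except AttributeError:
            # ti.task is not always set (e.g. when serving logs from the webserver);
            # fall back to the values persisted on the DagRun, which may be None.
            return dag_run.data_interval_start, dag_run.data_interval_end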

[airflow] 04/05: Simplify trigger cancel button (#21591)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 94c61345245fc62a9b48f9f6f51e3bb9230612b7
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Feb 15 11:00:26 2022 -0700

    Simplify trigger cancel button (#21591)
    
    Co-authored-by: Jed Cunningham <je...@apache.org>
    (cherry picked from commit 65297673a318660fba76797e50d0c06804dfcafc)
---
 airflow/www/templates/airflow/trigger.html |  2 +-
 tests/www/views/test_views_trigger_dag.py  | 11 +++++------
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/airflow/www/templates/airflow/trigger.html b/airflow/www/templates/airflow/trigger.html
index efc1650..2388d4e 100644
--- a/airflow/www/templates/airflow/trigger.html
+++ b/airflow/www/templates/airflow/trigger.html
@@ -63,7 +63,7 @@
       </label>
     </div>
     <button type="submit" class="btn btn-primary">Trigger</button>
-    <button type="button" class="btn" onclick="location.href = '{{ origin }}'; return false">Cancel</button>
+    <a class="btn" href="{{ origin }}">Cancel</a>
   </form>
 {% endblock %}
 
diff --git a/tests/www/views/test_views_trigger_dag.py b/tests/www/views/test_views_trigger_dag.py
index f261438..2b43468 100644
--- a/tests/www/views/test_views_trigger_dag.py
+++ b/tests/www/views/test_views_trigger_dag.py
@@ -134,6 +134,10 @@ def test_trigger_dag_form(admin_client):
         ("http://google.com", "/home"),
         ("36539'%3balert(1)%2f%2f166", "/home"),
         (
+            '"><script>alert(99)</script><a href="',
+            "&#34;&gt;&lt;script&gt;alert(99)&lt;/script&gt;&lt;a href=&#34;",
+        ),
+        (
             "%2Ftree%3Fdag_id%3Dexample_bash_operator';alert(33)//",
             "/home",
         ),
@@ -145,12 +149,7 @@ def test_trigger_dag_form_origin_url(admin_client, test_origin, expected_origin)
     test_dag_id = "example_bash_operator"
 
     resp = admin_client.get(f'trigger?dag_id={test_dag_id}&origin={test_origin}')
-    check_content_in_response(
-        '<button type="button" class="btn" onclick="location.href = \'{}\'; return false">'.format(
-            expected_origin
-        ),
-        resp,
-    )
+    check_content_in_response(f'<a class="btn" href="{expected_origin}">Cancel</a>', resp)
 
 
 @pytest.mark.parametrize(

[airflow] 02/05: Show task status only for running dags or only for the last finished dag (#21352)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 57abbac689472fd04db036e0dc06794971a0a68e
Author: Aleksey Kirilishin <54...@users.noreply.github.com>
AuthorDate: Mon Feb 14 18:55:00 2022 +0300

    Show task status only for running dags or only for the last finished dag (#21352)
    
    * Show task status only for running dags or only for the last finished dag
    
    * Brought the logic of getting task statistics into a separate function
    
    (cherry picked from commit 28d7bde2750c38300e5cf70ba32be153b1a11f2c)
---
 airflow/www/views.py          | 64 ++++++++++++++++++++++++++++++++++---------
 tests/www/views/test_views.py | 35 ++++++++++++++++++++++-
 2 files changed, 85 insertions(+), 14 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 2ed2a67..9ebe899 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -408,6 +408,31 @@ def dag_edges(dag):
     return result
 
 
+def get_task_stats_from_query(qry):
+    """
+    Return a dict of the task quantity, grouped by dag id and task status.
+
+    :param qry: The data in the format (<dag id>, <task state>, <is dag running>, <task count>),
+        ordered by <dag id> and <is dag running>
+    """
+    data = {}
+    last_dag_id = None
+    has_running_dags = False
+    for dag_id, state, is_dag_running, count in qry:
+        if last_dag_id != dag_id:
+            last_dag_id = dag_id
+            has_running_dags = False
+        elif not is_dag_running and has_running_dags:
+            continue
+
+        if is_dag_running:
+            has_running_dags = True
+        if dag_id not in data:
+            data[dag_id] = {}
+        data[dag_id][state] = count
+    return data
+
+
 ######################################################################################
 #                                    Error handlers
 ######################################################################################
@@ -814,7 +839,9 @@ class Airflow(AirflowBaseView):
 
         # Select all task_instances from active dag_runs.
         running_task_instance_query_result = session.query(
-            TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')
+            TaskInstance.dag_id.label('dag_id'),
+            TaskInstance.state.label('state'),
+            sqla.literal(True).label('is_dag_running'),
         ).join(
             running_dag_run_query_result,
             and_(
@@ -838,7 +865,11 @@ class Airflow(AirflowBaseView):
             # Select all task_instances from active dag_runs.
             # If no dag_run is active, return task instances from most recent dag_run.
             last_task_instance_query_result = (
-                session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state'))
+                session.query(
+                    TaskInstance.dag_id.label('dag_id'),
+                    TaskInstance.state.label('state'),
+                    sqla.literal(False).label('is_dag_running'),
+                )
                 .join(TaskInstance.dag_run)
                 .join(
                     last_dag_run,
@@ -855,18 +886,25 @@ class Airflow(AirflowBaseView):
         else:
             final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti')
 
-        qry = session.query(
-            final_task_instance_query_result.c.dag_id,
-            final_task_instance_query_result.c.state,
-            sqla.func.count(),
-        ).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state)
-
-        data = {}
-        for dag_id, state, count in qry:
-            if dag_id not in data:
-                data[dag_id] = {}
-            data[dag_id][state] = count
+        qry = (
+            session.query(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.state,
+                final_task_instance_query_result.c.is_dag_running,
+                sqla.func.count(),
+            )
+            .group_by(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.state,
+                final_task_instance_query_result.c.is_dag_running,
+            )
+            .order_by(
+                final_task_instance_query_result.c.dag_id,
+                final_task_instance_query_result.c.is_dag_running.desc(),
+            )
+        )
 
+        data = get_task_stats_from_query(qry)
         payload = {}
         for dag_id in filter_dag_ids:
             payload[dag_id] = []
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index b98c1bc..672d4a1 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -24,7 +24,13 @@ import pytest
 from airflow.configuration import initialize_config
 from airflow.plugins_manager import AirflowPlugin, EntryPointSource
 from airflow.www import views
-from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration
+from airflow.www.views import (
+    get_key_paths,
+    get_safe_url,
+    get_task_stats_from_query,
+    get_value_from_path,
+    truncate_task_duration,
+)
 from tests.test_utils.config import conf_vars
 from tests.test_utils.mock_plugins import mock_plugin_manager
 from tests.test_utils.www import check_content_in_response, check_content_not_in_response
@@ -333,3 +339,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type):
     action_funcs = action_funcs - {"action_post"}
     for action_function in action_funcs:
         assert_decorator_used(cls, action_function, views.action_has_dag_edit_access)
+
+
+def test_get_task_stats_from_query():
+    query_data = [
+        ['dag1', 'queued', True, 1],
+        ['dag1', 'running', True, 2],
+        ['dag1', 'success', False, 3],
+        ['dag2', 'running', True, 4],
+        ['dag2', 'success', True, 5],
+        ['dag3', 'success', False, 6],
+    ]
+    expected_data = {
+        'dag1': {
+            'queued': 1,
+            'running': 2,
+        },
+        'dag2': {
+            'running': 4,
+            'success': 5,
+        },
+        'dag3': {
+            'success': 6,
+        },
+    }
+
+    data = get_task_stats_from_query(query_data)
+    assert data == expected_data

[airflow] 05/05: update tutorial_etl_dag notes (#21503)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a520845c183a5091fd6750e5e26521c4e18981ba
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Fri Feb 11 10:17:18 2022 +0200

    update tutorial_etl_dag notes (#21503)
    
    * update tutorial_etl_dag notes
    
    (cherry picked from commit a42607a4b75586a396d6a56145ed048d127dd344)
---
 airflow/example_dags/tutorial_etl_dag.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py
index 8dd0ea4..dd18449 100644
--- a/airflow/example_dags/tutorial_etl_dag.py
+++ b/airflow/example_dags/tutorial_etl_dag.py
@@ -19,9 +19,7 @@
 
 """
 ### ETL DAG Tutorial Documentation
-This ETL DAG is compatible with Airflow 1.10.x (specifically tested with 1.10.12) and is referenced
-as part of the documentation that goes along with the Airflow Functional DAG tutorial located
-[here](https://airflow.apache.org/tutorial_decorated_flows.html)
+This ETL DAG is demonstrating an Extract -> Transform -> Load pipeline
 """
 # [START tutorial]
 # [START import_module]