Posted to commits@airflow.apache.org by as...@apache.org on 2021/07/06 21:02:03 UTC

[airflow] branch v2-1-test updated (e9500cf -> dd92d9a)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard e9500cf  Add changelog updates for 2.1.2 (#16838)
 discard 51b6c33  Set version to 2.1.2
 discard cf3a22b  Only allow webserver to request from the worker log server (#16754)
     new f20d5c7  Only allow webserver to request from the worker log server (#16754)
     new 051a78a  Set version to 2.1.2
     new dd92d9a  Add changelog updates for 2.1.2 (#16838)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e9500cf)
            \
             N -- N -- N   refs/heads/v2-1-test (dd92d9a)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/utils/serve_logs.py    | 5 ++++-
 tests/utils/test_serve_logs.py | 2 ++
 2 files changed, 6 insertions(+), 1 deletion(-)

[airflow] 02/03: Set version to 2.1.2

Posted by as...@apache.org.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 051a78a9f1145baae570859162fa9cba9cde6fbd
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Tue Jul 6 11:46:16 2021 -0700

    Set version to 2.1.2
---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index fe52933..922248e 100644
--- a/setup.py
+++ b/setup.py
@@ -41,7 +41,7 @@ PY39 = sys.version_info >= (3, 9)
 
 logger = logging.getLogger(__name__)
 
-version = '2.1.1'
+version = '2.1.2'
 
 my_dir = dirname(__file__)
 

[airflow] 01/03: Only allow webserver to request from the worker log server (#16754)

Posted by as...@apache.org.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f20d5c74933555c956350aa659308fb3e9c4b805
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Thu Jul 1 16:20:36 2021 +0100

    Only allow webserver to request from the worker log server (#16754)
    
    Logs _shouldn't_ contain any sensitive info, but they often do by
    mistake. As an extra level of protection we shouldn't allow anything
    other than the webserver to access the logs.
    
    (We can't change the bind IP from 0.0.0.0 as for it to be useful it
    needs to be accessed from different hosts -- i.e. the webserver will
    almost always be on a different node)
    
    (cherry picked from commit 2285ee9f71a004d5c013119271873182fb431d8f)
---
 airflow/utils/log/file_task_handler.py |  13 +++-
 airflow/utils/serve_logs.py            |  68 ++++++++++++++++----
 tests/utils/test_serve_logs.py         | 110 ++++++++++++++++++++++++++-------
 3 files changed, 157 insertions(+), 34 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index b859020..1b52a4f 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -22,6 +22,7 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Optional
 
 import httpx
+from itsdangerous import TimedJSONWebSignatureSerializer
 
 from airflow.configuration import AirflowConfigException, conf
 from airflow.utils.helpers import parse_template_string
@@ -172,7 +173,17 @@ class FileTaskHandler(logging.Handler):
                 except (AirflowConfigException, ValueError):
                     pass
 
-                response = httpx.get(url, timeout=timeout)
+                signer = TimedJSONWebSignatureSerializer(
+                    secret_key=conf.get('webserver', 'secret_key'),
+                    algorithm_name='HS512',
+                    expires_in=conf.getint('webserver', 'log_request_clock_grace', fallback=30),
+                    # This isn't really a "salt", more of a signing context
+                    salt='task-instance-logs',
+                )
+
+                response = httpx.get(
+                    url, timeout=timeout, headers={'Authorization': signer.dumps(log_relative_path)}
+                )
                 response.encoding = "utf-8"
 
                 # Check if the resource was properly fetched
diff --git a/airflow/utils/serve_logs.py b/airflow/utils/serve_logs.py
index 0fefa42..fd5eadb 100644
--- a/airflow/utils/serve_logs.py
+++ b/airflow/utils/serve_logs.py
@@ -16,24 +16,70 @@
 # under the License.
 
 """Serve logs process"""
+
+# pylint: skip-file
+
 import os
+import time
 
-import flask
+from flask import Flask, abort, request, send_from_directory
+from itsdangerous import TimedJSONWebSignatureSerializer
+from setproctitle import setproctitle
 
 from airflow.configuration import conf
 
 
-def serve_logs():
-    """Serves logs generated by Worker"""
-    print("Starting flask")
-    flask_app = flask.Flask(__name__)
+def flask_app():  # noqa: D103
+    flask_app = Flask(__name__)
+    max_request_age = conf.getint('webserver', 'log_request_clock_grace', fallback=30)
+    log_directory = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
+
+    signer = TimedJSONWebSignatureSerializer(
+        secret_key=conf.get('webserver', 'secret_key'),
+        algorithm_name='HS512',
+        expires_in=max_request_age,
+        # This isn't really a "salt", more of a signing context
+        salt='task-instance-logs',
+    )
+
+    # Prevent direct access to the logs port
+    @flask_app.before_request
+    def validate_pre_signed_url():
+        try:
+            auth = request.headers['Authorization']
+
+            # We don't actually care about the payload, just that the signature
+            # was valid and the `exp` claim is correct
+            filename, headers = signer.loads(auth, return_header=True)
+
+            issued_at = int(headers['iat'])
+            expires_at = int(headers['exp'])
+        except Exception:
+            abort(403)
+
+        if filename != request.view_args['filename']:
+            abort(403)
+
+        # Validate the `iat` and `exp` are within `max_request_age` of now.
+        now = int(time.time())
+        if abs(now - issued_at) > max_request_age:
+            abort(403)
+        if abs(now - expires_at) > max_request_age:
+            abort(403)
+        if issued_at > expires_at or expires_at - issued_at > max_request_age:
+            abort(403)
 
     @flask_app.route('/log/<path:filename>')
-    def serve_logs_view(filename):  # pylint: disable=unused-variable
-        log_directory = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
-        return flask.send_from_directory(
-            log_directory, filename, mimetype="application/json", as_attachment=False
-        )
+    def serve_logs_view(filename):
+        return send_from_directory(log_directory, filename, mimetype="application/json", as_attachment=False)
+
+    return flask_app
+
+
+def serve_logs():
+    """Serves logs generated by Worker"""
+    setproctitle("airflow serve-logs")
+    app = flask_app()
 
     worker_log_server_port = conf.getint('celery', 'WORKER_LOG_SERVER_PORT')
-    flask_app.run(host='0.0.0.0', port=worker_log_server_port)
+    app.run(host='0.0.0.0', port=worker_log_server_port)
diff --git a/tests/utils/test_serve_logs.py b/tests/utils/test_serve_logs.py
index edb4d09..080e9c9 100644
--- a/tests/utils/test_serve_logs.py
+++ b/tests/utils/test_serve_logs.py
@@ -14,33 +14,99 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import os
-import unittest
-from multiprocessing import Process
-from os.path import basename
-from tempfile import NamedTemporaryFile
-from time import sleep
+from typing import TYPE_CHECKING
 
 import pytest
-import requests
+from itsdangerous import TimedJSONWebSignatureSerializer
 
 from airflow.configuration import conf
-from airflow.utils.serve_logs import serve_logs
+from airflow.utils.serve_logs import flask_app
+from tests.test_utils.config import conf_vars
+
+# pylint: skip-file
+
+if TYPE_CHECKING:
+    from flask.testing import FlaskClient
 
 LOG_DATA = "Airflow log data" * 20
 
 
-@pytest.mark.quarantined
-class TestServeLogs(unittest.TestCase):
-    def test_should_serve_file(self):
-        log_dir = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
-        log_port = conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-        with NamedTemporaryFile(dir=log_dir) as f:
-            f.write(LOG_DATA.encode())
-            f.flush()
-            sub_proc = Process(target=serve_logs)
-            sub_proc.start()
-            sleep(1)
-            log_url = f"http://localhost:{log_port}/log/{basename(f.name)}"
-            assert LOG_DATA == requests.get(log_url).content.decode()
-            sub_proc.terminate()
+@pytest.fixture
+def client(tmpdir):
+    with conf_vars({('logging', 'base_log_folder'): str(tmpdir)}):
+        app = flask_app()
+
+        yield app.test_client()
+
+
+@pytest.fixture
+def sample_log(tmpdir):
+    f = tmpdir / 'sample.log'
+    f.write(LOG_DATA.encode())
+
+    return f
+
+
+@pytest.fixture
+def signer():
+    return TimedJSONWebSignatureSerializer(
+        secret_key=conf.get('webserver', 'secret_key'),
+        algorithm_name='HS512',
+        expires_in=30,
+        # This isn't really a "salt", more of a signing context
+        salt='task-instance-logs',
+    )
+
+
+@pytest.mark.usefixtures('sample_log')
+class TestServeLogs:
+    def test_forbidden_no_auth(self, client: "FlaskClient"):
+        assert 403 == client.get('/log/sample.log').status_code
+
+    def test_should_serve_file(self, client: "FlaskClient", signer):
+        assert (
+            LOG_DATA
+            == client.get(
+                '/log/sample.log',
+                headers={
+                    'Authorization': signer.dumps('sample.log'),
+                },
+            ).data.decode()
+        )
+
+    def test_forbidden_too_long_validity(self, client: "FlaskClient", signer):
+        signer.expires_in = 3600
+        assert (
+            403
+            == client.get(
+                '/log/sample.log',
+                headers={
+                    'Authorization': signer.dumps('sample.log'),
+                },
+            ).status_code
+        )
+
+    def test_forbidden_expired(self, client: "FlaskClient", signer):
+        # Fake the time we think we are
+        signer.now = lambda: 0
+        assert (
+            403
+            == client.get(
+                '/log/sample.log',
+                headers={
+                    'Authorization': signer.dumps('sample.log'),
+                },
+            ).status_code
+        )
+
+    def test_wrong_context(self, client: "FlaskClient", signer):
+        signer.salt = None
+        assert (
+            403
+            == client.get(
+                '/log/sample.log',
+                headers={
+                    'Authorization': signer.dumps('sample.log'),
+                },
+            ).status_code
+        )
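
The request-signing scheme in the diff above can be illustrated with a minimal sketch. It is not the code from this commit: the commit uses `TimedJSONWebSignatureSerializer` from itsdangerous, whereas this sketch swaps in a plain HMAC-SHA512 from the Python standard library so that it runs without third-party packages, and its token format is not compatible with itsdangerous. The names `make_token`, `is_allowed`, and `CONTEXT` are hypothetical. Only the checks themselves (signature, filename match, and the `iat`/`exp` window) follow `validate_pre_signed_url`.

```python
import base64
import hashlib
import hmac
import json

# Hypothetical signing context, standing in for the serializer's "salt".
CONTEXT = b"task-instance-logs"


def _sign(secret: bytes, body: bytes) -> str:
    # HMAC-SHA512 over the context plus the encoded claims.
    return hmac.new(secret, CONTEXT + b"." + body, hashlib.sha512).hexdigest()


def make_token(secret: bytes, filename: str, now: int, expires_in: int = 30) -> str:
    """Webserver side: sign the requested path with issue and expiry times."""
    claims = {"filename": filename, "iat": now, "exp": now + expires_in}
    body = base64.urlsafe_b64encode(json.dumps(claims).encode())
    return body.decode() + "." + _sign(secret, body)


def is_allowed(secret: bytes, token: str, filename: str, now: int, max_age: int = 30) -> bool:
    """Worker side: mirror the checks made before serving a log file."""
    try:
        body, signature = token.rsplit(".", 1)
        if not hmac.compare_digest(signature, _sign(secret, body.encode())):
            return False
        claims = json.loads(base64.urlsafe_b64decode(body))
        issued_at, expires_at = int(claims["iat"]), int(claims["exp"])
    except Exception:
        # A missing or malformed token is treated like the 403 in the diff.
        return False
    # The signed path must be the path actually being requested.
    if claims.get("filename") != filename:
        return False
    # Both timestamps must lie within max_age seconds of the server clock.
    if abs(now - issued_at) > max_age or abs(now - expires_at) > max_age:
        return False
    # Reject inverted or overly long validity windows.
    return issued_at <= expires_at and expires_at - issued_at <= max_age
```

Under these assumptions, a token signed for `sample.log` is accepted shortly after it is issued, and rejected when the path differs, the secret differs, the token has aged out, or the signer asked for a validity longer than `max_age`. These are the same cases the new tests in `tests/utils/test_serve_logs.py` exercise against the real serializer.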

[airflow] 03/03: Add changelog updates for 2.1.2 (#16838)

Posted by as...@apache.org.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit dd92d9a300778d91f0722da38db0c3feb12edcb0
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Tue Jul 6 11:50:48 2021 -0700

    Add changelog updates for 2.1.2 (#16838)
    
    (cherry picked from commit b5ef3c841f735ea113e5d3639a620c2b63092e43)
---
 CHANGELOG.txt | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 8fc6ebd..6db86ed 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -1,4 +1,14 @@
-Airflow 2.1.1, 2021-06-29
+Airflow 2.1.2, 2021-07-09
+-------------------------
+
+Bug Fixes
+"""""""""
+
+- Only allow webserver to request from the worker log server (#16754)
+- Fix "Invalid JSON configuration, must be a dict" bug (#16648)
+
+
+Airflow 2.1.1, 2021-07-02
 -------------------------
 
 Bug Fixes