You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by se...@apache.org on 2022/01/29 11:17:52 UTC

[incubator-ponymail-foal] 01/03: Centralise access checking

This is an automated email from the ASF dual-hosted git repository.

sebb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git

commit 3226344af513a4b48271dccf097d6e3a67cf6b67
Author: Sebb <se...@apache.org>
AuthorDate: Sat Jan 29 11:15:21 2022 +0000

    Centralise access checking
    
    This relates to #212
---
 server/endpoints/source.py |   2 +-
 server/plugins/messages.py |  11 +-
 test/itest_integration2.py | 268 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 276 insertions(+), 5 deletions(-)

diff --git a/server/endpoints/source.py b/server/endpoints/source.py
index f43c7e0..927107d 100644
--- a/server/endpoints/source.py
+++ b/server/endpoints/source.py
@@ -40,7 +40,7 @@ async def process(
     if email and isinstance(email, dict) and not email.get("deleted"):
         if plugins.aaa.can_access_email(session, email):
             source = await plugins.messages.get_source(session, permalink=email["dbid"])
-            if source and not source["_source"].get("deleted"):
+            if source:
                 return aiohttp.web.Response(
                     headers={"Content-Type": "text/plain"}, status=200, text=source["_source"]["source"],
                 )
diff --git a/server/plugins/messages.py b/server/plugins/messages.py
index 4f9be6a..8d46284 100644
--- a/server/plugins/messages.py
+++ b/server/plugins/messages.py
@@ -310,17 +310,20 @@ async def get_email_irt(
 
 async def get_source(session: plugins.session.SessionObject, permalink: str = None, raw=False):
     """
-        Get the source document for an email, or None if it does not find exactly one match.
+        Get the source document for an email, or None
 
-        The caller must check if access is allowed.
+        The caller must check if access is allowed by the parent mbox entry
     """
     assert session.database, DATABASE_NOT_CONNECTED
-    doctype = session.database.dbs.db_source
     try:
-        doc = await session.database.get(index=doctype, id=permalink)
+        doc = await session.database.get(index=session.database.dbs.db_source, id=permalink)
     except plugins.database.DBError:
         doc = None
     if doc:
+        # If hidden, only return source if session is admin
+        is_admin = session.credentials and session.credentials.admin
+        if doc["_source"].get("deleted", False) and not is_admin:
+            return None
         if raw:
             return doc
         # Check for base64-encoded source
diff --git a/test/itest_integration2.py b/test/itest_integration2.py
new file mode 100644
index 0000000..14bb490
--- /dev/null
+++ b/test/itest_integration2.py
@@ -0,0 +1,268 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+import random
+import requests
+
+# Run as: python3 -m pytest [-s] test/itest_integration.py
+
+API_BASE='http://localhost:8080/api'
+TEST_DOMAIN = 'ponymail.apache.org'
+TEST_LIST = 'users'
+
+# Emulate how test auth is used by GUI
+def get_cookies(user='user'):
+    state=random.randint(
+        1000000000000000000,
+        2000000000000000000) # roughly equivalent to code in oauth.js
+    testauth='testauth'
+    res = requests.get(f"{API_BASE}/{testauth}?state={state}&redirect_uri=x&state={state}&key=ignored",allow_redirects=False)
+    code = res.headers['Location'][1:]
+    res = requests.get(f"{API_BASE}/oauth.lua?key=ignored{code}&oauth_token={API_BASE}/{testauth}&state={state}&user={user}")
+    cookies = res.cookies
+    jzon = requests.get(f"{API_BASE}/preferences", cookies=cookies).json()
+    assert 'credentials' in jzon['login']
+    return cookies
+
+def check_access(email, cookies):
+        # check email accessibility
+        mid = email['mid']
+        private = email['private']
+        res = requests.get(
+            f"{API_BASE}/email.lua",
+            params={"id": mid},
+            cookies=cookies
+        )
+        assert res.status_code == 200
+        jzon = res.json()
+        assert mid == jzon['mid']
+        assert mid in jzon['permalinks']
+        # check email access by message-id
+        msgid = jzon['message-id']
+        listid = jzon['list_raw']
+        res = requests.get(
+            f"{API_BASE}/email.lua",
+            params={"id": msgid, "listid": listid},
+            cookies=cookies
+        )
+        assert res.status_code == 200
+        if private:
+            # should not be visible without cookies
+            res = requests.get(
+                f"{API_BASE}/email.lua",
+                params={"id": mid}
+            )
+            assert res.status_code == 404
+            res = requests.get(
+                f"{API_BASE}/email.lua",
+                params={"id": msgid, "listid": listid}
+            )
+            assert res.status_code == 404
+        # check source accessibility
+        res = requests.get(
+            f"{API_BASE}/source.lua",
+            params={"id": mid},
+            cookies=cookies
+        )
+        assert res.status_code == 200, mid
+        res = requests.get(
+            f"{API_BASE}/source.lua",
+            params={"id": msgid, "listid": listid},
+            cookies=cookies
+        )
+        assert res.status_code == 200
+        if private:
+            # should not be visible without cookies
+            res = requests.get(
+                f"{API_BASE}/source.lua",
+                params={"id": mid}
+            )
+            assert res.status_code == 404
+            res = requests.get(
+                f"{API_BASE}/source.lua",
+                params={"id": msgid, "listid": listid}
+            )
+            assert res.status_code == 404
+
+def test_lists():
+    jzon = requests.get(f"{API_BASE}/preferences").json()
+    # print(jzon)
+    lists = jzon['lists']
+    assert TEST_DOMAIN in lists
+    assert TEST_LIST in lists[TEST_DOMAIN]
+    assert len(lists) == 1 # only expecting one domain
+
+def test_public_stats():
+    jzon = requests.get(
+        f"{API_BASE}/stats.lua",
+        params={"list": TEST_LIST, "domain": TEST_DOMAIN, "emailsOnly": True, "d": 'gte=0d'}
+    ).json()
+    assert jzon['firstYear'] == 2022
+    assert jzon['firstMonth'] == 1
+    assert jzon['lastYear'] == 2022
+    assert jzon['lastMonth'] == 1
+    assert jzon['hits'] == 6
+    for email in jzon['emails']:
+        assert email['list_raw'] == '<users.ponymail.apache.org>'
+        assert email['list'] == email['list_raw']
+        assert email['id'] == email['mid']
+        assert email['private'] == False
+        check_access(email, None)
+    # Check we cannot see the private emails
+    jzon = requests.get(
+        f"{API_BASE}/stats.lua",
+        params={"list": TEST_LIST, "domain": TEST_DOMAIN, "emailsOnly": True, "d": '2019-09'}
+        ).json()
+    assert jzon['hits'] == 0
+
+def test_private_stats():
+    cookies = get_cookies('user')
+    # only fetch the private mail stats
+    jzon = requests.get(
+        f"{API_BASE}/stats.lua",
+        params={"list": TEST_LIST, "domain": TEST_DOMAIN, "emailsOnly": True, "d": '2019-09'},
+        cookies=cookies
+    ).json()
+    # The earlier mails are private
+    assert jzon['firstYear'] == 2019
+    assert jzon['firstMonth'] == 9
+    assert jzon['lastYear'] == 2022
+    assert jzon['lastMonth'] == 1
+    assert jzon['hits'] == 4
+    for email in jzon['emails']:
+        assert email['list_raw'] == '<users.ponymail.apache.org>'
+        assert email['list'] == email['list_raw']
+        assert email['id'] == email['mid']
+        assert email['private']
+        check_access(email, cookies)
+
+def mgmt_get_text(params, cookies, expected=200):
+    res = requests.post(f"{API_BASE}/mgmt.lua", params=params, cookies=cookies)
+    assert res.status_code == expected, res.text
+    return res.text
+
+def mgmt_get_json(params, cookies, expected=200):
+    res = requests.post(f"{API_BASE}/mgmt.lua", params=params, cookies=cookies)
+    assert res.status_code == expected, res.text
+    return res.json()
+
+def test_mgmt_validation():
+    admin_cookies = get_cookies('admin')
+    user_cookies = get_cookies('user')
+    mgmt_get_text({"action": 'log'}, user_cookies, 403)
+    mgmt_get_text({"action": 'any'}, admin_cookies, 404)
+
+    text = mgmt_get_text({"action": 'delete'}, admin_cookies)
+    assert text == "Removed 0 emails from archives."
+
+    text = mgmt_get_text({"action": 'hide'}, admin_cookies)
+    assert text == "Hid 0 emails from archives."
+
+    text = mgmt_get_text({"action": 'unhide'}, admin_cookies)
+    assert text == "Unhid 0 emails from archives."
+
+    text = mgmt_get_text({"action": 'delatt'}, admin_cookies)
+    assert text == "Removed 0 attachments from archives."
+
+    text = mgmt_get_text({"action": 'edit'}, admin_cookies, 500)
+    assert "ValueError: Document ID is missing or invalid" in text
+
+    text = mgmt_get_text({"action": 'edit', "document": '1234'}, admin_cookies, 500)
+    assert "ValueError: Author field" in text
+
+    text = mgmt_get_text({"action": 'edit', "document": '1234', "from": 'sender'}, admin_cookies, 500)
+    assert "ValueError: Subject field" in text
+
+    text = mgmt_get_text({"action": 'edit', "document": '1234', "from": 'sender', "subject": 'Test Email'}, admin_cookies, 500)
+    assert "ValueError: List ID field" in text
+
+    text = mgmt_get_text(
+        {"action": 'edit', "document": '1234', "from": 'sender', "subject": 'Test Email', "list": 'abc'},
+        admin_cookies, 500)
+    assert "ValueError: Email body" in text
+
+    text = mgmt_get_text(
+        {"action": 'edit', "document": '1234', "from": 'sender', "subject": 'Test Email', "list": 'abc', "body": 'body'},
+        admin_cookies, 404)
+    assert "Email not found!" in text
+
+def test_mgmt_log():
+    admin_cookies = get_cookies('admin')
+    jzon = mgmt_get_json({"action": 'log'}, admin_cookies)
+    # for entry in jzon['entries']:
+        # print(entry)
+    # assert jzon == {"entries": []}
+
+def test_mgmt_hiding():
+    admin_cookies = get_cookies('admin')
+
+    # reset in case of earlier failure
+    text = mgmt_get_text({"action": 'unhide', "document": "c396ps3p5pb05srb4269dzcg9j7sof42"}, admin_cookies)
+    assert text == "Unhid 1 emails from archives."
+
+    jzon = requests.get(
+        f"{API_BASE}/stats.lua",
+        params={"list": TEST_LIST, "domain": TEST_DOMAIN, "emailsOnly": True, "d": 'gte=0d'}
+    ).json()
+
+    assert jzon['hits'] == 6
+
+    check_access({"mid": 'c396ps3p5pb05srb4269dzcg9j7sof42', "private": False}, None)
+
+    text = mgmt_get_text({"action": 'hide', "document": "c396ps3p5pb05srb4269dzcg9j7sof42"}, admin_cookies)
+    assert text == "Hid 1 emails from archives."
+
+    jzon = requests.get(
+        f"{API_BASE}/stats.lua",
+        params={"list": TEST_LIST, "domain": TEST_DOMAIN, "emailsOnly": True, "d": 'gte=0d'}
+    ).json()
+    assert jzon['hits'] == 5
+
+
+
+    text = mgmt_get_text({"action": 'unhide', "document": "c396ps3p5pb05srb4269dzcg9j7sof42"}, admin_cookies)
+    assert text == "Unhid 1 emails from archives."
+
+    jzon = requests.get(
+        f"{API_BASE}/stats.lua",
+        params={"list": TEST_LIST, "domain": TEST_DOMAIN, "emailsOnly": True, "d": 'gte=0d'}
+    ).json()
+    assert jzon['hits'] == 6
+
+    check_access({"mid": 'c396ps3p5pb05srb4269dzcg9j7sof42', "private": False}, None)
+
+def xtest_mgmt_get():
+    """This test causes the source for an entry to be hidden"""
+    admin_cookies = get_cookies('admin')
+
+    jzon = requests.get(
+        f"{API_BASE}/stats.lua",
+        params={"list": TEST_LIST, "domain": TEST_DOMAIN, "emailsOnly": True, "d": 'gte=0d'}
+    ).json()
+
+    assert jzon['hits'] == 6
+
+    text = mgmt_get_text(
+        {
+            "action": 'edit', "document": "c396ps3p5pb05srb4269dzcg9j7sof42",
+            "from": '', "subject": '', "list": 'users.ponymail.apache.org', "body": 'body', "private": False,
+        },
+        admin_cookies
+        )
+    assert text == "Email successfully saved"
+