You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by hu...@apache.org on 2021/09/07 13:31:21 UTC

[incubator-ponymail-foal] branch humbedooh/mbox-stream updated: Convert to a streaming response

This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch humbedooh/mbox-stream
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git


The following commit(s) were added to refs/heads/humbedooh/mbox-stream by this push:
     new a7f04a5  Convert to a streaming response
a7f04a5 is described below

commit a7f04a5a41f20d91d89d253fbb73997e84f9889b
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Tue Sep 7 08:31:15 2021 -0500

    Convert to a streaming response
    
    Instead of using several (if not hundreds of) megabytes of memory to hold everything before spitting it out as one giant file,
    it is better to stream the mbox entries as they are fetched, and to break off early if anything happens to the stream.
---
 server/endpoints/mbox.py | 78 +++++++++++++++++++++++++++++++-----------------
 1 file changed, 51 insertions(+), 27 deletions(-)

diff --git a/server/endpoints/mbox.py b/server/endpoints/mbox.py
index bcbd871..da05f4a 100644
--- a/server/endpoints/mbox.py
+++ b/server/endpoints/mbox.py
@@ -16,6 +16,7 @@
 # limitations under the License.
 
 """Endpoint for returning emails in mbox format as a single archive"""
+import asyncio
 import plugins.server
 import plugins.session
 import plugins.messages
@@ -23,48 +24,71 @@ import plugins.defuzzer
 import re
 import typing
 import aiohttp.web
+import asyncio.exceptions
+
+
+async def convert_source(session, email):
+    source = await plugins.messages.get_source(session, permalink=email.get("dbid", email["mid"]))
+    if source:
+        source_as_text = source["_source"]["source"]
+        # Convert to mboxrd format
+        mboxrd_source = ""
+        line_no = 0
+        for line in source_as_text.split("\n"):
+            line_no += 1
+            if line_no > 1 and re.match(r"^>*From\s+", line):
+                line = ">" + line
+            mboxrd_source += line + "\n"
+        return mboxrd_source
+    return ""
 
 
 async def process(
-    server: plugins.server.BaseServer, session: plugins.session.SessionObject, indata: dict,
-) -> typing.Union[dict, aiohttp.web.Response]:
+    server: plugins.server.BaseServer,
+    request: aiohttp.web.BaseRequest,
+    session: plugins.session.SessionObject,
+    indata: dict,
+) -> typing.Union[dict, aiohttp.web.Response, aiohttp.web.StreamResponse]:
 
     lid = indata.get("list", "_")
     domain = indata.get("domain", "_")
-    
+
     try:
         query_defuzzed = plugins.defuzzer.defuzz(indata, list_override="@" in lid and lid or None)
     except AssertionError as e:  # If defuzzer encounters syntax errors, it will throw an AssertionError
-        return aiohttp.web.Response(headers={"content-type": "text/plain",}, status=500, text=str(e))
-    results = await plugins.messages.query(session, query_defuzzed, query_limit=server.config.database.max_hits,)
-
-    sources = []
-    for email in results:
-        source = await plugins.messages.get_source(session, permalink=email.get("dbid", email["mid"]))
-        if source:
-            stext = source["_source"]["source"]
-            # Convert to mboxrd format
-            mboxrd_source = ""
-            line_no = 0
-            for line in stext.split("\n"):
-                line_no += 1
-                if line_no > 1 and re.match(r"^>*From\s+", line):
-                    line = ">" + line
-                mboxrd_source += line + "\n"
-            sources.append(mboxrd_source)
+        return aiohttp.web.Response(
+            headers={
+                "content-type": "text/plain",
+            },
+            status=500,
+            text=str(e),
+        )
+    results = await plugins.messages.query(
+        session,
+        query_defuzzed,
+        query_limit=server.config.database.max_hits,
+    )
 
     # Figure out a sane filename
     xlist = re.sub(r"[^-_.a-z0-9]+", "_", lid)
     xdomain = re.sub(r"[^-_.a-z0-9]+", "_", domain)
     dlfile = f"{xlist}-{xdomain}.mbox"
+    headers = {"Content-Type": "application/mbox", "Content-Disposition": f"attachment; filename={dlfile}"}
 
-    # Return mbox archive with filename
-    return aiohttp.web.Response(
-        headers={"Content-Type": "application/mbox", "Content-Disposition": f"attachment; filename={dlfile}",},
-        status=200,
-        text="\n\n".join(sources),
-    )
+    # Return mbox archive with filename as a stream
+    response = aiohttp.web.StreamResponse(status=200, headers=headers)
+    response.enable_chunked_encoding()
+    await response.prepare(request)
+    for email in results:
+        mboxrd_source = await convert_source(session, email)
+        try:
+            async with server.streamlock:
+                await asyncio.wait_for(response.write(mboxrd_source.encode("utf-8")), timeout=5)
+        except (TimeoutError, RuntimeError, asyncio.exceptions.CancelledError):
+            break  # Writing stream failed, break it off.
+    return response
 
 
 def register(server: plugins.server.BaseServer):
-    return plugins.server.Endpoint(process)
+    # Note that this is a StreamingEndpoint!
+    return plugins.server.StreamingEndpoint(process)