You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by hu...@apache.org on 2021/09/07 13:31:21 UTC
[incubator-ponymail-foal] branch humbedooh/mbox-stream updated:
Convert to a streaming response
This is an automated email from the ASF dual-hosted git repository.
humbedooh pushed a commit to branch humbedooh/mbox-stream
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
The following commit(s) were added to refs/heads/humbedooh/mbox-stream by this push:
new a7f04a5 Convert to a streaming response
a7f04a5 is described below
commit a7f04a5a41f20d91d89d253fbb73997e84f9889b
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Tue Sep 7 08:31:15 2021 -0500
Convert to a streaming response
Instead of using several (perhaps even hundreds of) megabytes of memory to hold everything before sending it out as one giant file,
it is better to stream the mbox entries as they are fetched, and to stop early if anything goes wrong with the stream.
---
server/endpoints/mbox.py | 78 +++++++++++++++++++++++++++++++-----------------
1 file changed, 51 insertions(+), 27 deletions(-)
diff --git a/server/endpoints/mbox.py b/server/endpoints/mbox.py
index bcbd871..da05f4a 100644
--- a/server/endpoints/mbox.py
+++ b/server/endpoints/mbox.py
@@ -16,6 +16,7 @@
# limitations under the License.
"""Endpoint for returning emails in mbox format as a single archive"""
+import asyncio
import plugins.server
import plugins.session
import plugins.messages
@@ -23,48 +24,71 @@ import plugins.defuzzer
import re
import typing
import aiohttp.web
+import asyncio.exceptions
+
+
+async def convert_source(session, email):
+    """Fetch the stored source of *email* and return it in mboxrd format.
+
+    Every line after the first that starts with zero or more ">" characters
+    followed by "From" and whitespace gets one extra ">" prepended (mboxrd
+    quoting), so it cannot be mistaken for a message separator. The first
+    line is left untouched.
+
+    :param session: the requesting session, passed through to get_source().
+    :param email: a message document; its "dbid" (falling back to "mid") is
+        used as the permalink for the source lookup.
+    :return: the converted source with every input line terminated by "\\n",
+        or an empty string when no source document was found.
+    """
+    source = await plugins.messages.get_source(session, permalink=email.get("dbid", email["mid"]))
+    if not source:
+        return ""
+    source_as_text = source["_source"]["source"]
+    from_line = re.compile(r"^>*From\s+")
+    lines = source_as_text.split("\n")  # always at least one element
+    # Quote "From " lines (never the first line), then rebuild the text with a
+    # single join rather than repeated string concatenation, which is quadratic
+    # for large messages.
+    quoted = [lines[0]] + [">" + line if from_line.match(line) else line for line in lines[1:]]
+    return "\n".join(quoted) + "\n"
async def process(
- server: plugins.server.BaseServer, session: plugins.session.SessionObject, indata: dict,
-) -> typing.Union[dict, aiohttp.web.Response]:
+    server: plugins.server.BaseServer,
+    request: aiohttp.web.BaseRequest,
+    session: plugins.session.SessionObject,
+    indata: dict,
+) -> typing.Union[dict, aiohttp.web.Response, aiohttp.web.StreamResponse]:
+    """Stream every email matching the (defuzzed) query as one mboxrd archive.
+
+    :param server: the running server; supplies config.database.max_hits and
+        the streamlock held around each write.
+    :param request: the incoming request, used to prepare the streamed response.
+    :param session: the requesting session, used for the query and source lookups.
+    :param indata: query parameters; "list" and "domain" also name the file.
+    :return: a 500 text/plain Response if the defuzzer rejects the query,
+        otherwise the prepared, chunk-encoded StreamResponse.
+    """
lid = indata.get("list", "_")
domain = indata.get("domain", "_")
-
+
try:
query_defuzzed = plugins.defuzzer.defuzz(indata, list_override="@" in lid and lid or None)
except AssertionError as e: # If defuzzer encounters syntax errors, it will throw an AssertionError
- return aiohttp.web.Response(headers={"content-type": "text/plain",}, status=500, text=str(e))
- results = await plugins.messages.query(session, query_defuzzed, query_limit=server.config.database.max_hits,)
-
- sources = []
- for email in results:
- source = await plugins.messages.get_source(session, permalink=email.get("dbid", email["mid"]))
- if source:
- stext = source["_source"]["source"]
- # Convert to mboxrd format
- mboxrd_source = ""
- line_no = 0
- for line in stext.split("\n"):
- line_no += 1
- if line_no > 1 and re.match(r"^>*From\s+", line):
- line = ">" + line
- mboxrd_source += line + "\n"
- sources.append(mboxrd_source)
+        return aiohttp.web.Response(
+            headers={
+                "content-type": "text/plain",
+            },
+            status=500,
+            text=str(e),
+        )
+    results = await plugins.messages.query(
+        session,
+        query_defuzzed,
+        query_limit=server.config.database.max_hits,
+    )
# Figure out a sane filename
xlist = re.sub(r"[^-_.a-z0-9]+", "_", lid)
xdomain = re.sub(r"[^-_.a-z0-9]+", "_", domain)
dlfile = f"{xlist}-{xdomain}.mbox"
+    headers = {"Content-Type": "application/mbox", "Content-Disposition": f"attachment; filename={dlfile}"}
- # Return mbox archive with filename
- return aiohttp.web.Response(
- headers={"Content-Type": "application/mbox", "Content-Disposition": f"attachment; filename={dlfile}",},
- status=200,
- text="\n\n".join(sources),
- )
+    # Return mbox archive with filename as a stream
+    response = aiohttp.web.StreamResponse(status=200, headers=headers)
+    response.enable_chunked_encoding()
+    await response.prepare(request)
+    separator = ""  # nothing precedes the first message
+    for email in results:
+        mboxrd_source = await convert_source(session, email)
+        if not mboxrd_source:
+            continue  # no stored source for this email; skipped, as the buffered version did
+        # Put a blank-line separator before every message after the first, matching the
+        # old "\n\n".join(sources) output so consecutive messages never run together.
+        chunk = (separator + mboxrd_source).encode("utf-8")
+        separator = "\n\n"
+        try:
+            async with server.streamlock:
+                await asyncio.wait_for(response.write(chunk), timeout=5)
+        except (asyncio.TimeoutError, TimeoutError, ConnectionResetError, RuntimeError, asyncio.CancelledError):
+            # Writing the stream failed (slow or disconnected client): stop sending.
+            # asyncio.TimeoutError is listed explicitly because before Python 3.11 it is
+            # not the builtin TimeoutError, so wait_for timeouts would otherwise escape.
+            # NOTE(review): swallowing CancelledError lets this handler finish normally
+            # even if its task was cancelled -- confirm that is intended.
+            break
+    return response
def register(server: plugins.server.BaseServer):
- return plugins.server.Endpoint(process)
+ # Register as a StreamingEndpoint rather than a plain Endpoint: process() takes the
+ # raw request and prepares and writes its own StreamResponse (presumably the
+ # streaming wrapper is what passes the request through -- confirm in plugins.server).
+ return plugins.server.StreamingEndpoint(process)