Posted to commits@ponymail.apache.org by hu...@apache.org on 2020/09/06 17:34:50 UTC
[incubator-ponymail-foal] 05/15: add mbox utility plugin
This is an automated email from the ASF dual-hosted git repository.
humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
commit a639cb0687862e5d3ce209820ce1ed6161d052ff
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Sun Sep 6 19:16:48 2020 +0200
add mbox utility plugin
---
server/plugins/mbox.py | 432 +++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 432 insertions(+)
diff --git a/server/plugins/mbox.py b/server/plugins/mbox.py
new file mode 100644
index 0000000..1da3f43
--- /dev/null
+++ b/server/plugins/mbox.py
@@ -0,0 +1,432 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This is the mbox library for Pony Mail.
+It handles fetching (the right) emails for
+pages that need it.
+"""
+
+
+import base64
+import binascii
+import datetime
+import email.utils
+import hashlib
+# Main imports
+import re
+import typing
+
+from elasticsearch.helpers import async_scan
+
+import plugins.aaa
+import plugins.session
+
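+# Matches stacked "Re:"/"Fwd:"-style subject prefixes, so threads can be
+# grouped by their base subject regardless of reply/forward markers.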
+PYPONY_RE_PREFIX = re.compile(r"^([a-zA-Z]+:\s*)+")
+
+mbox_cache_privacy = {}
+
+
+def extract_name(addr):
+ """ Extract name and email from from: header """
+ m = re.match(r"^([^<]+)\s*<(.+)>$", addr)
+ if m:
+ return [m.group(1), m.group(2)]
+ else:
+ addr = addr.strip("<>")
+ return [addr, addr]
+
+
+def anonymize(doc):
+ """ Anonymizes an email, hiding author email addresses."""
+ # ES direct hit?
+ ptr: typing.Dict[str, str] = doc
+ if "_source" in doc:
+ ptr = doc["_source"]
+
+ if "from" in ptr:
+ frname, fremail = extract_name(ptr["from"])
+ ptr["md5"] = hashlib.md5(
+ bytes(fremail.lower(), encoding="ascii", errors="replace")
+ ).hexdigest()
+ ptr["from"] = re.sub(
+ r"<(\S{1,2})\S*@([-a-zA-Z0-9_.]+)>", "<\\1..@\\2>", ptr["from"]
+ )
+ if ptr["body"]:
+ ptr["body"] = re.sub(
+ r"<(\S{1,2})\S*@([-a-zA-Z0-9_.]+)>", "<\\1..@\\2>", ptr["body"]
+ )
+ return doc
+
+
+async def find_parent(session, doc: typing.Dict[str, str]):
+ """
+ Locates the first email in a thread by going back through all the
+ in-reply-to headers and finding their source.
+ """
+ step = 0
+ # max 50 steps up in the hierarchy
+ while step < 50:
+ step = step + 1
+        irt: typing.Optional[str] = doc.get("in-reply-to")
+        if not irt:
+            break  # No in-reply-to reference; we have reached the thread root
+ # Extract the reference, if any
+ m = re.search(r"(<[^>]+>)", irt)
+ if not m:
+ break
+ ref = m.group(1)
+ newdoc = await get_email(session, messageid=ref)
+ # Did we find something, and can the user access it?
+ if not newdoc or not plugins.aaa.can_access_email(session, newdoc):
+ break
+ else:
+ doc = newdoc
+ return doc
+
+
+async def fetch_children(session, pdoc, counter=0, pdocs=None, short=False):
+ """
+ Fetches all child messages of a parent email
+ """
+ if pdocs is None:
+ pdocs = {}
+ counter = counter + 1
+    if counter > 250:
+        return [], [], pdocs
+ docs = await get_email(session, irt=pdoc["message-id"])
+
+ thread = []
+ emails = []
+ for doc in docs:
+ # Make sure email is accessible
+ if plugins.aaa.can_access_email(session, doc):
+ if doc["mid"] not in pdocs:
+ mykids, myemails, pdocs = await fetch_children(
+ session, doc, counter, pdocs, short=short
+ )
+ if short:
+ xdoc = {
+ "tid": doc["mid"],
+ "mid": doc["mid"],
+ "message-id": doc["message-id"],
+ "subject": doc["subject"],
+ "from": doc["from"],
+ "id": doc["mid"],
+ "epoch": doc["epoch"],
+ "children": mykids,
+ "irt": doc["in-reply-to"],
+ "list_raw": doc["list_raw"],
+ }
+ thread.append(xdoc)
+ pdocs[doc["mid"]] = xdoc
+ else:
+ thread.append(doc)
+ pdocs[doc["mid"]] = doc
+ for kid in mykids:
+ if kid["mid"] not in pdocs:
+ pdocs[kid["mid"]] = kid
+ emails.append(kid)
+ return thread, emails, pdocs
+
+
+async def get_email(
+ session: plugins.session.SessionObject,
+    permalink: typing.Optional[str] = None,
+ messageid=None,
+ irt=None,
+ source=False,
+):
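+    """
+    Fetches a single email by document id (permalink) or message-id header,
+    or, if irt is given, the list of direct replies to that message-id.
+    Emails the user cannot access are skipped, and author addresses are
+    anonymized for unauthenticated sessions.
+    """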
+ doctype = session.database.dbs.mbox
+ if source:
+ doctype = session.database.dbs.source
+    # Older indexes may need a match query instead of a strict term query in
+    # order to find emails in DBs that may have been incorrectly analyzed.
+    aggtype = "match"
+ if permalink:
+ try:
+ doc = await session.database.get(index=doctype, id=permalink)
+ if doc and plugins.aaa.can_access_email(session, doc):
+ if not session.credentials:
+ doc = anonymize(doc)
+ doc["_source"]["id"] = doc["_source"]["mid"]
+ return doc["_source"]
+ except session.database.DBError:
+ pass
+ elif messageid:
+ res = await session.database.search(
+ index=doctype,
+ size=1,
+ body={"query": {"bool": {"must": [{aggtype: {"message-id": messageid}}]}}},
+ )
+ if len(res["hits"]["hits"]) == 1:
+ doc = res["hits"]["hits"][0]["_source"]
+ doc["id"] = doc["mid"]
+ if plugins.aaa.can_access_email(session, doc):
+ if not session.credentials:
+ doc = anonymize(doc)
+ return doc
+ elif irt:
+ res = await session.database.search(
+ index=doctype,
+ size=250,
+ body={"query": {"bool": {"must": [{aggtype: {"in-reply-to": irt}}]}}},
+ )
+ docs = []
+ for doc in res["hits"]["hits"]:
+ if plugins.aaa.can_access_email(session, doc):
+ if not session.credentials:
+ doc = anonymize(doc)
+ doc["_source"]["id"] = doc["_source"]["mid"]
+ docs.append(doc["_source"])
+ return docs
+ return None
+
+
+async def get_source(session: plugins.session.SessionObject, permalink: typing.Optional[str] = None):
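+    """
+    Fetches the raw source of an email, first by document id and then by
+    the permalink field; sources that look base64-encoded are decoded.
+    """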
+ doctype = session.database.dbs.source
+ try:
+ doc = await session.database.get(index=doctype, id=permalink)
+ return doc
+ except session.database.DBError:
+ pass
+ res = await session.database.search(
+ index=doctype,
+ size=1,
+ body={"query": {"bool": {"must": [{"match": {"permalink": permalink}}]}}},
+ )
+ if len(res["hits"]["hits"]) == 1:
+ doc = res["hits"]["hits"][0]
+ doc["id"] = doc["_id"]
+ # Check for base64-encoded source
+ if ':' not in doc['_source']['source']:
+ try:
+ doc['_source']['source'] = base64.standard_b64decode(doc['_source']['source'])\
+ .decode('utf-8', 'replace')
+ except binascii.Error:
+ pass # If it wasn't base64 after all, just return as is
+ return doc
+ return None
+
+
+async def get_list(session, listid, fr=None, to=None, limit=10000):
+    """
+    Loads emails from a specified list.
+    Note: the fr/to date range parameters are accepted but not yet applied.
+    """
+    res = await session.database.search(
+        index=session.database.dbs.mbox,
+ size=limit,
+ body={
+ "query": {"bool": {"must": [{"term": {"list_raw": listid}}]}},
+ "sort": [{"epoch": {"order": "asc"}}],
+ },
+ )
+ docs = []
+ for hit in res["hits"]["hits"]:
+ doc = hit["_source"]
+ if plugins.aaa.can_access_email(session, doc):
+            if not session.credentials:
+ doc = anonymize(doc)
+ docs.append(doc)
+ return docs
+
+
+async def query(
+ session: plugins.session.SessionObject,
+ query_defuzzed,
+ query_limit=10000,
+ shorten=False,
+):
+ """
+ Advanced query and grab for stats.py
+ """
+ docs = []
+ hits = 0
+ async for hit in async_scan(
+ client=session.database.client,
+ query={
+ "query": {"bool": query_defuzzed},
+ "sort": [{"epoch": {"order": "asc"}}],
+ },
+ ):
+ doc = hit["_source"]
+ doc["id"] = doc["mid"]
+ if plugins.aaa.can_access_email(session, doc):
+ if not session.credentials:
+ doc = anonymize(doc)
+ if shorten:
+ doc["body"] = (doc["body"] or "")[:200]
+ docs.append(doc)
+ hits += 1
+            if hits >= query_limit:
+ break
+ return docs
+
+
+async def wordcloud(session, query_defuzzed):
+ """
+ Wordclouds via significant terms query in ES
+ """
+ wc = {}
+ res = await session.database.search(
+ body={
+ "size": 0,
+ "query": {"bool": query_defuzzed},
+ "aggregations": {
+ "cloud": {"significant_terms": {"field": "subject", "size": 10}}
+ },
+ }
+ )
+
+ for hit in res["aggregations"]["cloud"]["buckets"]:
+ wc[hit["key"]] = hit["doc_count"]
+
+ return wc
+
+
+def is_public(session: plugins.session.SessionObject, listname):
+ """ Quickly determine if a list if fully public, private or mixed """
+ if "@" not in listname:
+ lname, ldomain = listname.strip("<>").split(".", 1)
+ listname = f"{lname}@{ldomain}"
+ if listname in session.server.data.lists:
+ return not session.server.data.lists[listname]["private"]
+ else:
+ return False # Default to not public
+
+
+async def get_list_stats(session, maxage="90d", admin=False):
+ """
+ Fetches a list of all mailing lists available (and visible).
+ Use the admin flag to override AAA and see everything
+ """
+ daterange = {"gt": "now-%s" % maxage, "lt": "now+1d"}
+    res = await session.database.search(
+        index=session.database.dbs.mbox,
+ size=0,
+ body={
+ "query": {"bool": {"must": [{"range": {"date": daterange}}]}},
+ "aggs": {"listnames": {"terms": {"field": "list_raw", "size": 10000}}},
+ },
+ )
+ lists = {}
+ for entry in res["aggregations"]["listnames"]["buckets"]:
+
+ # Normalize list name
+ listname = entry["key"].lower().strip("<>")
+
+ # Check access
+ if (
+ is_public(session, listname)
+ or admin
+ or plugins.aaa.canViewList(session, listname)
+ ):
+ # Change foo.bar.baz to foo@bar.baz
+ listname = listname.replace(".", "@", 1)
+ lists[listname] = entry["doc_count"]
+ return lists
+
+
+async def get_years(session, query_defuzzed):
+ """ Fetches the oldest and youngest email, returns the years between them """
+
+ # Get oldest doc
+ res = await session.database.search(
+ index=session.database.dbs.mbox,
+ size=1,
+ body={"query": {"bool": query_defuzzed}, "sort": {"epoch": "asc"}},
+ )
+ oldest = 1970
+ if res["hits"]["hits"]:
+ doc = res["hits"]["hits"][0]
+ oldest = datetime.datetime.fromtimestamp(doc["_source"]["epoch"]).year
+
+ # Get youngest doc
+    res = await session.database.search(
+        index=session.database.dbs.mbox,
+        size=1,
+        body={"query": {"bool": query_defuzzed}, "sort": {"epoch": "desc"}},
+    )
+ youngest = 1970
+ if res["hits"]["hits"]:
+ doc = res["hits"]["hits"][0]
+ youngest = datetime.datetime.fromtimestamp(doc["_source"]["epoch"]).year
+
+ return oldest, youngest
+
+
+def find_root_subject(threads, hashdict, root_email, osubject=None):
+ """Finds the discussion origin of an email, if present"""
+ irt = root_email.get("in-reply-to")
+ subject = root_email.get("subject")
+    subject = (subject or "").replace("\n", "")  # Crop multi-line subjects
+
+ # First, the obvious - look for an in-reply-to in our existing dict with a matching subject
+ if irt and irt in hashdict:
+ if hashdict[irt].get("subject") == subject:
+ return hashdict[irt]
+
+ # If that failed, we break apart our subject
+ if osubject:
+ rsubject = osubject
+ else:
+ rsubject = PYPONY_RE_PREFIX.sub("", subject) + "_" + root_email.get("list_raw")
+ for thread in threads:
+ if thread.get("tsubject") == rsubject:
+ return thread
+
+ return None
+
+
+def construct_threads(emails: typing.List[typing.Dict]):
+ """Turns a list of emails into a nested thread structure"""
+ threads = []
+ authors = {}
+ hashdict = {}
+ for cur_email in sorted(emails, key=lambda x: x["epoch"]):
+ author = cur_email.get("from")
+ if author not in authors:
+ authors[author] = 0
+ authors[author] += 1
+ subject = cur_email.get("subject").replace("\n", "") # Crop multi-line subjects
+ tsubject = PYPONY_RE_PREFIX.sub("", subject) + "_" + cur_email.get("list_raw")
+ parent = find_root_subject(threads, hashdict, cur_email, tsubject)
+ xemail = {
+ "children": [],
+ "tid": cur_email.get("mid"),
+ "subject": subject,
+ "tsubject": tsubject,
+ "epoch": cur_email.get("epoch"),
+ "nest": 1,
+ }
+ if not parent:
+ threads.append(xemail)
+ else:
+ xemail["nest"] = parent["nest"] + 1
+ parent["children"].append(xemail)
+ hashdict[cur_email.get("message-id", "??")] = xemail
+ return threads, authors
+
+
+def gravatar(eml):
+ """Generates a gravatar hash from an email address"""
+ if isinstance(eml, str):
+ header_from = eml
+ else:
+ header_from = eml.get("from", "??")
+ mailaddr = email.utils.parseaddr(header_from)[1]
+ ghash = hashlib.md5(mailaddr.encode("utf-8")).hexdigest()
+ return ghash