Posted to commits@ponymail.apache.org by hu...@apache.org on 2020/08/17 11:25:05 UTC
[incubator-ponymail-foal] branch master updated: Conform bulk operations with archiver.py's index format, run code through Black.
This is an automated email from the ASF dual-hosted git repository.
humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
The following commit(s) were added to refs/heads/master by this push:
new ff1bf8c Conform bulk operations with archiver.py's index format, run code through Black.
ff1bf8c is described below
commit ff1bf8c38fcafa25f55e201330626da0f78306f8
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Mon Aug 17 13:24:50 2020 +0200
Conform bulk operations with archiver.py's index format, run code through Black.
This spits out a lot of changes; bulk ops aside, they are only style
changes.
---
tools/import-mbox.py | 581 ++++++++++++++++++++++++++++++++-------------------
1 file changed, 371 insertions(+), 210 deletions(-)
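Before the diff itself, a minimal sketch of the bulk action shape this change adopts (the "ponymail" database name and the document values are illustrative assumptions, not taken from a real run):

    # Illustrative sketch only: one action as the updated bulk_insert() builds it.
    # Documents now go to per-type indices named dbname + "-" + dtype
    # (e.g. "ponymail-mbox", "ponymail-source"), with the fixed "_doc"
    # mapping type, matching what archiver.py writes.
    action = {
        "_op_type": "index",
        "_consistency": "quorum",       # write consistency, bulk_insert()'s default
        "_index": "ponymail" + "-" + "mbox",
        "_type": "_doc",
        "_id": document_id,             # the message's "mid"
        "doc": js,
        "_source": js,
    }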
diff --git a/tools/import-mbox.py b/tools/import-mbox.py
index 051b05a..bd2f717 100755
--- a/tools/import-mbox.py
+++ b/tools/import-mbox.py
@@ -16,33 +16,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import sys
-import random
-import time
-import hashlib
-import os
-from threading import Thread, Lock
-import mailbox
-import email.errors, email.utils, email.header
-from urllib.request import urlopen
-import re
-from elastic import Elastic
import argparse
-from os import listdir
-from os.path import isfile, join, isdir
+import email.errors
+import email.header
+import email.utils
import glob
+import gzip
+import hashlib
+import logging
+import mailbox
import multiprocessing
+import os
+import random
+import re
+import sys
import tempfile
-import gzip
+import time
+from os import listdir
+from os.path import isdir, isfile, join
+from threading import Lock, Thread
+from urllib.request import urlopen
import archiver
-
+from plugins.elastic import Elastic
goodies = 0
baddies = 0
-duplicates={} # detect if mid is re-used this run
+duplicates = {} # detect if mid is re-used this run
block = Lock()
-lists = [] # N.B. the entries in this list depend on the import type:
+lists = [] # N.B. the entries in this list depend on the import type:
# globDir: [filename, list-id]
# modMbox: [list-id, mbox]
# piperMail: [filename, list-id]
@@ -70,7 +72,7 @@ timeout = 600
fromFilter = None
dedup = False
dedupped = 0
-noMboxo = False # Don't skip MBoxo patch
+noMboxo = False # Don't skip MBoxo patch
# Fetch config and set up ES
es = Elastic()
@@ -79,7 +81,8 @@ dbname = es.getdbname()
rootURL = ""
-def bulk_insert(name, json, xes, dtype, wc = 'quorum'):
+
+def bulk_insert(name, json, xes, dtype, wc="quorum"):
if args.dry:
return
@@ -88,26 +91,28 @@ def bulk_insert(name, json, xes, dtype, wc = 'quorum'):
js_arr = []
for entry in json:
js = entry
- mid = js['mid']
- if dtype == 'mbox_source':
- del js['mid']
- js_arr.append({
- '_op_type': 'index',
- '_consistency': wc,
- '_index': dbname,
- '_type': dtype,
- '_id': mid,
- 'doc': js,
- '_source': js
- })
+ document_id = js["mid"]
+ if dtype == "source":
+ del js["mid"]
+ js_arr.append(
+ {
+ "_op_type": "index",
+ "_consistency": wc,
+ "_index": dbname + "-" + dtype,
+ "_type": "_doc",
+ "_id": document_id,
+ "doc": js,
+ "_source": js,
+ }
+ )
try:
- xes.bulk(js_arr,ignore=404)
-# print("%s: Inserted %u entries into %s" % (name, len(js_arr),dtype))
+ xes.bulk(js_arr, ignore=404)
+ # print("%s: Inserted %u entries into %s" % (name, len(js_arr),dtype))
except Exception as err:
- print("%s: Warning: Could not bulk insert: %s into %s" % (name,err,dtype))
+ print("%s: Warning: Could not bulk insert: %s into %s" % (name, err, dtype))
-class SlurpThread(Thread):
+class SlurpThread(Thread):
def printid(self, message):
print("%s: %s" % (self.name, message))
@@ -121,11 +126,10 @@ class SlurpThread(Thread):
ml = ""
mboxfile = ""
filename = ""
-
if args.generator:
- archie = archiver.Archiver(generator=args.generator, parse_html=parseHTML)
+ archie = archiver.Archiver(generator=args.generator, parse_html=parseHTML)
else:
- archie = archiver.Archiver(parse_html=parseHTML)
+ archie = archiver.Archiver(parse_html=parseHTML)
while len(lists) > 0:
self.printid("%u elements left to slurp" % len(lists))
@@ -146,10 +150,12 @@ class SlurpThread(Thread):
dFile = False
if imap:
imap4 = mla[2]
+
def mailgen(_list):
for uid in _list:
- msgbytes = imap4.uid('fetch', uid, '(RFC822)')[1][0][1]
+ msgbytes = imap4.uid("fetch", uid, "(RFC822)")[1][0][1]
yield email.message_from_bytes(msgbytes)
+
messages = mailgen(mla[0])
elif filebased:
@@ -160,132 +166,172 @@ class SlurpThread(Thread):
try:
with open(filename, "rb") as bf:
bmd = bf.read()
- bf.close() # explicit early close
+ bf.close() # explicit early close
bmd = gzip.decompress(bmd)
- tmpfile = tempfile.NamedTemporaryFile(mode='w+b', buffering=1, delete=False)
+ tmpfile = tempfile.NamedTemporaryFile(
+ mode="w+b", buffering=1, delete=False
+ )
tmpfile.write(bmd)
tmpfile.flush()
tmpfile.close()
tmpname = tmpfile.name
- dFile = True # Slated for deletion upon having been read
+ dFile = True # Slated for deletion upon having been read
self.printid("%s -> %u bytes" % (tmpname, len(bmd)))
except Exception as err:
- self.printid("This wasn't a gzip file: %s" % err )
+ self.printid("This wasn't a gzip file: %s" % err)
self.printid("Slurping %s" % filename)
if maildir:
messages = mailbox.Maildir(tmpname, create=False)
else:
- messages = mailbox.mbox(tmpname, None if noMboxo else MboxoFactory, create=False)
+ messages = mailbox.mbox(
+ tmpname, None if noMboxo else MboxoFactory, create=False
+ )
else:
ml = mla[0]
mboxfile = mla[1]
self.printid("Slurping %s/%s" % (ml, mboxfile))
- ctx = urlopen("%s%s/%s" % (source, ml, mboxfile ))
- inp = ctx.read().decode(ctx.headers.get_content_charset() or 'utf-8', errors='ignore')
-
- tmpname = hashlib.sha224(("%f-%f-%s-%s.mbox" % (random.random(), time.time(), ml, mboxfile)).encode('utf-8') ).hexdigest()
+ ctx = urlopen("%s%s/%s" % (source, ml, mboxfile))
+ inp = ctx.read().decode(
+ ctx.headers.get_content_charset() or "utf-8", errors="ignore"
+ )
+
+ tmpname = hashlib.sha224(
+ (
+ "%f-%f-%s-%s.mbox"
+ % (random.random(), time.time(), ml, mboxfile)
+ ).encode("utf-8")
+ ).hexdigest()
with open(tmpname, "w") as f:
f.write(inp)
if maildir:
messages = mailbox.Maildir(tmpname, create=False)
else:
- messages = mailbox.mbox(tmpname, None if noMboxo else MboxoFactory, create=False)
+ messages = mailbox.mbox(
+ tmpname, None if noMboxo else MboxoFactory, create=False
+ )
count = 0
bad = 0
-
for key in messages.iterkeys():
- message_raw = messages.get_bytes(key) # True raw format, as opposed to calling .as_bytes()
- message=messages.get(key)
+ message = messages.get(key)
# If --filter is set, discard any messages not matching by continuing to next email
- if fromFilter and 'from' in message and message['from'].find(fromFilter) == -1:
+ if (
+ fromFilter
+ and "from" in message
+ and message["from"].find(fromFilter) == -1
+ ):
continue
if resendTo:
- self.printid("Delivering message %s via MTA" % message['message-id'] if 'message-id' in message else '??')
- s = SMTP('localhost')
+ self.printid(
+ "Delivering message %s via MTA" % message["message-id"]
+ if "message-id" in message
+ else "??"
+ )
+ s = SMTP("localhost")
try:
if list_override:
- message.replace_header('List-ID', list_override)
- message.replace_header('To', resendTo)
+ message.replace_header("List-ID", list_override)
+ message.replace_header("To", resendTo)
except:
if list_override:
- message['List-ID'] = list_override
- message['cc'] = None
+ message["List-ID"] = list_override
+ message["cc"] = None
s.send_message(message, from_addr=None, to_addrs=(resendTo))
continue
- if (time.time() - stime > timeout): # break out after N seconds, it shouldn't take this long..!
- self.printid("Whoa, this is taking way too long, ignoring %s for now" % tmpname)
+ if (
+ time.time() - stime > timeout
+ ): # break out after N seconds, it shouldn't take this long..!
+ self.printid(
+ "Whoa, this is taking way too long, ignoring %s for now"
+ % tmpname
+ )
break
# Don't pass message to archiver unless we have a list id
- if not (list_override or message['list-id']):
- self.printid("No list id found for %s " % message['message-id'])
+ if not (list_override or message["list-id"]):
+ self.printid("No list id found for %s " % message["message-id"])
bad += 1
continue
- json, contents, _msgdata, _irt = archie.compute_updates(args, list_override, private, message, message_raw)
+ json, contents, _msgdata, _irt = archie.compute_updates(
+ list_override, private, message
+ )
# Not sure this can ever happen
- if json and not (json['list'] and json['list_raw']):
- self.printid("No list id found for %s " % json['message-id'])
+ if json and not (json["list"] and json["list_raw"]):
+ self.printid("No list id found for %s " % json["message-id"])
bad += 1
continue
# If --dedup is active, try to filter out any messages that already exist on the list
- if json and dedup and message.get('message-id', None):
+ if json and dedup and message.get("message-id", None):
res = es.search(
- doc_type="mbox",
- size = 1,
- _source = ['mid'], # so can report the match source
- body = {
- 'query': {
- 'bool': {
- 'must': [
+ index=dbname + "-" + "mbox",
+ doc_type="_doc",
+ size=1,
+ _source=["mid"], # so can report the match source
+ body={
+ "query": {
+ "bool": {
+ "must": [
{
- 'term': {
- 'message-id': message.get('message-id', None)
+ "term": {
+ "message-id": message.get(
+ "message-id", None
+ )
}
},
- {
- 'term': {
- 'list_raw': json['list']
- }
- }
+ {"term": {"list_raw": json["list"]}},
]
}
}
- }
+ },
)
- if res and res['hits']['total'] > 0:
- self.printid("Dedupping %s - matched in %s" % (json['message-id'], res['hits']['hits'][0]['_source']['mid']))
+ if res and res["hits"]["total"] > 0:
+ self.printid(
+ "Dedupping %s - matched in %s"
+ % (
+ json["message-id"],
+ res["hits"]["hits"][0]["_source"]["mid"],
+ )
+ )
dedupped += 1
continue
if json:
- file=messages.get_file(key, True)
+ file = messages.get_file(key, True)
# If the parsed data is filtered, also need to filter the raw input
# so the source agrees with the summary info
- if message.__class__.__name__ == 'MboxoFactory':
- file=MboxoReader(file)
- raw_msg=file.read()
+ if message.__class__.__name__ == "MboxoFactory":
+ file = MboxoReader(file)
+ raw_msg = file.read()
file.close()
if args.dups:
try:
- duplicates[json['mid']].append(json['message-id'] + " in " + filename)
+ duplicates[json["mid"]].append(
+ json["message-id"] + " in " + filename
+ )
except:
- duplicates[json['mid']]=[json['message-id'] + " in " + filename]
+ duplicates[json["mid"]] = [
+ json["message-id"] + " in " + filename
+ ]
- try: # temporary hack to try and find an encoding issue
+ try: # temporary hack to try and find an encoding issue
# needs to be replaced by proper exception handling
json_source = {
- 'mid': json['mid'], # needed for bulk-insert only, not needed in database
- 'message-id': json['message-id'],
- 'source': archie.mbox_source(raw_msg)
+ "mid": json[
+ "mid"
+ ], # needed for bulk-insert only, not needed in database
+ "message-id": json["message-id"],
+ "source": archie.mbox_source(raw_msg),
}
except Exception as e:
- self.printid("Error '%s' processing id %s msg %s " % (e, json['mid'], json['message-id']))
+ self.printid(
+ "Error '%s' processing id %s msg %s "
+ % (e, json["mid"], json["message-id"])
+ )
bad += 1
continue
@@ -294,95 +340,193 @@ class SlurpThread(Thread):
jas.append(json_source)
if args.verbose and verbose_logger:
# TODO optionally show other fields (e.g. From_ line)
- verbose_logger.info("MID:%(mid)s MSGID:%(message-id)s" , json)
+ verbose_logger.info("MID:%(mid)s MSGID:%(message-id)s", json)
if contents:
if not args.dry:
for key in contents:
es.index(
- doc_type="attachment",
+ index=dbname + "_attachment",
+ doc_type="_doc",
id=key,
- body = {
- 'source': contents[key]
- }
+ body={"source": contents[key]},
)
if len(ja) >= 40:
- bulk_insert(self.name, ja, es, 'mbox')
+ bulk_insert(self.name, ja, es, "mbox")
ja = []
- bulk_insert(self.name, jas, es, 'mbox_source')
+ bulk_insert(self.name, jas, es, "source")
jas = []
else:
- self.printid("Failed to parse: Return=%s Message-Id=%s" % (message.get('Return-Path'), message.get('Message-Id')))
+ self.printid(
+ "Failed to parse: Return=%s Message-Id=%s"
+ % (message.get("Return-Path"), message.get("Message-Id"))
+ )
bad += 1
if filebased:
- self.printid("Parsed %u records (failed: %u) from %s" % (count, bad, filename))
+ self.printid(
+ "Parsed %u records (failed: %u) from %s" % (count, bad, filename)
+ )
if dFile:
os.unlink(tmpname)
elif imap:
self.printid("Parsed %u records (failed: %u) from imap" % (count, bad))
else:
- self.printid("Parsed %s/%s: %u records (failed: %u) from %s" % (ml, mboxfile, count, bad, tmpname))
+ self.printid(
+ "Parsed %s/%s: %u records (failed: %u) from %s"
+ % (ml, mboxfile, count, bad, tmpname)
+ )
os.unlink(tmpname)
goodies += count
baddies += bad
if len(ja) > 0:
- bulk_insert(self.name, ja, es, 'mbox')
+ bulk_insert(self.name, ja, es, "mbox")
ja = []
if len(jas) > 0:
- bulk_insert(self.name, jas, es, 'mbox_source')
+ bulk_insert(self.name, jas, es, "source")
jas = []
self.printid("Done, %u elements left to slurp" % len(lists))
-parser = argparse.ArgumentParser(description='Command line options.')
-parser.add_argument('--source', dest='source', type=str, nargs=1,
- help='Source to scan (http(s)://, imap(s):// or file path)')
-parser.add_argument('--dir', dest='dir', action='store_true',
- help='Input is in Maildir format')
-parser.add_argument('--interactive', dest='interactive', action='store_true',
- help='Ask for help when possible')
-parser.add_argument('--quick', dest='quick', action='store_true',
- help='Only grab the first file you can find')
-parser.add_argument('--mod-mbox', dest='modmbox', action='store_true',
- help='This is mod_mbox, derive list-id and files from it')
-parser.add_argument('--pipermail', dest='pipermail', action='store_true',
- help='This is pipermail, derive files from it (list ID has to be set!)')
-parser.add_argument('--lid', dest='listid', type=str, nargs=1,
- help='Optional List-ID to override source with. Format: <list-name>@<domain>')
-parser.add_argument('--project', dest='project', type=str, nargs=1,
- help='Optional project to look for ($project-* will be imported as well)')
-parser.add_argument('--ext', dest='ext', type=str, nargs=1,
- help='Optional file extension e.g. ".gz" (or call it with an empty string to not care)')
-parser.add_argument('--domain', dest='domain', type=str, nargs=1,
- help='Optional domain extension for MIDs and List ID reconstruction)')
-parser.add_argument('--private', dest='private', action='store_true',
- help='This is a privately archived list. Filter through auth proxy.')
-parser.add_argument('--dry', dest='dry', action='store_true',
- help='Do not save emails to elasticsearch, only test importing')
-parser.add_argument('--verbose', dest='verbose', action='store_true',
- help='Show details of generated id (for use with --dry)')
-parser.add_argument('--duplicates', dest='dups', action='store_true',
- help='Detect duplicate mids in this run')
-parser.add_argument('--html2text', dest='html2text', action='store_true',
- help='If no text/plain is found, try to parse HTML using html2text')
-parser.add_argument('--requirelid', dest='requirelid', action='store_true',
- help='Require a List ID to be present, ignore otherwise')
-parser.add_argument('--dedup', dest='dedup', action='store_true',
- help='Try to dedup messages based on ID before importing')
-parser.add_argument('--ignorebody', dest='ibody', type=str, nargs=1,
- help='Optional email bodies to treat as empty (in conjunction with --html2text)')
-parser.add_argument('--resend', dest='resend', type=str, nargs=1,
- help='DANGER ZONE: Resend every read email to this recipient as a new email')
-parser.add_argument('--timeout', dest='timeout', type=int, nargs=1,
- help='Optional timeout in secs for importing an mbox/maildir file (default is 600 seconds)')
-parser.add_argument('--filter', dest = 'fromfilter', type=str, nargs=1,
- help = 'Optional sender filter: Only import emails from this address')
-parser.add_argument('--nomboxo', dest = 'nomboxo', action='store_true',
- help = 'Skip Mboxo processing')
-parser.add_argument('--generator', dest='generator',
- help='Override the generator.')
+
+parser = argparse.ArgumentParser(description="Command line options.")
+parser.add_argument(
+ "--source",
+ dest="source",
+ type=str,
+ nargs=1,
+ help="Source to scan (http(s)://, imap(s):// or file path)",
+)
+parser.add_argument(
+ "--dir", dest="dir", action="store_true", help="Input is in Maildir format"
+)
+parser.add_argument(
+ "--interactive",
+ dest="interactive",
+ action="store_true",
+ help="Ask for help when possible",
+)
+parser.add_argument(
+ "--quick",
+ dest="quick",
+ action="store_true",
+ help="Only grab the first file you can find",
+)
+parser.add_argument(
+ "--mod-mbox",
+ dest="modmbox",
+ action="store_true",
+ help="This is mod_mbox, derive list-id and files from it",
+)
+parser.add_argument(
+ "--pipermail",
+ dest="pipermail",
+ action="store_true",
+ help="This is pipermail, derive files from it (list ID has to be set!)",
+)
+parser.add_argument(
+ "--lid",
+ dest="listid",
+ type=str,
+ nargs=1,
+ help="Optional List-ID to override source with. Format: <list-name>@<domain>",
+)
+parser.add_argument(
+ "--project",
+ dest="project",
+ type=str,
+ nargs=1,
+ help="Optional project to look for ($project-* will be imported as well)",
+)
+parser.add_argument(
+ "--ext",
+ dest="ext",
+ type=str,
+ nargs=1,
+ help='Optional file extension e.g. ".gz" (or call it with an empty string to not care)',
+)
+parser.add_argument(
+ "--domain",
+ dest="domain",
+ type=str,
+ nargs=1,
+ help="Optional domain extension for MIDs and List ID reconstruction)",
+)
+parser.add_argument(
+ "--private",
+ dest="private",
+ action="store_true",
+ help="This is a privately archived list. Filter through auth proxy.",
+)
+parser.add_argument(
+ "--dry",
+ dest="dry",
+ action="store_true",
+ help="Do not save emails to elasticsearch, only test importing",
+)
+parser.add_argument(
+ "--verbose",
+ dest="verbose",
+ action="store_true",
+ help="Show details of generated id (for use with --dry)",
+)
+parser.add_argument(
+ "--duplicates",
+ dest="dups",
+ action="store_true",
+ help="Detect duplicate mids in this run",
+)
+parser.add_argument(
+ "--html2text",
+ dest="html2text",
+ action="store_true",
+ help="If no text/plain is found, try to parse HTML using html2text",
+)
+parser.add_argument(
+ "--requirelid",
+ dest="requirelid",
+ action="store_true",
+ help="Require a List ID to be present, ignore otherwise",
+)
+parser.add_argument(
+ "--dedup",
+ dest="dedup",
+ action="store_true",
+ help="Try to dedup messages based on ID before importing",
+)
+parser.add_argument(
+ "--ignorebody",
+ dest="ibody",
+ type=str,
+ nargs=1,
+ help="Optional email bodies to treat as empty (in conjunction with --html2text)",
+)
+parser.add_argument(
+ "--resend",
+ dest="resend",
+ type=str,
+ nargs=1,
+ help="DANGER ZONE: Resend every read email to this recipient as a new email",
+)
+parser.add_argument(
+ "--timeout",
+ dest="timeout",
+ type=int,
+ nargs=1,
+ help="Optional timeout in secs for importing an mbox/maildir file (default is 600 seconds)",
+)
+parser.add_argument(
+ "--filter",
+ dest="fromfilter",
+ type=str,
+ nargs=1,
+ help="Optional sender filter: Only import emails from this address",
+)
+parser.add_argument(
+ "--nomboxo", dest="nomboxo", action="store_true", help="Skip Mboxo processing"
+)
+parser.add_argument("--generator", dest="generator", help="Override the generator.")
args = parser.parse_args()
@@ -391,7 +535,6 @@ if len(sys.argv) <= 2:
sys.exit(-1)
-
if args.source:
source = args.source[0]
if args.dir:
@@ -414,8 +557,6 @@ if args.ext:
extension = args.ext[0]
if args.html2text:
parseHTML = True
-if args.ibody:
- archiver.iBody = args.ibody[0]
if args.fromfilter:
fromFilter = args.fromfilter[0]
if args.nomboxo:
@@ -436,7 +577,6 @@ baddies = 0
print("Checking that the database index %s exists ... " % dbname)
# elasticsearch logs lots of warnings on retries/connection failure
-import logging
logging.getLogger("elasticsearch").setLevel(logging.ERROR)
verbose_logger = None
@@ -452,7 +592,7 @@ if args.dry:
else:
# Need to check the index before starting bulk operations
try:
- if not es.indices.exists(index=dbname):
+ if not es.indices.exists(index=dbname + "_mbox"):
print("Error: the index '%s' does not exist!" % (dbname))
sys.exit(1)
print("Database exists OK")
@@ -460,9 +600,13 @@ else:
print("Error: unable to check if the index %s exists!: %s" % (dbname, err))
sys.exit(1)
-def globDir(d):
- dirs = [ f for f in listdir(d) if isdir(join(d,f)) ]
- mboxes = [ f for f in glob.glob(join(d,"*" + extension)) if isfile(f) ]
+if args.generator:
+ archiver.archiver_generator = args.generator
+
+
+def glob_dir(d):
+ dirs = [f for f in listdir(d) if isdir(join(d, f))]
+ mboxes = [f for f in glob.glob(join(d, "*" + extension)) if isfile(f)]
if not d in fileToLID and len(mboxes) > 0 and interactive:
print("Would you like to set a list-ID override for %s?:" % d)
lo = sys.stdin.readline()
@@ -475,12 +619,12 @@ def globDir(d):
lists.append([fi, fileToLID.get(d) if fileToLID.get(d) else list_override])
for nd in sorted(dirs):
- globDir(join(d,nd))
+ glob_dir(join(d, nd))
# HTTP(S) based import?
if re.match(r"https?://", source):
- data = urlopen(source).read().decode('utf-8')
+ data = urlopen(source).read().decode("utf-8")
print("Fetched %u bytes of main data, parsing month lists" % len(data))
if project:
@@ -489,14 +633,16 @@ if re.match(r"https?://", source):
ns = r"<a href='(%s-[-a-z0-9]+)/'" % project
if project.find("-") != -1:
ns = r"<a href='(%s)/'" % project
- else: # match all possible project names
+ else: # match all possible project names
ns = r"<a href='([-a-z0-9]+)/'"
if args.modmbox:
for mlist in re.finditer(ns, data):
ml = mlist.group(1)
- mldata = urlopen("%s%s/" % (source, ml)).read().decode('utf-8')
- present = re.search(r"<th colspan=\"3\">Year 20[\d]{2}</th>", mldata) # Check that year 2014-2017 exists, otherwise why keep it?
+ mldata = urlopen("%s%s/" % (source, ml)).read().decode("utf-8")
+ present = re.search(
+ r"<th colspan=\"3\">Year 20[\d]{2}</th>", mldata
+ ) # Check that year 2014-2017 exists, otherwise why keep it?
if present:
qn = 0
for mbox in re.finditer(r"(\d+\.mbox)/thread", mldata):
@@ -511,19 +657,21 @@ if re.match(r"https?://", source):
filebased = True
piperWeirdness = True
if not list_override:
- print("You need to specify a list ID with --lid when importing from Pipermail!")
+ print(
+ "You need to specify a list ID with --lid when importing from Pipermail!"
+ )
sys.exit(-1)
ns = r"href=\"(\d+-[a-zA-Z]+\.txt(\.gz)?)\""
qn = 0
for mlist in re.finditer(ns, data):
ml = mlist.group(1)
mldata = urlopen("%s%s" % (source, ml)).read()
- tmpfile = tempfile.NamedTemporaryFile(mode='w+b', buffering=1, delete=False)
+ tmpfile = tempfile.NamedTemporaryFile(mode="w+b", buffering=1, delete=False)
try:
if ml.find(".gz") != -1:
mldata = gzip.decompress(mldata)
except Exception as err:
- print("This wasn't a gzip file: %s" % err )
+ print("This wasn't a gzip file: %s" % err)
print(len(mldata))
tmpfile.write(mldata)
tmpfile.flush()
@@ -537,46 +685,50 @@ if re.match(r"https?://", source):
# IMAP(S) based import?
elif re.match(r"imaps?://", source):
imap = True
- import urllib, getpass, imaplib
+ import getpass
+ import imaplib
+ import urllib
+
url = urllib.parse.urlparse(source)
- port = url.port or (143 if url.scheme == 'imap' else 993)
+ port = url.port or (143 if url.scheme == "imap" else 993)
user = url.username or getpass.getuser()
- password = url.password or getpass.getpass('IMAP Password: ')
- folder = url.path.strip('/') or 'INBOX'
+ password = url.password or getpass.getpass("IMAP Password: ")
+ folder = url.path.strip("/") or "INBOX"
listname = list_override or "<%s/%s.%s>" % (user, folder, url.hostname)
# fetch message-id => _id pairs from elasticsearch
- result = es.search(scroll = '5m',
- body = {
- 'size': 1024,
- 'fields': ['message-id'],
- 'query': {'match': {'list': listname}}
- }
+ result = es.search(
+ scroll="5m",
+ body={
+ "size": 1024,
+ "fields": ["message-id"],
+ "query": {"match": {"list": listname}},
+ },
)
db = {}
- while len(result['hits']['hits']) > 0:
- for hit in result['hits']['hits']:
- db[hit['fields']['message-id'][0]] = hit['_id']
- result = es.scroll(scroll='5m', scroll_id=result['_scroll_id'])
+ while len(result["hits"]["hits"]) > 0:
+ for hit in result["hits"]["hits"]:
+ db[hit["fields"]["message-id"][0]] = hit["_id"]
+ result = es.scroll(scroll="5m", scroll_id=result["_scroll_id"])
# fetch message-id => uid pairs from imap
- if url.scheme == 'imaps':
+ if url.scheme == "imaps":
imap4 = imaplib.IMAP4_SSL(url.hostname, port)
else:
imap4 = imaplib.IMAP4(url.hostname, port)
imap4.login(user, password)
imap4.select(folder, readonly=True)
- results = imap4.uid('search', None, 'ALL')
- uids = b','.join(results[1][0].split())
- results = imap4.uid('fetch', uids, '(BODY[HEADER.FIELDS (MESSAGE-ID)])')
+ results = imap4.uid("search", None, "ALL")
+ uids = b",".join(results[1][0].split())
+ results = imap4.uid("fetch", uids, "(BODY[HEADER.FIELDS (MESSAGE-ID)])")
mail = {}
- uid_re = re.compile(b'^\d+ \(UID (\d+) BODY\[')
- mid_re = re.compile(b'^Message-ID:\s*(.*?)\s*$', re.I)
+ uid_re = re.compile(b"^\d+ \(UID (\d+) BODY\[")
+ mid_re = re.compile(b"^Message-ID:\s*(.*?)\s*$", re.I)
uid = None
for result in results[1]:
for line in result:
@@ -588,7 +740,7 @@ elif re.match(r"imaps?://", source):
match = mid_re.match(line)
if match:
try:
- mail[match.group(1).decode('utf-8')] = uid
+ mail[match.group(1).decode("utf-8")] = uid
uid = None
except ValueError:
pass
@@ -598,19 +750,18 @@ elif re.match(r"imaps?://", source):
queue1 = []
queue2 = []
for mid, _id in db.items():
- if not mid in mail:
- queue1.append({
- '_op_type': 'delete',
- '_index': dbname,
- '_type': 'mbox',
- '_id': _id
- })
- queue2.append({
- '_op_type': 'delete',
- '_index': dbname,
- '_type': 'mbox_source',
- '_id': _id
- })
+ if mid not in mail:
+ queue1.append(
+ {"_op_type": "delete", "_index": dbname, "_type": "mbox", "_id": _id}
+ )
+ queue2.append(
+ {
+ "_op_type": "delete",
+ "_index": dbname + '-source',
+ "_type": "_doc",
+ "_id": _id,
+ }
+ )
print("deleting: " + mid)
while len(queue1) > 0:
@@ -633,19 +784,26 @@ else:
print("Doing file based import")
filebased = True
if maildir:
- lists.append([source, fileToLID.get(source) if fileToLID.get(source) else list_override])
+ lists.append(
+ [source, fileToLID.get(source) if fileToLID.get(source) else list_override]
+ )
else:
if os.path.isfile(source):
- lists.append([source, fileToLID.get(source) if fileToLID.get(source) else list_override])
+ lists.append(
+ [
+ source,
+ fileToLID.get(source) if fileToLID.get(source) else list_override,
+ ]
+ )
else:
- globDir(source)
+ glob_dir(source)
threads = []
# Don't start more threads than there are lists
-cc = min(len(lists), int( multiprocessing.cpu_count() / 2) + 1)
+cc = min(len(lists), int(multiprocessing.cpu_count() / 2) + 1)
print("Starting up to %u threads to fetch the %u %s lists" % (cc, len(lists), project))
-for i in range(1,cc+1):
+for i in range(1, cc + 1):
t = SlurpThread()
threads.append(t)
t.start()
@@ -662,6 +820,9 @@ if args.dups:
for msg in duplicates[mid]:
print(msg)
-print("All done! %u records inserted/updated after %u seconds. %u records were bad and ignored" % (goodies, int(time.time() - start), baddies))
+print(
+ "All done! %u records inserted/updated after %u seconds. %u records were bad and ignored"
+ % (goodies, int(time.time() - start), baddies)
+)
if dedupped > 0:
print("%u records were not inserted due to deduplication" % dedupped)