Posted to commits@ponymail.apache.org by hu...@apache.org on 2020/09/09 11:49:07 UTC

[incubator-ponymail-foal] branch master updated: Add initial migrator for old PM > Foal

This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git


The following commit(s) were added to refs/heads/master by this push:
     new ed3b989  Add initial migrator for old PM > Foal
ed3b989 is described below

commit ed3b989f5e344e03781fe02eddc77b73c4bc46a5
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Wed Sep 9 13:48:39 2020 +0200

    Add initial migrator for old PM > Foal
---
 tools/migrate.py | 170 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 170 insertions(+)

diff --git a/tools/migrate.py b/tools/migrate.py
new file mode 100644
index 0000000..5d5e689
--- /dev/null
+++ b/tools/migrate.py
@@ -0,0 +1,170 @@
+import asyncio
+import base64
+import hashlib
+import time
+
+from elasticsearch import AsyncElasticsearch, helpers
+from elasticsearch.helpers import async_scan
+
+import plugins.generators
+
+# ES connections
+es = None
+new_es = None
+
+
+async def bulk_push(docs):
+    """Pushes a batch of documents to ES in a single bulk operation"""
+    js_arr = []
+    for entry in docs:
+        bulk_op = {
+            "_op_type": "index",
+            "_index": entry['index'],
+            "_id": entry['id'],
+            "_source": entry['body'],
+        }
+        js_arr.append(bulk_op)
+    await helpers.async_bulk(new_es, js_arr)
+
+
+async def main():
+    global es, new_es
+    print("Welcome to the Apache Pony Mail -> Foal migrator.")
+    print("This will copy your old database, adjust the structure, and insert the emails into your new foal database.")
+    print("------------------------------------")
+    old_es_url = input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
+    new_es_url = input("Enter the full URL (including http/https) of your NEW ES server: ") or "http://localhost:9200/"
+    if old_es_url == new_es_url:
+        print("Old and new DB should not be the same, assuming error in input and exiting!")
+        return
+    es = AsyncElasticsearch([old_es_url])
+    new_es = AsyncElasticsearch([new_es_url])
+
+    old_dbname = input("What is the database name for the old Pony Mail emails? [ponymail]: ") or "ponymail"
+    new_dbprefix = input("What is the database prefix for the new Pony Mail emails? [ponymail]: ") or "ponymail"
+
+    do_dkim = True
+    dkim_txt = input("Do you wish to perform DKIM re-indexing of all emails? This will still preserve old permalinks "
+                     "(y/n) [y]: ") or "y"
+    if dkim_txt.lower() == 'n':
+        do_dkim = False
+
+    # Define index names for new ES
+    dbname_mbox = new_dbprefix + "-mbox"
+    dbname_source = new_dbprefix + "-source"
+    dbname_attachment = new_dbprefix + "-attachment"
+
+    # Let's get started..!
+    start_time = time.time()
+    processed = 0
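+    # Count the emails up front so progress and an ETA can be reported during the scan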
+    count = await es.count(index=old_dbname, doc_type="mbox")
+    no_emails = count['count']
+
+    print("------------------------------------")
+    print("Starting migration of %u emails, this may take quite a while..." % no_emails)
+
+    bulk_array = []
+
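+    # Scan every email in the old mbox index, pairing each with its raw source document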
+    async for doc in async_scan(
+        client=es,
+        query={"query": {"match_all": {}}},
+        doc_type="mbox",
+        index=old_dbname,
+    ):
+        list_id = doc['_source']['list_raw'].strip("<>")
+        source = await es.get(index=old_dbname, doc_type="mbox_source", id=doc['_id'])
+        source_text: str = source['_source']['source']
+        if ':' not in source_text:  # No header colon present, so this is a base64-encoded raw source
+            source_text = base64.b64decode(source_text)
+        else:  # Plain text source; convert to bytes for hashing and DKIM
+            source_text = source_text.encode('utf-8', 'ignore')
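+        # DKIM re-indexing derives a new, stable document ID from the raw source,
+        # while the old ID is kept as a permalink so existing links keep working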
+        if do_dkim:
+            dkim_id = plugins.generators.dkim(None, None, list_id, None, source_text)
+            old_id = doc['_id']
+            doc['_source']['mid'] = dkim_id
+            doc['_source']['permalinks'] = [
+                dkim_id,
+                old_id
+            ]
+        else:
+            doc['_source']['permalinks'] = [
+                doc['_id']
+            ]
+
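+        # The source document shares the email's permalinks and is keyed by a SHA3-256 digest of the raw source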
+        source['_source']['permalinks'] = doc['_source']['permalinks']
+        doc['_source']['dbid'] = hashlib.sha3_256(source_text).hexdigest()
+
+        # Copy to new DB
+        bulk_array.append({
+            'index': dbname_mbox,
+            'id': doc['_id'],
+            'body': doc['_source']
+        })
+        bulk_array.append({
+            'index': dbname_source,
+            'id': doc['_source']['dbid'],
+            'body': source['_source']
+        })
+
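+        # Push to the new cluster in batches so memory usage stays bounded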
+        if len(bulk_array) > 100:
+            await bulk_push(bulk_array)
+            bulk_array = []
+
+        processed += 1
+        if processed % 500 == 0:
+            now = time.time()
+            time_spent = now - start_time
+            docs_per_second = processed / time_spent
+            time_left = (no_emails - processed) / docs_per_second
+
+            # stringify time left
+            time_left_str = "%u seconds" % time_left
+            if time_left > 60:
+                time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60)
+            if time_left > 3600:
+                time_left_str = "%u hour(s), %u minute(s), %u second(s)" % (
+                    int(time_left / 3600), int(time_left % 3600 / 60), time_left % 60)
+
+            print("Processed %u emails, %u remain. ETA: %s (at %u emails per second)" %
+                  (processed, (no_emails - processed), time_left_str, docs_per_second)
+                  )
+
+    # Flush any remaining documents that did not fill a complete batch
+    if bulk_array:
+        await bulk_push(bulk_array)
+
+    start_time = time.time()
+    processed = 0
+    count = await es.count(index=old_dbname, doc_type="attachment")
+    no_att = count['count']
+    print("Transferring %u attachments..." % no_att)
+    async for doc in async_scan(
+        client=es,
+        query={"query": {"match_all": {}}},
+        doc_type="attachment",
+        index=old_dbname,
+    ):
+        # Copy to new DB
+        await new_es.index(index=dbname_attachment, doc_type='_doc', id=doc['_id'], body=doc['_source'])
+
+        processed += 1
+        if processed % 500 == 0:
+            now = time.time()
+            time_spent = now - start_time
+            docs_per_second = processed / time_spent
+            time_left = (no_att - processed) / docs_per_second
+
+            # stringify time left
+            time_left_str = "%u seconds" % time_left
+            if time_left > 60:
+                time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60)
+            if time_left > 3600:
+                time_left_str = "%u hour(s), %u minute(s), %u second(s)" % (
+                    int(time_left / 3600), int(time_left % 3600 / 60), time_left % 60)
+
+            print("Processed %u emails, %u remain. ETA: %s (at %u attachments per second)" %
+                  (processed, (no_att - processed), time_left_str, docs_per_second)
+                  )
+
+    await es.close()
+    await new_es.close()
+    print("All done, enjoy!")
+
+if __name__ == '__main__':
+    asyncio.run(main())
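
For reference, a minimal sketch of how the migrator would be invoked (assuming Python 3.7+
with the async extras of the elasticsearch client installed, and running from the tools/
directory so that the plugins.generators import resolves):

    cd tools
    pip install 'elasticsearch[async]'
    python3 migrate.py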