You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by hu...@apache.org on 2021/09/06 18:23:08 UTC

[incubator-ponymail-foal] branch master updated: Switch to multiprocessing for migration

This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git


The following commit(s) were added to refs/heads/master by this push:
     new 68a9df4  Switch to multiprocessing for migration
68a9df4 is described below

commit 68a9df41bc4ab0d4500bbbfe5d1f28aeb24f3768
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Mon Sep 6 13:22:59 2021 -0500

    Switch to multiprocessing for migration
    
    This will use up to 12 worker processes (and therefore up to 12 cores) by default. A later improvement could determine the core count automatically (e.g. via cpuinfo) and use about 3/4 of all cores.
---
 tools/migrate.py | 361 ++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 227 insertions(+), 134 deletions(-)

diff --git a/tools/migrate.py b/tools/migrate.py
index 980e089..d3cd792 100644
--- a/tools/migrate.py
+++ b/tools/migrate.py
@@ -18,56 +18,215 @@
 # under the License.
 
 import asyncio
-from elasticsearch import AsyncElasticsearch
+
+from elasticsearch import AsyncElasticsearch, Elasticsearch, helpers
 from elasticsearch.helpers import async_scan
-from elasticsearch import helpers
+
 if not __package__:
     from plugins import generators
 else:
     from .plugins import generators
-import time
+
 import base64
 import hashlib
+import multiprocessing
+import time
 
 # Increment this number whenever breaking changes happen in the migration workflow:
-MIGRATION_MAGIC_NUMBER = "1"
+MIGRATION_MAGIC_NUMBER = "2"
+
+# Number of worker processes that convert documents in parallel (each worker bulk-pushes its own batches)
+MAX_PARALLEL_OPS = 12
+
+
+class MultiDocProcessor:
+    """MultiProcess document processor"""
+
+    def __init__(self, old_es_url: str, new_es_url: str, target: callable, num_processes: int = 8):
+        self.processes = []
+        self.queues = []
+        self.target = target
+        self.manager = multiprocessing.Manager()
+        self.lock = self.manager.Lock()
+        self.processed = self.manager.Value("i", 0)
+        self.processed_last_count = 0
+        self.start_time = time.time()
+        self.queue_pointer = 0
+        self.num_processes = num_processes
+        for i in range(0, num_processes):
+            q = multiprocessing.Queue()
+            p = multiprocessing.Process(
+                target=self.start,
+                args=(
+                    q,
+                    old_es_url,
+                    new_es_url,
+                ),
+            )
+            self.queues.append(q)
+            self.processes.append(p)
+            p.start()
+
+    def feed(self, *params):
+        """Feed arguments to the next available processor"""
+        self.queues[self.queue_pointer].put(params)
+        self.queue_pointer += 1
+        self.queue_pointer %= self.num_processes
+
+    def sighup(self):
+        for queue in self.queues:
+            queue.put("SIGHUP")
+
+    def stop(self):
+        for queue in self.queues:
+            queue.put("DONE")
+        for proc in self.processes:
+            proc.join()
+
+    def status(self, total):
+        processed = self.processed.value
+        if processed - self.processed_last_count >= 1000:
+            self.processed_last_count = processed
+            now = time.time()
+            time_spent = now - self.start_time
+            docs_per_second = (processed / time_spent) or 1
+            time_left = (total - processed) / docs_per_second
 
+            # stringify time left
+            time_left_str = "%u seconds" % time_left
+            if time_left > 60:
+                time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60)
+            if time_left > 3600:
+                time_left_str = "%u hour(s), %u minute(s), %u second(s)" % (
+                    int(time_left / 3600),
+                    int(time_left % 3600 / 60),
+                    time_left % 60,
+                )
 
-async def bulk_push(json, es):
+            print(
+                "Processed %u documents, %u remain. ETA: %s (at %u documents per second)"
+                % (processed, (total - processed), time_left_str, docs_per_second)
+            )
+
+    def start(self, queue, old_es_url, new_es_url):
+        old_es = Elasticsearch([old_es_url])
+        new_es = Elasticsearch([new_es_url])
+        bulk_array = []
+        while True:
+            params = queue.get()
+            if params == "SIGHUP":  # Push stragglers
+                if bulk_array:
+                    bulk_push(bulk_array, new_es)
+                    bulk_array[:] = []
+                continue
+            elif params == "DONE":  # Close up shop completely
+                if bulk_array:
+                    bulk_push(bulk_array, new_es)
+                old_es.close()
+                new_es.close()
+                return
+            else:
+                as_list = list(params)
+                as_list.insert(0, old_es)
+                ret_val = self.target(*as_list)
+                if ret_val:
+                    bulk_array.extend(ret_val)
+                with self.lock:
+                    self.processed.value += 1
+                if len(bulk_array) >= 200:
+                    bulk_push(bulk_array, new_es)
+                    bulk_array[:] = []
+
+
+def bulk_push(json, es):
     """Pushes a bunch of objects to ES in a bulk operation"""
     js_arr = []
     for entry in json:
         bulk_op = {
             "_op_type": "index",
-            "_index": entry['index'],
-            "_id": entry['id'],
-            "_source": entry['body'],
+            "_index": entry["index"],
+            "_id": entry["id"],
+            "_source": entry["body"],
         }
-        js_arr.append(
-            bulk_op
-        )
-    await helpers.async_bulk(es, js_arr)
+        js_arr.append(bulk_op)
+    helpers.bulk(es, js_arr)
+
+
+def process_document(old_es, doc, old_dbname, dbname_source, dbname_mbox, do_dkim):
+    now = time.time()
+    list_id = doc["_source"]["list_raw"].strip("<>")
+    try:
+        source = old_es.get(index=old_dbname, doc_type="mbox_source", id=doc["_id"])
+        # If we hit a 404 on a source, we have to fake an empty document, as we don't know the source.
+    except:
+        print("Source for %s was not found, faking it..." % doc["_id"])
+        source = {"_source": {"source": ""}}
+    source_text: str = source["_source"]["source"]
+    if ":" not in source_text:  # Base64
+        source_text = base64.b64decode(source_text)
+    else:  # bytify
+        source_text = source_text.encode("utf-8", "ignore")
+    if do_dkim:
+        dkim_id = generators.dkimid(None, None, list_id, None, source_text)
+        old_id = doc["_id"]
+        doc["_source"]["mid"] = dkim_id
+        doc["_source"]["permalinks"] = [dkim_id, old_id]
+    else:
+        doc["_source"]["permalinks"] = [doc["_id"]]
+
+    source["_source"]["permalinks"] = doc["_source"]["permalinks"]
+    doc["_source"]["dbid"] = hashlib.sha3_256(source_text).hexdigest()
+
+    # Append migration details to notes field in doc
+    notes = doc["_source"].get("_notes", [])
+    # We want a list, not a single string
+    if isinstance(notes, str):
+        notes = list(notes)
+    notes.append(
+        "MIGRATE: Document migrated from Pony Mail to Pony Mail Foal at %u, "
+        "using foal migrator v/%s" % (now, MIGRATION_MAGIC_NUMBER)
+    )
+    # If we re-indexed the document, make a note of that as well.
+    if do_dkim:
+        notes.append("REINDEX: Document re-indexed with DKIM_ID at %u, " "from %s to %s" % (now, dkim_id, old_id))
+    doc["_source"]["_notes"] = notes
+
+    # Copy to new DB
+    return (
+        {"index": dbname_mbox, "id": doc["_id"], "body": doc["_source"]},
+        {"index": dbname_source, "id": doc["_source"]["dbid"], "body": source["_source"]},
+    )
+
+
+def process_attachment(old_es, doc, dbname_attachment):
+    return {"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},
 
 
 async def main():
     print("Welcome to the Apache Pony Mail -> Foal migrator.")
     print("This will copy your old database, adjust the structure, and insert the emails into your new foal database.")
     print("------------------------------------")
-    old_es_url = input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
+    old_es_url = (
+        input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
+    )
     new_es_url = input("Enter the full URL (including http/https) of your NEW ES server: ") or "http://localhost:9200/"
     if old_es_url == new_es_url:
         print("Old and new DB should not be the same, assuming error in input and exiting!")
         return
-    old_es = AsyncElasticsearch([old_es_url])
-    new_es = AsyncElasticsearch([new_es_url])
+    ols_es_async = AsyncElasticsearch([old_es_url])
 
     old_dbname = input("What is the database name for the old Pony Mail emails? [ponymail]: ") or "ponymail"
     new_dbprefix = input("What is the database prefix for the new Pony Mail emails? [ponymail]: ") or "ponymail"
 
     do_dkim = True
-    dkim_txt = input("Do you wish to perform DKIM re-indexing of all emails? This will still preserve old permalinks "
-                     "(y/n) [y]: ") or "y"
-    if dkim_txt.lower() == 'n':
+    dkim_txt = (
+        input(
+            "Do you wish to perform DKIM re-indexing of all emails? This will still preserve old permalinks "
+            "(y/n) [y]: "
+        )
+        or "y"
+    )
+    if dkim_txt.lower() == "n":
         do_dkim = False
 
     # Define index names for new ES
@@ -77,141 +236,75 @@ async def main():
 
     # Let's get started..!
     start_time = time.time()
-    now = start_time
-    processed = 0
-    count = await old_es.count(index=old_dbname, doc_type="mbox")
-    no_emails = count['count']
+    count = await ols_es_async.count(index=old_dbname, doc_type="mbox")
+    no_emails = count["count"]
 
     print("------------------------------------")
     print("Starting migration of %u emails, this may take quite a while..." % no_emails)
 
-    bulk_array = []
+    processes = MultiDocProcessor(old_es_url, new_es_url, process_document, MAX_PARALLEL_OPS)
 
+    processed_last_count = 0
+    current_executor = 0
+    docs_read = 0
     async for doc in async_scan(
-            client=old_es,
-            query={"query": {"match_all": {}}},
-            doc_type="mbox",
-            index=old_dbname,
+        client=ols_es_async,
+        query={"query": {"match_all": {}}},
+        doc_type="mbox",
+        index=old_dbname,
     ):
-        list_id = doc['_source']['list_raw'].strip("<>")
-        try:
-            source = await old_es.get(index=old_dbname, doc_type="mbox_source", id=doc['_id'])
-        # If we hit a 404 on a source, we have to fake an empty document, as we don't know the source.
-        except:
-            print("Source for %s was not found, faking it..." % doc['_id'])
-            source = {
-                '_source': {
-                    'source': ""
-                }
-            }
-        source_text: str = source['_source']['source']
-        if ':' not in source_text:  # Base64
-            source_text = base64.b64decode(source_text)
-        else:  # bytify
-            source_text = source_text.encode('utf-8', 'ignore')
-        if do_dkim:
-            dkim_id = generators.dkimid(None, None, list_id, None, source_text)
-            old_id = doc['_id']
-            doc['_source']['mid'] = dkim_id
-            doc['_source']['permalinks'] = [
-                dkim_id,
-                old_id
-            ]
-        else:
-            doc['_source']['permalinks'] = [
-                doc['_id']
-            ]
-
-        source['_source']['permalinks'] = doc['_source']['permalinks']
-        doc['_source']['dbid'] = hashlib.sha3_256(source_text).hexdigest()
-
-        # Append migration details to notes field in doc
-        notes = doc['_source'].get('_notes', [])
-        # We want a list, not a single string
-        if isinstance(notes, str):
-            notes = list(notes)
-        notes.append("MIGRATE: Document migrated from Pony Mail to Pony Mail Foal at %u, "
-                     "using foal migrator v/%s" % (now, MIGRATION_MAGIC_NUMBER))
-        # If we re-indexed the document, make a note of that as well.
-        if do_dkim:
-            notes.append("REINDEX: Document re-indexed with DKIM_ID at %u, "
-                         "from %s to %s" % (now, dkim_id, old_id))
-        doc['_source']['_notes'] = notes
-
-        # Copy to new DB
-        bulk_array.append({
-            'index': dbname_mbox,
-            'id': doc['_id'],
-            'body': doc['_source']
-        })
-        bulk_array.append({
-            'index': dbname_source,
-            'id': doc['_source']['dbid'],
-            'body': source['_source']
-        })
-
-        if len(bulk_array) > 100:
-            await bulk_push(bulk_array, new_es)
-            bulk_array[:] = []
-
-        processed += 1
-        if processed % 500 == 0:
-            now = time.time()
-            time_spent = now - start_time
-            docs_per_second = processed / time_spent
-            time_left = (no_emails - processed) / docs_per_second
+        docs_read += 1
+        processes.feed(doc, old_dbname, dbname_source, dbname_mbox, do_dkim)
+        # Don't speed too far ahead of processing...
+        processed = processes.processed.value
+        while docs_read - processed > 100 * MAX_PARALLEL_OPS:
+            await asyncio.sleep(0.01)
+            processed = processes.processed.value + 0
 
-            # stringify time left
-            time_left_str = "%u seconds" % time_left
-            if time_left > 60:
-                time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60)
-            if time_left > 3600:
-                time_left_str = "%u hour(s), %u minute(s), %u second(s)" % (
-                int(time_left / 3600), int(time_left % 3600 / 60), time_left % 60)
-
-            print("Processed %u emails, %u remain. ETA: %s (at %u emails per second)" %
-                  (processed, (no_emails - processed), time_left_str, docs_per_second)
-                  )
+        processes.status(no_emails)
 
     # There may be some docs left over to push
-    if bulk_array:
-        await bulk_push(bulk_array, new_es)
+    processes.sighup()
+    while processed < no_emails:  # Wait for all documents to have been processed.
+        await asyncio.sleep(1)
+        print(f"Waiting for bulk push to complete ({processed} out of {no_emails} done...)")
+        processed = processes.processed.value
 
+    processes.stop()
+
+    # Process attachments
     start_time = time.time()
-    processed = 0
-    count = await old_es.count(index=old_dbname, doc_type="attachment")
-    no_att = count['count']
+    processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, MAX_PARALLEL_OPS)
+    docs_read = 0
+    count = await ols_es_async.count(index=old_dbname, doc_type="attachment")
+    no_att = count["count"]
     print("Transferring %u attachments..." % no_att)
     async for doc in async_scan(
-            client=old_es,
-            query={"query": {"match_all": {}}},
-            doc_type="attachment",
-            index=old_dbname,
+        client=ols_es_async,
+        query={"query": {"match_all": {}}},
+        doc_type="attachment",
+        index=old_dbname,
     ):
-        # Copy to new DB
-        await new_es.index(index=dbname_attachment, doc_type='_doc', id=doc['_id'], body=doc['_source'])
+        processes.feed(doc, dbname_attachment)
+        docs_read += 1
 
-        processed += 1
-        if processed % 500 == 0:
-            now = time.time()
-            time_spent = now - start_time
-            docs_per_second = processed / time_spent
-            time_left = (no_att - processed) / docs_per_second
+        # Don't speed ahead
+        processed = processes.processed.value + 0
+        while docs_read - processed > 10 * MAX_PARALLEL_OPS:
+            await asyncio.sleep(0.01)
+            processed = processes.processed.value + 0
 
-            # stringify time left
-            time_left_str = "%u seconds" % time_left
-            if time_left > 60:
-                time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60)
-            if time_left > 3600:
-                time_left_str = "%u hour(s), %u minute(s), %u second(s)" % (
-                int(time_left / 3600), int(time_left % 3600 / 60), time_left % 60)
+        processes.status(no_att)
 
-            print("Processed %u emails, %u remain. ETA: %s (at %u attachments per second)" %
-                  (processed, (no_att - processed), time_left_str, docs_per_second)
-                  )
+    # There may be some docs left over to push
+    processes.sighup()
+    while processed < no_att:  # Wait for all attachments to have been processed.
+        await asyncio.sleep(1)
+        print(f"Waiting for bulk push to complete ({processed} out of {no_att} done...)")
+        processed = processes.processed.value
 
-    await old_es.close()
-    await new_es.close()
+    processes.stop()
+    await ols_es_async.close()
     print("All done, enjoy!")