You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ponymail.apache.org by hu...@apache.org on 2021/09/07 02:14:47 UTC

[incubator-ponymail-foal] branch master updated: Make no. of jobs configurable at runtime, lint

This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git


The following commit(s) were added to refs/heads/master by this push:
     new 894bd07  Make no. of jobs configurable at runtime, lint
894bd07 is described below

commit 894bd0750438d5617e95651d4a3d7307eef61ea8
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Mon Sep 6 21:14:40 2021 -0500

    Make no. of jobs configurable at runtime, lint
---
 tools/migrate.py | 39 ++++++++++++++++++++++++---------------
 1 file changed, 24 insertions(+), 15 deletions(-)

diff --git a/tools/migrate.py b/tools/migrate.py
index ff295f0..160810a 100644
--- a/tools/migrate.py
+++ b/tools/migrate.py
@@ -27,18 +27,19 @@ if not __package__:
 else:
     from .plugins import generators
 
+import argparse
 import base64
 import hashlib
 import multiprocessing
-import time
 import os
+import time
 
 # Increment this number whenever breaking changes happen in the migration workflow:
 MIGRATION_MAGIC_NUMBER = "2"
 
 # Max number of parallel conversions to perform before pushing. 75-ish percent of max cores.
-cores = os.cpu_count()
-MAX_PARALLEL_OPS = max(min(int((cores + 1) * 0.75), cores-1), 1)
+cores = len(os.sched_getaffinity(0))
+MAX_PARALLEL_OPS = max(min(int((cores + 1) * 0.75), cores - 1), 1)
 
 
 class MultiDocProcessor:
@@ -201,17 +202,15 @@ def process_document(old_es, doc, old_dbname, dbname_source, dbname_mbox, do_dki
 
 
 def process_attachment(old_es, doc, dbname_attachment):
-    return {"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},
+    return ({"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},)
 
 
-async def main():
+async def main(no_jobs):
     print("Welcome to the Apache Pony Mail -> Foal migrator.")
     print("This will copy your old database, adjust the structure, and insert the emails into your new foal database.")
-    print("We will be utilizing %u cores for this operation." % MAX_PARALLEL_OPS)
+    print("We will be utilizing %u cores for this operation." % no_jobs)
     print("------------------------------------")
-    old_es_url = (
-        input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
-    )
+    old_es_url = input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
     new_es_url = input("Enter the full URL (including http/https) of your NEW ES server: ") or "http://localhost:9200/"
     if old_es_url == new_es_url:
         print("Old and new DB should not be the same, assuming error in input and exiting!")
@@ -245,7 +244,7 @@ async def main():
     print("------------------------------------")
     print("Starting migration of %u emails, this may take quite a while..." % no_emails)
 
-    processes = MultiDocProcessor(old_es_url, new_es_url, process_document, MAX_PARALLEL_OPS)
+    processes = MultiDocProcessor(old_es_url, new_es_url, process_document, no_jobs)
 
     processed_last_count = 0
     current_executor = 0
@@ -260,7 +259,7 @@ async def main():
         processes.feed(doc, old_dbname, dbname_source, dbname_mbox, do_dkim)
         # Don't speed too far ahead of processing...
         processed = processes.processed.value
-        while docs_read - processed > 100 * MAX_PARALLEL_OPS:
+        while docs_read - processed > 100 * no_jobs:
             await asyncio.sleep(0.01)
             processed = processes.processed.value + 0
 
@@ -277,7 +276,7 @@ async def main():
 
     # Process attachments
     start_time = time.time()
-    processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, MAX_PARALLEL_OPS)
+    processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, no_jobs)
     docs_read = 0
     count = await ols_es_async.count(index=old_dbname, doc_type="attachment")
     no_att = count["count"]
@@ -293,7 +292,7 @@ async def main():
 
         # Don't speed ahead
         processed = processes.processed.value + 0
-        while docs_read - processed > 10 * MAX_PARALLEL_OPS:
+        while docs_read - processed > 10 * no_jobs:
             await asyncio.sleep(0.01)
             processed = processes.processed.value + 0
 
@@ -311,5 +310,15 @@ async def main():
     print("All done, enjoy!")
 
 
-loop = asyncio.get_event_loop()
-loop.run_until_complete(main())
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--jobs",
+        "-j",
+        help="Number of concurrent processing jobs to run. Default is %u." % MAX_PARALLEL_OPS,
+        type=int,
+        default=MAX_PARALLEL_OPS,
+    )
+    args = parser.parse_args()
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main(args.jobs))