Posted to commits@ponymail.apache.org by hu...@apache.org on 2021/09/07 02:14:47 UTC
[incubator-ponymail-foal] branch master updated: Make no. of jobs configurable at runtime, lint
This is an automated email from the ASF dual-hosted git repository.
humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
The following commit(s) were added to refs/heads/master by this push:
new 894bd07 Make no. of jobs configurable at runtime, lint
894bd07 is described below
commit 894bd0750438d5617e95651d4a3d7307eef61ea8
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Mon Sep 6 21:14:40 2021 -0500
Make no. of jobs configurable at runtime, lint
---
tools/migrate.py | 39 ++++++++++++++++++++++++---------------
1 file changed, 24 insertions(+), 15 deletions(-)
diff --git a/tools/migrate.py b/tools/migrate.py
index ff295f0..160810a 100644
--- a/tools/migrate.py
+++ b/tools/migrate.py
@@ -27,18 +27,19 @@ if not __package__:
 else:
     from .plugins import generators
+import argparse
 import base64
 import hashlib
 import multiprocessing
-import time
 import os
+import time
 # Increment this number whenever breaking changes happen in the migration workflow:
 MIGRATION_MAGIC_NUMBER = "2"
 # Max number of parallel conversions to perform before pushing. 75-ish percent of max cores.
-cores = os.cpu_count()
-MAX_PARALLEL_OPS = max(min(int((cores + 1) * 0.75), cores-1), 1)
+cores = len(os.sched_getaffinity(0))
+MAX_PARALLEL_OPS = max(min(int((cores + 1) * 0.75), cores - 1), 1)
 class MultiDocProcessor:
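
For context on the cores change above: os.cpu_count() reports every core in the machine, whereas len(os.sched_getaffinity(0)) counts only the cores the current process is actually allowed to run on (the two differ under CPU pinning or container limits; sched_getaffinity is only available on platforms that expose it, notably Linux). A rough sketch of what the default works out to, assuming an unrestricted 8-core host (numbers are illustrative, not part of the patch):

    import os

    cores = len(os.sched_getaffinity(0))   # e.g. 8 on an unrestricted 8-core host
    default_jobs = max(min(int((cores + 1) * 0.75), cores - 1), 1)
    # (8 + 1) * 0.75 = 6.75 -> int() truncates to 6; min(6, 7) = 6; max(6, 1) = 6
    print(default_jobs)                     # 6
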
@@ -201,17 +202,15 @@ def process_document(old_es, doc, old_dbname, dbname_source, dbname_mbox, do_dki
 def process_attachment(old_es, doc, dbname_attachment):
-    return {"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},
+    return ({"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},)
-async def main():
+async def main(no_jobs):
     print("Welcome to the Apache Pony Mail -> Foal migrator.")
     print("This will copy your old database, adjust the structure, and insert the emails into your new foal database.")
-    print("We will be utilizing %u cores for this operation." % MAX_PARALLEL_OPS)
+    print("We will be utilizing %u cores for this operation." % no_jobs)
     print("------------------------------------")
-    old_es_url = (
-        input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
-    )
+    old_es_url = input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
     new_es_url = input("Enter the full URL (including http/https) of your NEW ES server: ") or "http://localhost:9200/"
     if old_es_url == new_es_url:
         print("Old and new DB should not be the same, assuming error in input and exiting!")
@@ -245,7 +244,7 @@ async def main():
     print("------------------------------------")
     print("Starting migration of %u emails, this may take quite a while..." % no_emails)
-    processes = MultiDocProcessor(old_es_url, new_es_url, process_document, MAX_PARALLEL_OPS)
+    processes = MultiDocProcessor(old_es_url, new_es_url, process_document, no_jobs)
     processed_last_count = 0
     current_executor = 0
@@ -260,7 +259,7 @@ async def main():
         processes.feed(doc, old_dbname, dbname_source, dbname_mbox, do_dkim)
         # Don't speed too far ahead of processing...
         processed = processes.processed.value
-        while docs_read - processed > 100 * MAX_PARALLEL_OPS:
+        while docs_read - processed > 100 * no_jobs:
             await asyncio.sleep(0.01)
             processed = processes.processed.value + 0
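
On the throttling loop above (and the similar one in the attachment pass further down): the feeder pauses whenever the number of documents read but not yet processed exceeds a window proportional to the worker count, so reading from the old index never races far ahead of the slower conversion side. A minimal stand-alone sketch of the same pattern, with illustrative names that are not from migrate.py:

    import asyncio

    async def wait_for_backlog(docs_read, processed_counter, window):
        # Sleep until the count of fed-but-unprocessed documents drops below `window`.
        # `processed_counter` is assumed to be a shared counter exposing `.value`.
        while docs_read - processed_counter.value > window:
            await asyncio.sleep(0.01)
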
@@ -277,7 +276,7 @@ async def main():
     # Process attachments
     start_time = time.time()
-    processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, MAX_PARALLEL_OPS)
+    processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, no_jobs)
     docs_read = 0
     count = await ols_es_async.count(index=old_dbname, doc_type="attachment")
     no_att = count["count"]
@@ -293,7 +292,7 @@ async def main():
         # Don't speed ahead
         processed = processes.processed.value + 0
-        while docs_read - processed > 10 * MAX_PARALLEL_OPS:
+        while docs_read - processed > 10 * no_jobs:
             await asyncio.sleep(0.01)
             processed = processes.processed.value + 0
@@ -311,5 +310,15 @@ async def main():
     print("All done, enjoy!")
-loop = asyncio.get_event_loop()
-loop.run_until_complete(main())
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--jobs",
+        "-j",
+        help="Number of concurrent processing jobs to run. Default is %u." % MAX_PARALLEL_OPS,
+        type=int,
+        default=MAX_PARALLEL_OPS,
+    )
+    args = parser.parse_args()
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(main(args.jobs))
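
With the argparse block above, the job count can be overridden at invocation time and falls back to the computed MAX_PARALLEL_OPS default when --jobs/-j is omitted. A usage sketch (interpreter name and job counts are illustrative):

    # use the computed default (~75% of the cores available to the process)
    python3 tools/migrate.py

    # pin the number of worker processes explicitly
    python3 tools/migrate.py --jobs 4
    python3 tools/migrate.py -j 4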