Posted to commits@ponymail.apache.org by hu...@apache.org on 2021/09/06 18:23:08 UTC
[incubator-ponymail-foal] branch master updated: Switch to multiprocessing for migration
This is an automated email from the ASF dual-hosted git repository.
humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
The following commit(s) were added to refs/heads/master by this push:
new 68a9df4 Switch to multiprocessing for migration
68a9df4 is described below
commit 68a9df41bc4ab0d4500bbbfe5d1f28aeb24f3768
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Mon Sep 6 13:22:59 2021 -0500
Switch to multiprocessing for migration
This will utilize up to 12 cores by default. Might wanna automatically determine via cpuinfo and use 3/4 of all cores or such...
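[Editor's note: the commit message leaves automatic core detection as a possible follow-up. A minimal sketch of that idea, assuming Python's standard multiprocessing.cpu_count() rather than a cpuinfo library; the helper name and the 3/4 fraction are illustrative and not part of this commit:]

import multiprocessing

# Hypothetical helper, not part of this commit: derive the worker count
# from the machine instead of the fixed MAX_PARALLEL_OPS = 12 default.
def suggested_parallel_ops(fraction: float = 0.75) -> int:
    cores = multiprocessing.cpu_count()
    # Use roughly 3/4 of the available cores, but always keep at least one worker.
    return max(1, int(cores * fraction))

[On a 16-core machine this would yield 12 workers, matching the current hard-coded default.]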
---
tools/migrate.py | 361 ++++++++++++++++++++++++++++++++++---------------------
1 file changed, 227 insertions(+), 134 deletions(-)
diff --git a/tools/migrate.py b/tools/migrate.py
index 980e089..d3cd792 100644
--- a/tools/migrate.py
+++ b/tools/migrate.py
@@ -18,56 +18,215 @@
# under the License.
import asyncio
-from elasticsearch import AsyncElasticsearch
+
+from elasticsearch import AsyncElasticsearch, Elasticsearch, helpers
from elasticsearch.helpers import async_scan
-from elasticsearch import helpers
+
if not __package__:
from plugins import generators
else:
from .plugins import generators
-import time
+
import base64
import hashlib
+import multiprocessing
+import time
# Increment this number whenever breaking changes happen in the migration workflow:
-MIGRATION_MAGIC_NUMBER = "1"
+MIGRATION_MAGIC_NUMBER = "2"
+
+# Max number of parallel conversions to perform before pushing
+MAX_PARALLEL_OPS = 12
+
+
+class MultiDocProcessor:
+ """MultiProcess document processor"""
+
+ def __init__(self, old_es_url: str, new_es_url: str, target: callable, num_processes: int = 8):
+ self.processes = []
+ self.queues = []
+ self.target = target
+ self.manager = multiprocessing.Manager()
+ self.lock = self.manager.Lock()
+ self.processed = self.manager.Value("i", 0)
+ self.processed_last_count = 0
+ self.start_time = time.time()
+ self.queue_pointer = 0
+ self.num_processes = num_processes
+ for i in range(0, num_processes):
+ q = multiprocessing.Queue()
+ p = multiprocessing.Process(
+ target=self.start,
+ args=(
+ q,
+ old_es_url,
+ new_es_url,
+ ),
+ )
+ self.queues.append(q)
+ self.processes.append(p)
+ p.start()
+
+ def feed(self, *params):
+ """Feed arguments to the next available processor"""
+ self.queues[self.queue_pointer].put(params)
+ self.queue_pointer += 1
+ self.queue_pointer %= self.num_processes
+
+ def sighup(self):
+ for queue in self.queues:
+ queue.put("SIGHUP")
+
+ def stop(self):
+ for queue in self.queues:
+ queue.put("DONE")
+ for proc in self.processes:
+ proc.join()
+
+ def status(self, total):
+ processed = self.processed.value
+ if processed - self.processed_last_count >= 1000:
+ self.processed_last_count = processed
+ now = time.time()
+ time_spent = now - self.start_time
+ docs_per_second = (processed / time_spent) or 1
+ time_left = (total - processed) / docs_per_second
+ # stringify time left
+ time_left_str = "%u seconds" % time_left
+ if time_left > 60:
+ time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60)
+ if time_left > 3600:
+ time_left_str = "%u hour(s), %u minute(s), %u second(s)" % (
+ int(time_left / 3600),
+ int(time_left % 3600 / 60),
+ time_left % 60,
+ )
-async def bulk_push(json, es):
+ print(
+ "Processed %u documents, %u remain. ETA: %s (at %u documents per second)"
+ % (processed, (total - processed), time_left_str, docs_per_second)
+ )
+
+ def start(self, queue, old_es_url, new_es_url):
+ old_es = Elasticsearch([old_es_url])
+ new_es = Elasticsearch([new_es_url])
+ bulk_array = []
+ while True:
+ params = queue.get()
+ if params == "SIGHUP": # Push stragglers
+ if bulk_array:
+ bulk_push(bulk_array, new_es)
+ bulk_array[:] = []
+ continue
+ elif params == "DONE": # Close up shop completely
+ if bulk_array:
+ bulk_push(bulk_array, new_es)
+ old_es.close()
+ new_es.close()
+ return
+ else:
+ as_list = list(params)
+ as_list.insert(0, old_es)
+ ret_val = self.target(*as_list)
+ if ret_val:
+ bulk_array.extend(ret_val)
+ with self.lock:
+ self.processed.value += 1
+ if len(bulk_array) >= 200:
+ bulk_push(bulk_array, new_es)
+ bulk_array[:] = []
+
+
+def bulk_push(json, es):
"""Pushes a bunch of objects to ES in a bulk operation"""
js_arr = []
for entry in json:
bulk_op = {
"_op_type": "index",
- "_index": entry['index'],
- "_id": entry['id'],
- "_source": entry['body'],
+ "_index": entry["index"],
+ "_id": entry["id"],
+ "_source": entry["body"],
}
- js_arr.append(
- bulk_op
- )
- await helpers.async_bulk(es, js_arr)
+ js_arr.append(bulk_op)
+ helpers.bulk(es, js_arr)
+
+
+def process_document(old_es, doc, old_dbname, dbname_source, dbname_mbox, do_dkim):
+ now = time.time()
+ list_id = doc["_source"]["list_raw"].strip("<>")
+ try:
+ source = old_es.get(index=old_dbname, doc_type="mbox_source", id=doc["_id"])
+ # If we hit a 404 on a source, we have to fake an empty document, as we don't know the source.
+ except:
+ print("Source for %s was not found, faking it..." % doc["_id"])
+ source = {"_source": {"source": ""}}
+ source_text: str = source["_source"]["source"]
+ if ":" not in source_text: # Base64
+ source_text = base64.b64decode(source_text)
+ else: # bytify
+ source_text = source_text.encode("utf-8", "ignore")
+ if do_dkim:
+ dkim_id = generators.dkimid(None, None, list_id, None, source_text)
+ old_id = doc["_id"]
+ doc["_source"]["mid"] = dkim_id
+ doc["_source"]["permalinks"] = [dkim_id, old_id]
+ else:
+ doc["_source"]["permalinks"] = [doc["_id"]]
+
+ source["_source"]["permalinks"] = doc["_source"]["permalinks"]
+ doc["_source"]["dbid"] = hashlib.sha3_256(source_text).hexdigest()
+
+ # Append migration details to notes field in doc
+ notes = doc["_source"].get("_notes", [])
+ # We want a list, not a single string
+ if isinstance(notes, str):
+ notes = list(notes)
+ notes.append(
+ "MIGRATE: Document migrated from Pony Mail to Pony Mail Foal at %u, "
+ "using foal migrator v/%s" % (now, MIGRATION_MAGIC_NUMBER)
+ )
+ # If we re-indexed the document, make a note of that as well.
+ if do_dkim:
+ notes.append("REINDEX: Document re-indexed with DKIM_ID at %u, " "from %s to %s" % (now, dkim_id, old_id))
+ doc["_source"]["_notes"] = notes
+
+ # Copy to new DB
+ return (
+ {"index": dbname_mbox, "id": doc["_id"], "body": doc["_source"]},
+ {"index": dbname_source, "id": doc["_source"]["dbid"], "body": source["_source"]},
+ )
+
+
+def process_attachment(old_es, doc, dbname_attachment):
+ return {"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},
async def main():
print("Welcome to the Apache Pony Mail -> Foal migrator.")
print("This will copy your old database, adjust the structure, and insert the emails into your new foal database.")
print("------------------------------------")
- old_es_url = input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
+ old_es_url = (
+ input("Enter the full URL (including http/https) of your old ES server: ") or "http://localhost:9200/"
+ )
new_es_url = input("Enter the full URL (including http/https) of your NEW ES server: ") or "http://localhost:9200/"
if old_es_url == new_es_url:
print("Old and new DB should not be the same, assuming error in input and exiting!")
return
- old_es = AsyncElasticsearch([old_es_url])
- new_es = AsyncElasticsearch([new_es_url])
+ ols_es_async = AsyncElasticsearch([old_es_url])
old_dbname = input("What is the database name for the old Pony Mail emails? [ponymail]: ") or "ponymail"
new_dbprefix = input("What is the database prefix for the new Pony Mail emails? [ponymail]: ") or "ponymail"
do_dkim = True
- dkim_txt = input("Do you wish to perform DKIM re-indexing of all emails? This will still preserve old permalinks "
- "(y/n) [y]: ") or "y"
- if dkim_txt.lower() == 'n':
+ dkim_txt = (
+ input(
+ "Do you wish to perform DKIM re-indexing of all emails? This will still preserve old permalinks "
+ "(y/n) [y]: "
+ )
+ or "y"
+ )
+ if dkim_txt.lower() == "n":
do_dkim = False
# Define index names for new ES
@@ -77,141 +236,75 @@ async def main():
# Let's get started..!
start_time = time.time()
- now = start_time
- processed = 0
- count = await old_es.count(index=old_dbname, doc_type="mbox")
- no_emails = count['count']
+ count = await ols_es_async.count(index=old_dbname, doc_type="mbox")
+ no_emails = count["count"]
print("------------------------------------")
print("Starting migration of %u emails, this may take quite a while..." % no_emails)
- bulk_array = []
+ processes = MultiDocProcessor(old_es_url, new_es_url, process_document, MAX_PARALLEL_OPS)
+ processed_last_count = 0
+ current_executor = 0
+ docs_read = 0
async for doc in async_scan(
- client=old_es,
- query={"query": {"match_all": {}}},
- doc_type="mbox",
- index=old_dbname,
+ client=ols_es_async,
+ query={"query": {"match_all": {}}},
+ doc_type="mbox",
+ index=old_dbname,
):
- list_id = doc['_source']['list_raw'].strip("<>")
- try:
- source = await old_es.get(index=old_dbname, doc_type="mbox_source", id=doc['_id'])
- # If we hit a 404 on a source, we have to fake an empty document, as we don't know the source.
- except:
- print("Source for %s was not found, faking it..." % doc['_id'])
- source = {
- '_source': {
- 'source': ""
- }
- }
- source_text: str = source['_source']['source']
- if ':' not in source_text: # Base64
- source_text = base64.b64decode(source_text)
- else: # bytify
- source_text = source_text.encode('utf-8', 'ignore')
- if do_dkim:
- dkim_id = generators.dkimid(None, None, list_id, None, source_text)
- old_id = doc['_id']
- doc['_source']['mid'] = dkim_id
- doc['_source']['permalinks'] = [
- dkim_id,
- old_id
- ]
- else:
- doc['_source']['permalinks'] = [
- doc['_id']
- ]
-
- source['_source']['permalinks'] = doc['_source']['permalinks']
- doc['_source']['dbid'] = hashlib.sha3_256(source_text).hexdigest()
-
- # Append migration details to notes field in doc
- notes = doc['_source'].get('_notes', [])
- # We want a list, not a single string
- if isinstance(notes, str):
- notes = list(notes)
- notes.append("MIGRATE: Document migrated from Pony Mail to Pony Mail Foal at %u, "
- "using foal migrator v/%s" % (now, MIGRATION_MAGIC_NUMBER))
- # If we re-indexed the document, make a note of that as well.
- if do_dkim:
- notes.append("REINDEX: Document re-indexed with DKIM_ID at %u, "
- "from %s to %s" % (now, dkim_id, old_id))
- doc['_source']['_notes'] = notes
-
- # Copy to new DB
- bulk_array.append({
- 'index': dbname_mbox,
- 'id': doc['_id'],
- 'body': doc['_source']
- })
- bulk_array.append({
- 'index': dbname_source,
- 'id': doc['_source']['dbid'],
- 'body': source['_source']
- })
-
- if len(bulk_array) > 100:
- await bulk_push(bulk_array, new_es)
- bulk_array[:] = []
-
- processed += 1
- if processed % 500 == 0:
- now = time.time()
- time_spent = now - start_time
- docs_per_second = processed / time_spent
- time_left = (no_emails - processed) / docs_per_second
+ docs_read += 1
+ processes.feed(doc, old_dbname, dbname_source, dbname_mbox, do_dkim)
+ # Don't speed too far ahead of processing...
+ processed = processes.processed.value
+ while docs_read - processed > 100 * MAX_PARALLEL_OPS:
+ await asyncio.sleep(0.01)
+ processed = processes.processed.value + 0
- # stringify time left
- time_left_str = "%u seconds" % time_left
- if time_left > 60:
- time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60)
- if time_left > 3600:
- time_left_str = "%u hour(s), %u minute(s), %u second(s)" % (
- int(time_left / 3600), int(time_left % 3600 / 60), time_left % 60)
-
- print("Processed %u emails, %u remain. ETA: %s (at %u emails per second)" %
- (processed, (no_emails - processed), time_left_str, docs_per_second)
- )
+ processes.status(no_emails)
# There may be some docs left over to push
- if bulk_array:
- await bulk_push(bulk_array, new_es)
+ processes.sighup()
+ while processed < no_emails: # Wait for all documents to have been processed.
+ await asyncio.sleep(1)
+ print(f"Waiting for bulk push to complete ({processed} out of {no_emails} done...)")
+ processed = processes.processed.value
+ processes.stop()
+
+ # Process attachments
start_time = time.time()
- processed = 0
- count = await old_es.count(index=old_dbname, doc_type="attachment")
- no_att = count['count']
+ processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, MAX_PARALLEL_OPS)
+ docs_read = 0
+ count = await ols_es_async.count(index=old_dbname, doc_type="attachment")
+ no_att = count["count"]
print("Transferring %u attachments..." % no_att)
async for doc in async_scan(
- client=old_es,
- query={"query": {"match_all": {}}},
- doc_type="attachment",
- index=old_dbname,
+ client=ols_es_async,
+ query={"query": {"match_all": {}}},
+ doc_type="attachment",
+ index=old_dbname,
):
- # Copy to new DB
- await new_es.index(index=dbname_attachment, doc_type='_doc', id=doc['_id'], body=doc['_source'])
+ processes.feed(doc, dbname_attachment)
+ docs_read += 1
- processed += 1
- if processed % 500 == 0:
- now = time.time()
- time_spent = now - start_time
- docs_per_second = processed / time_spent
- time_left = (no_att - processed) / docs_per_second
+ # Don't speed ahead
+ processed = processes.processed.value + 0
+ while docs_read - processed > 10 * MAX_PARALLEL_OPS:
+ await asyncio.sleep(0.01)
+ processed = processes.processed.value + 0
- # stringify time left
- time_left_str = "%u seconds" % time_left
- if time_left > 60:
- time_left_str = "%u minute(s), %u second(s)" % (int(time_left / 60), time_left % 60)
- if time_left > 3600:
- time_left_str = "%u hour(s), %u minute(s), %u second(s)" % (
- int(time_left / 3600), int(time_left % 3600 / 60), time_left % 60)
+ processes.status(no_att)
- print("Processed %u emails, %u remain. ETA: %s (at %u attachments per second)" %
- (processed, (no_att - processed), time_left_str, docs_per_second)
- )
+ # There may be some docs left over to push
+ processes.sighup()
+ while processed < no_att: # Wait for all attachments to have been processed.
+ await asyncio.sleep(1)
+ print(f"Waiting for bulk push to complete ({processed} out of {no_att} done...)")
+ processed = processes.processed.value
- await old_es.close()
- await new_es.close()
+ processes.stop()
+ await ols_es_async.close()
print("All done, enjoy!")