You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@ponymail.apache.org by hu...@apache.org on 2021/09/11 21:14:43 UTC
[incubator-ponymail-foal] branch master updated: Allow for graceful handling of processing errors
This is an automated email from the ASF dual-hosted git repository.
humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
The following commit(s) were added to refs/heads/master by this push:
new 14cb161 Allow for graceful handling of processing errors
14cb161 is described below
commit 14cb1616c356af2f37e39c31698234ef415f58c1
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Sat Sep 11 16:14:36 2021 -0500
Allow for graceful handling of processing errors
---
tools/migrate.py | 41 +++++++++++++++++++++++++++++++----------
1 file changed, 31 insertions(+), 10 deletions(-)
diff --git a/tools/migrate.py b/tools/migrate.py
index 6c9bad8..6aa11ff 100644
--- a/tools/migrate.py
+++ b/tools/migrate.py
@@ -34,6 +34,7 @@ import hashlib
import multiprocessing
import os
import time
+import sys
# Increment this number whenever breaking changes happen in the migration workflow:
MIGRATION_MAGIC_NUMBER = "2"
@@ -46,10 +47,11 @@ MAX_PARALLEL_OPS = max(min(int((cores + 1) * 0.75), cores - 1), 1)
class MultiDocProcessor:
"""MultiProcess document processor"""
- def __init__(self, old_es_url: str, new_es_url: str, target: callable, num_processes: int = 8):
+ def __init__(self, old_es_url: str, new_es_url: str, target: callable, num_processes: int = 8, graceful: bool = False):
self.processes = []
self.queues = []
self.target = target
+ self.graceful = graceful
self.manager = multiprocessing.Manager()
self.lock = self.manager.Lock()
self.processed = self.manager.Value("i", 0)
@@ -120,29 +122,36 @@ class MultiDocProcessor:
params = queue.get()
if params == "SIGHUP": # Push stragglers
if bulk_array:
- bulk_push(bulk_array, new_es)
+ bulk_push(bulk_array, new_es, self.graceful)
bulk_array[:] = []
continue
elif params == "DONE": # Close up shop completely
if bulk_array:
- bulk_push(bulk_array, new_es)
+ bulk_push(bulk_array, new_es, self.graceful)
old_es.close()
new_es.close()
return
else:
as_list = list(params)
as_list.insert(0, old_es)
- ret_val = self.target(*as_list)
+ try:
+ ret_val = self.target(*as_list)
+ except:
+ if self.graceful:
+ print("Unexpected error:", sys.exc_info()[0])
+ else:
+ print("Unexpected error:", sys.exc_info()[0])
+ raise
if ret_val:
bulk_array.extend(ret_val)
with self.lock:
self.processed.value += 1
if len(bulk_array) >= 200:
- bulk_push(bulk_array, new_es)
+ bulk_push(bulk_array, new_es, self.graceful)
bulk_array[:] = []
-def bulk_push(json, es):
+def bulk_push(json, es, graceful=False):
"""Pushes a bunch of objects to ES in a bulk operation"""
js_arr = []
for entry in json:
@@ -153,7 +162,13 @@ def bulk_push(json, es):
"_source": entry["body"],
}
js_arr.append(bulk_op)
- helpers.bulk(es, js_arr)
+ try:
+ helpers.bulk(es, js_arr)
+ except helpers.errors.BulkIndexError as e:
+ if graceful:
+ print("Bulk index error: %s" % e)
+ else:
+ raise
def process_document(old_es, doc, old_dbname, dbname_source, dbname_mbox, do_dkim):
@@ -212,7 +227,7 @@ def process_attachment(old_es, doc, dbname_attachment):
return ({"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},)
-async def main(no_jobs):
+async def main(no_jobs, graceful):
print("Welcome to the Apache Pony Mail -> Foal migrator.")
print("This will copy your old database, adjust the structure, and insert the emails into your new foal database.")
print("We will be utilizing %u cores for this operation." % no_jobs)
@@ -283,7 +298,7 @@ async def main(no_jobs):
# Process attachments
start_time = time.time()
- processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, no_jobs)
+ processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, no_jobs, graceful)
docs_read = 0
count = await ols_es_async.count(index=old_dbname, doc_type="attachment")
no_att = count["count"]
@@ -326,6 +341,12 @@ if __name__ == "__main__":
type=int,
default=MAX_PARALLEL_OPS,
)
+ parser.add_argument(
+ "--graceful",
+ "-g",
+ help="Fail gracefully and continue if a processing error occurs",
+ action='store_true'
+ )
args = parser.parse_args()
loop = asyncio.get_event_loop()
- loop.run_until_complete(main(args.jobs))
+ loop.run_until_complete(main(args.jobs, arg.graceful))