Posted to commits@ponymail.apache.org by hu...@apache.org on 2021/09/11 21:14:43 UTC

[incubator-ponymail-foal] branch master updated: Allow for graceful handling of processing errors

This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git


The following commit(s) were added to refs/heads/master by this push:
     new 14cb161  Allow for graceful handling of processing errors
14cb161 is described below

commit 14cb1616c356af2f37e39c31698234ef415f58c1
Author: Daniel Gruno <hu...@apache.org>
AuthorDate: Sat Sep 11 16:14:36 2021 -0500

    Allow for graceful handling of processing errors
---
 tools/migrate.py | 41 +++++++++++++++++++++++++++++++----------
 1 file changed, 31 insertions(+), 10 deletions(-)
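
Once this change is applied, graceful handling is opted into on the command line. A hypothetical invocation (assuming the parallelism option, shown only partially in this diff, is spelled --jobs) would be:

    $ python3 tools/migrate.py --jobs 4 --graceful

With --graceful set, per-document and bulk-indexing errors are printed and skipped so the migration can continue; without it, the first error is re-raised and aborts the run, as before.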

diff --git a/tools/migrate.py b/tools/migrate.py
index 6c9bad8..6aa11ff 100644
--- a/tools/migrate.py
+++ b/tools/migrate.py
@@ -34,6 +34,7 @@ import hashlib
 import multiprocessing
 import os
 import time
+import sys
 
 # Increment this number whenever breaking changes happen in the migration workflow:
 MIGRATION_MAGIC_NUMBER = "2"
@@ -46,10 +47,11 @@ MAX_PARALLEL_OPS = max(min(int((cores + 1) * 0.75), cores - 1), 1)
 class MultiDocProcessor:
     """MultiProcess document processor"""
 
-    def __init__(self, old_es_url: str, new_es_url: str, target: callable, num_processes: int = 8):
+    def __init__(self, old_es_url: str, new_es_url: str, target: callable, num_processes: int = 8, graceful: bool = False):
         self.processes = []
         self.queues = []
         self.target = target
+        self.graceful = graceful
         self.manager = multiprocessing.Manager()
         self.lock = self.manager.Lock()
         self.processed = self.manager.Value("i", 0)
@@ -120,29 +122,36 @@ class MultiDocProcessor:
             params = queue.get()
             if params == "SIGHUP":  # Push stragglers
                 if bulk_array:
-                    bulk_push(bulk_array, new_es)
+                    bulk_push(bulk_array, new_es, self.graceful)
                     bulk_array[:] = []
                 continue
             elif params == "DONE":  # Close up shop completely
                 if bulk_array:
-                    bulk_push(bulk_array, new_es)
+                    bulk_push(bulk_array, new_es, self.graceful)
                 old_es.close()
                 new_es.close()
                 return
             else:
                 as_list = list(params)
                 as_list.insert(0, old_es)
-                ret_val = self.target(*as_list)
+                try:
+                    ret_val = self.target(*as_list)
+                except Exception:
+                    # Log the error; re-raise unless running in graceful mode
+                    ret_val = None
+                    print("Unexpected error:", sys.exc_info()[0])
+                    if not self.graceful:
+                        raise
                 if ret_val:
                     bulk_array.extend(ret_val)
                 with self.lock:
                     self.processed.value += 1
                 if len(bulk_array) >= 200:
-                    bulk_push(bulk_array, new_es)
+                    bulk_push(bulk_array, new_es, self.graceful)
                     bulk_array[:] = []
 
 
-def bulk_push(json, es):
+def bulk_push(json, es, graceful=False):
     """Pushes a bunch of objects to ES in a bulk operation"""
     js_arr = []
     for entry in json:
@@ -153,7 +162,13 @@ def bulk_push(json, es):
             "_source": entry["body"],
         }
         js_arr.append(bulk_op)
-    helpers.bulk(es, js_arr)
+    try:
+        helpers.bulk(es, js_arr)
+    except helpers.errors.BulkIndexError as e:
+        if graceful:
+            print("Bulk index error: %s" % e)
+        else:
+            raise
 
 
 def process_document(old_es, doc, old_dbname, dbname_source, dbname_mbox, do_dkim):
@@ -212,7 +227,7 @@ def process_attachment(old_es, doc, dbname_attachment):
     return ({"index": dbname_attachment, "id": doc["_id"], "body": doc["_source"]},)
 
 
-async def main(no_jobs):
+async def main(no_jobs, graceful):
     print("Welcome to the Apache Pony Mail -> Foal migrator.")
     print("This will copy your old database, adjust the structure, and insert the emails into your new foal database.")
     print("We will be utilizing %u cores for this operation." % no_jobs)
@@ -283,7 +298,7 @@ async def main(no_jobs):
 
     # Process attachments
     start_time = time.time()
-    processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, no_jobs)
+    processes = MultiDocProcessor(old_es_url, new_es_url, process_attachment, no_jobs, graceful)
     docs_read = 0
     count = await ols_es_async.count(index=old_dbname, doc_type="attachment")
     no_att = count["count"]
@@ -326,6 +341,12 @@ if __name__ == "__main__":
         type=int,
         default=MAX_PARALLEL_OPS,
     )
+    parser.add_argument(
+        "--graceful",
+        "-g",
+        help="Fail gracefully and continue if a processing error occurs",
+        action="store_true",
+    )
     args = parser.parse_args()
     loop = asyncio.get_event_loop()
-    loop.run_until_complete(main(args.jobs))
+    loop.run_until_complete(main(args.jobs, args.graceful))
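
For reference, the error-handling pattern this commit introduces can be shown in isolation. The following is a minimal, self-contained Python sketch, not code from the repository; run_target and flaky are hypothetical names:

    import sys

    def run_target(target, args, graceful=False):
        """Call a worker target; in graceful mode, log failures and move on."""
        try:
            return target(*args)
        except Exception:
            # Mirrors the commit: always log, only re-raise in strict mode
            print("Unexpected error:", sys.exc_info()[0])
            if not graceful:
                raise
            return None  # caller treats None as "nothing to bulk-push"

    def flaky(doc):
        # Hypothetical failing target used to demonstrate graceful mode
        raise ValueError("cannot process %s" % doc)

    # Graceful mode: the error is logged and the document is skipped
    assert run_target(flaky, ("doc-1",), graceful=True) is None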