You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by to...@apache.org on 2022/07/14 02:26:48 UTC

[lucene-jira-archive] branch main updated: Allow to specify number of worker processes for jira2github_import.py (#41)

This is an automated email from the ASF dual-hosted git repository.

tomoko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/lucene-jira-archive.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cc53ad8 Allow to specify number of worker processes for jira2github_import.py (#41)
5cc53ad8 is described below

commit 5cc53ad8d166ff5823235e2d82f0fcd997429d92
Author: Tomoko Uchida <to...@gmail.com>
AuthorDate: Thu Jul 14 11:26:43 2022 +0900

    Allow to specify number of worker processes for jira2github_import.py (#41)
    
    * allow to specify number of workers for jira2github_import
    
    * remove obsolete code
    
    * try to make it work on windows (not tested...)
    
    * properly name the function that are passed to Pool.apply_async() as 'task'
---
 migration/src/common.py             | 53 ++++++++++++++++++++++++++++++++++---
 migration/src/jira2github_import.py | 51 ++++++++++++++++++++++-------------
 2 files changed, 82 insertions(+), 22 deletions(-)

diff --git a/migration/src/common.py b/migration/src/common.py
index e4d4727e..ee37a6a5 100644
--- a/migration/src/common.py
+++ b/migration/src/common.py
@@ -4,7 +4,8 @@ from datetime import datetime
 import functools
 import time
 import os
-import tempfile
+import multiprocessing
+from logging.handlers import QueueHandler
 
 LOG_DIRNAME = "log"
 
@@ -19,25 +20,69 @@ ACCOUNT_MAPPING_FILENAME = "account-map.csv"
 
 ASF_JIRA_BASE_URL = "https://issues.apache.org/jira/browse"
 
+LOGGING_FOMATTER = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
 
 logging.basicConfig(level=logging.DEBUG, handlers=[])
 
 def logging_setup(log_dir: Path, name: str) -> logging.Logger:
     if not log_dir.exists():
         log_dir.mkdir()
-    formatter = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
     file_handler = logging.FileHandler(log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log'))
     file_handler.setLevel(logging.DEBUG)
-    file_handler.setFormatter(formatter)
+    file_handler.setFormatter(LOGGING_FOMATTER)
     console_handler = logging.StreamHandler()
     console_handler.setLevel(logging.INFO)
-    console_handler.setFormatter(formatter)
+    console_handler.setFormatter(LOGGING_FOMATTER)
     logger = logging.getLogger(name)
+    logger.handlers = []  # clear current handlers
     logger.addHandler(file_handler)
     logger.addHandler(console_handler)
     return logger
 
 
+# helper to support logging to a single file from multiple processes
+# https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
+def log_listener(log_dir: Path, name: str) -> tuple[multiprocessing.Process, multiprocessing.Queue]:
+
+    def listener_process(queue: multiprocessing.Queue, path: Path):
+        file_handler = logging.FileHandler(path)
+        file_handler.setLevel(logging.DEBUG)
+        file_handler.setFormatter(LOGGING_FOMATTER)
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.INFO)
+        console_handler.setFormatter(LOGGING_FOMATTER)
+        root = logging.getLogger()
+        root.addHandler(file_handler)
+        root.addHandler(console_handler)
+
+        while True:
+            try:
+                record: logging.LogRecord = queue.get()
+                if record is None:  # sentinel
+                    break
+                logger = logging.getLogger(record.name)
+                logger.handle(record)
+            except Exception:
+                import sys, traceback
+                print('Whoops! Problem:', file=sys.stderr)
+                traceback.print_exc(file=sys.stderr)
+
+    if not log_dir.exists():
+        log_dir.mkdir()
+    path = log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log')
+    queue = multiprocessing.Queue(-1)
+    listener = multiprocessing.Process(target=listener_process, args=(queue, path))
+    return (listener, queue)
+
+
+def logging_setup_worker(queue: multiprocessing.Queue):
+    logger = logging.getLogger()
+    queue_handler = QueueHandler(queue)
+    logger.handlers = []  # clear current handlers
+    logger.addHandler(queue_handler)
+    logger.setLevel(logging.DEBUG)
+
+
 def jira_issue_url(issue_id: str) -> str:
     return ASF_JIRA_BASE_URL + f"/{issue_id}"
 
diff --git a/migration/src/jira2github_import.py b/migration/src/jira2github_import.py
index caff59b5..9262d3e3 100644
--- a/migration/src/jira2github_import.py
+++ b/migration/src/jira2github_import.py
@@ -1,11 +1,12 @@
 #
 # Convert Jira issues to GitHub issues for Import Issues API (https://gist.github.com/jonmagic/5282384165e0f86ef105)
 # Usage:
-#   python src/jira2github_import.py --issues <jira issue number list>
-#   python src/jira2github_import.py --min <min issue number> --max <max issue number>
+#   python src/jira2github_import.py --issues <jira issue number list> [--num_workers <# worker processes>]
+#   python src/jira2github_import.py --min <min issue number> --max <max issue number> [--num_workers <# worker processes>]
 #
 
 import argparse
+from logging import Logger
 from pathlib import Path
 import json
 import sys
@@ -13,30 +14,23 @@ from urllib.parse import quote
 import dateutil.parser
 import os
 import traceback
+import multiprocessing
 
-from common import LOG_DIRNAME, JIRA_DUMP_DIRNAME, GITHUB_IMPORT_DATA_DIRNAME, MAPPINGS_DATA_DIRNAME, ACCOUNT_MAPPING_FILENAME, ISSUE_TYPE_TO_LABEL_MAP, COMPONENT_TO_LABEL_MAP, \
-    logging_setup, jira_issue_url, jira_dump_file, jira_issue_id, github_data_file, make_github_title, read_account_map
+from common import *
 from jira_util import *
 
-log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
-logger = logging_setup(log_dir, "jira2github_import")
-
 
 def attachment_url(issue_num: int, filename: str, att_repo: str, att_branch: str) -> str:
     return f"https://raw.githubusercontent.com/{att_repo}/{att_branch}/attachments/{jira_issue_id(issue_num)}/{quote(filename)}"
 
 
-#def may_markup(gh_account: str) -> bool:
-#    return gh_account if gh_account in ["@mocobeta", "@dweiss"] else f"`{gh_account}`"
-
-
 def jira_timestamp_to_github_timestamp(ts: str) -> str:
     # convert Jira timestamp format to GitHub acceptable format
     # e.g., "2006-06-06T06:24:38.000+0000" -> "2006-06-06T06:24:38Z"
     return ts[:-9] + "Z"
 
 
-def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str) -> bool:
+def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str, logger: Logger) -> bool:
     jira_id = jira_issue_id(num)
     dump_file = jira_dump_file(dump_dir, num)
     if not dump_file.exists():
@@ -213,11 +207,12 @@ if __name__ == "__main__":
     parser.add_argument('--issues', type=int, required=False, nargs='*', help='Jira issue number list to be downloaded')
     parser.add_argument('--min', type=int, dest='min', required=False, default=1, help='Minimum Jira issue number to be converted')
     parser.add_argument('--max', type=int, dest='max', required=False, help='Maximum Jira issue number to be converted')
+    parser.add_argument('--num_workers', type=int, dest='num_workers', required=False, default=1, help='Number of worker processes')
     args = parser.parse_args()
 
     dump_dir = Path(__file__).resolve().parent.parent.joinpath(JIRA_DUMP_DIRNAME)
     if not dump_dir.exists():
-        logger.error(f"Jira dump dir not exists: {dump_dir}")
+        print(f"Jira dump dir not exists: {dump_dir}")
         sys.exit(1)
 
     mappings_dir = Path(__file__).resolve().parent.parent.joinpath(MAPPINGS_DATA_DIRNAME)
@@ -238,14 +233,34 @@ if __name__ == "__main__":
             issues.extend(list(range(args.min, args.max + 1)))
         else:
             issues.append(args.min)
+    num_workers = args.num_workers
+
+    log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
+    name = "jira2github_import"
+    (listener, queue) = log_listener(log_dir, name)
+    listener.start()
+    logging_setup_worker(queue)
+    logger = logging.getLogger(name)
 
-    logger.info(f"Converting Jira issues to GitHub issues in {output_dir}")
-    for num in issues:
+    logger.info(f"Converting Jira issues to GitHub issues in {output_dir}. num_workers={num_workers}")
+
+    def task(num):
+        logger = logging.getLogger(name)
         try:
-            convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch)
+            convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch, logger)
         except Exception as e:
             logger.error(traceback.format_exc(limit=100))
             logger.error(f"Failed to convert Jira issue. An error '{str(e)}' occurred; skipped {jira_issue_id(num)}.")
-    
-    logger.info("Done.")
 
+    results = []
+    # Try to support Windows: workers are spawned rather than forked there, so logging is configured in each worker via the Pool initializer.
+    with multiprocessing.Pool(num_workers, initializer=logging_setup_worker, initargs=(queue,)) as pool:
+        for num in issues:
+            result = pool.apply_async(task, (num,))
+            results.append(result)
+        for res in results:
+            res.get()
+
+    logger.info("Done.")
+    queue.put_nowait(None)
+    listener.join()