Posted to commits@lucene.apache.org by to...@apache.org on 2022/07/13 11:16:36 UTC

[lucene-jira-archive] 01/01: allow specifying the number of workers for jira2github_import

This is an automated email from the ASF dual-hosted git repository.

tomoko pushed a commit to branch parallelize-jira2github
in repository https://gitbox.apache.org/repos/asf/lucene-jira-archive.git

commit 4e4e24c0c2b7dd7334a66dc6d85ee656260b9904
Author: Tomoko Uchida <to...@gmail.com>
AuthorDate: Wed Jul 13 20:16:24 2022 +0900

    allow specifying the number of workers for jira2github_import
---
 migration/src/common.py             | 52 ++++++++++++++++++++++++++++++++++---
 migration/src/jira2github_import.py | 50 ++++++++++++++++++++++-------------
 2 files changed, 80 insertions(+), 22 deletions(-)
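
The new --num_workers option (default 1) fans the per-issue conversion out over a multiprocessing.Pool, with logging from all workers funneled into a single file through a queue. A sample invocation follows; the issue range here is purely illustrative:

    python src/jira2github_import.py --min 1 --max 1000 --num_workers 4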

diff --git a/migration/src/common.py b/migration/src/common.py
index e4d4727e..5da0536e 100644
--- a/migration/src/common.py
+++ b/migration/src/common.py
@@ -4,7 +4,8 @@ from datetime import datetime
 import functools
 import time
 import os
-import tempfile
+import multiprocessing
+from logging.handlers import QueueHandler
 
 LOG_DIRNAME = "log"
 
@@ -19,25 +20,68 @@ ACCOUNT_MAPPING_FILENAME = "account-map.csv"
 
 ASF_JIRA_BASE_URL = "https://issues.apache.org/jira/browse"
 
+LOGGING_FORMATTER = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
 
 logging.basicConfig(level=logging.DEBUG, handlers=[])
 
 def logging_setup(log_dir: Path, name: str) -> logging.Logger:
     if not log_dir.exists():
         log_dir.mkdir()
-    formatter = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
     file_handler = logging.FileHandler(log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log'))
     file_handler.setLevel(logging.DEBUG)
-    file_handler.setFormatter(formatter)
+    file_handler.setFormatter(LOGGING_FORMATTER)
     console_handler = logging.StreamHandler()
     console_handler.setLevel(logging.INFO)
-    console_handler.setFormatter(formatter)
+    console_handler.setFormatter(LOGGING_FORMATTER)
     logger = logging.getLogger(name)
     logger.addHandler(file_handler)
     logger.addHandler(console_handler)
     return logger
 
 
+# helper to support logging to a single file from multiple processes
+# https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
+def log_listener(log_dir: Path, name: str) -> tuple[multiprocessing.Process, multiprocessing.Queue]:
+
+    def listener_process(queue: multiprocessing.Queue, path: Path):
+        file_handler = logging.FileHandler(path)
+        file_handler.setLevel(logging.DEBUG)
+        file_handler.setFormatter(LOGGING_FORMATTER)
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.INFO)
+        console_handler.setFormatter(LOGGING_FORMATTER)
+        root = logging.getLogger()
+        root.addHandler(file_handler)
+        root.addHandler(console_handler)
+
+        while True:
+            try:
+                record: logging.LogRecord = queue.get()
+                if record is None:  # sentinel
+                    break
+                logger = logging.getLogger(record.name)
+                logger.handle(record)
+            except Exception:
+                import sys, traceback
+                print('Whoops! Problem:', file=sys.stderr)
+                traceback.print_exc(file=sys.stderr)
+
+    if not log_dir.exists():
+        log_dir.mkdir()
+    path = log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log')
+    queue = multiprocessing.Queue(-1)
+    listener = multiprocessing.Process(target=listener_process, args=(queue, path))
+    return (listener, queue)
+
+
+def logging_setup_worker(name: str, queue: multiprocessing.Queue) -> logging.Logger:
+    logger = logging.getLogger(name)
+    queue_handler = QueueHandler(queue)
+    logger.addHandler(queue_handler)
+    logger.setLevel(logging.DEBUG)
+    return logger
+
+
 def jira_issue_url(issue_id: str) -> str:
     return ASF_JIRA_BASE_URL + f"/{issue_id}"
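
The two helpers added above implement the stdlib logging cookbook recipe for logging to a single file from multiple processes: a dedicated listener process drains a shared queue, while each process that wants to log attaches a QueueHandler feeding that queue. A minimal sketch of the intended call sequence, mirroring how jira2github_import.py wires the helpers up below; the logger name "example" is just illustrative:

    from pathlib import Path
    from common import log_listener, logging_setup_worker

    listener, queue = log_listener(Path("log"), "example")
    listener.start()
    logger = logging_setup_worker("example", queue)
    logger.info("records travel through the queue to the listener process")
    queue.put_nowait(None)  # sentinel understood by the listener loop
    listener.join()

Note that listener_process is defined as a closure, so this setup assumes a fork-style process start method; under "spawn" the target would not be picklable.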
 
diff --git a/migration/src/jira2github_import.py b/migration/src/jira2github_import.py
index 233ccd15..86d9792c 100644
--- a/migration/src/jira2github_import.py
+++ b/migration/src/jira2github_import.py
@@ -1,11 +1,12 @@
 #
 # Convert Jira issues to GitHub issues for Import Issues API (https://gist.github.com/jonmagic/5282384165e0f86ef105)
 # Usage:
-#   python src/jira2github_import.py --issues <jira issue number list>
-#   python src/jira2github_import.py --min <min issue number> --max <max issue number>
+#   python src/jira2github_import.py --issues <jira issue number list> [--num_workers <# worker processes>]
+#   python src/jira2github_import.py --min <min issue number> --max <max issue number> [--num_workers <# worker processes>]
 #
 
 import argparse
+from logging import Logger
 from pathlib import Path
 import json
 import sys
@@ -13,30 +14,26 @@ from urllib.parse import quote
 import dateutil.parser
 import os
 import traceback
+import multiprocessing
 
-from common import LOG_DIRNAME, JIRA_DUMP_DIRNAME, GITHUB_IMPORT_DATA_DIRNAME, MAPPINGS_DATA_DIRNAME, ACCOUNT_MAPPING_FILENAME, ISSUE_TYPE_TO_LABEL_MAP, COMPONENT_TO_LABEL_MAP, \
-    logging_setup, jira_issue_url, jira_dump_file, jira_issue_id, github_data_file, make_github_title, read_account_map
+from common import *
 from jira_util import *
 
-log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
-logger = logging_setup(log_dir, "jira2github_import")
+#log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
+#logger = logging_setup(log_dir, "jira2github_import")
 
 
 def attachment_url(issue_num: int, filename: str, att_repo: str, att_branch: str) -> str:
     return f"https://raw.githubusercontent.com/{att_repo}/{att_branch}/attachments/{jira_issue_id(issue_num)}/{quote(filename)}"
 
 
-#def may_markup(gh_account: str) -> bool:
-#    return gh_account if gh_account in ["@mocobeta", "@dweiss"] else f"`{gh_account}`"
-
-
 def jira_timestamp_to_github_timestamp(ts: str) -> str:
     # convert Jira timestamp format to GitHub acceptable format
     # e.g., "2006-06-06T06:24:38.000+0000" -> "2006-06-06T06:24:38Z"
     return ts[:-9] + "Z"
 
 
-def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str) -> bool:
+def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str, logger: Logger) -> bool:
     jira_id = jira_issue_id(num)
     dump_file = jira_dump_file(dump_dir, num)
     if not dump_file.exists():
@@ -73,7 +70,6 @@ def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[
         for (filename, cnt) in attachments:
             attachment_list_items.append(f"[{filename}]({attachment_url(num, filename, att_repo, att_branch)})" + (f" (versions: {cnt})" if cnt > 1 else ""))
             att_replace_map[filename] = attachment_url(num, filename, att_repo, att_branch)
-            print(f'{jira_id}: attachments: {attachment_list_items}')
 
         # embed github issue number next to linked issue keys
         linked_issues_list_items = []
@@ -212,11 +208,12 @@ if __name__ == "__main__":
     parser.add_argument('--issues', type=int, required=False, nargs='*', help='Jira issue number list to be downloaded')
     parser.add_argument('--min', type=int, dest='min', required=False, default=1, help='Minimum Jira issue number to be converted')
     parser.add_argument('--max', type=int, dest='max', required=False, help='Maximum Jira issue number to be converted')
+    parser.add_argument('--num_workers', type=int, dest='num_workers', required=False, default=1, help='Number of worker processes')
     args = parser.parse_args()
 
     dump_dir = Path(__file__).resolve().parent.parent.joinpath(JIRA_DUMP_DIRNAME)
     if not dump_dir.exists():
-        logger.error(f"Jira dump dir not exists: {dump_dir}")
+        print(f"Jira dump dir not exists: {dump_dir}")
         sys.exit(1)
 
     mappings_dir = Path(__file__).resolve().parent.parent.joinpath(MAPPINGS_DATA_DIRNAME)
@@ -237,14 +234,31 @@ if __name__ == "__main__":
             issues.extend(list(range(args.min, args.max + 1)))
         else:
             issues.append(args.min)
+    num_workers = args.num_workers
+
+    log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
+    name = "jira2github_import"
+    (listener, queue) = log_listener(log_dir, name)
+    listener.start()
+    logger = logging_setup_worker(name, queue)
+
+    logger.info(f"Converting Jira issues to GitHub issues in {output_dir}. num_workers={num_workers}")
 
-    logger.info(f"Converting Jira issues to GitHub issues in {output_dir}")
-    for num in issues:
+    def worker(num):
         try:
-            convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch)
+            convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch, logger)
         except Exception as e:
             logger.error(traceback.format_exc(limit=100))
             logger.error(f"Failed to convert Jira issue. An error '{str(e)}' occurred; skipped {jira_issue_id(num)}.")
-    
-    logger.info("Done.")
 
+    results = []
+    with multiprocessing.Pool(num_workers) as pool:
+        for num in issues:
+            result = pool.apply_async(worker, (num,))
+            results.append(result)
+        for res in results:
+            res.get()
+
+    logger.info("Done.")
+    queue.put_nowait(None)
+    listener.join()