Posted to commits@lucene.apache.org by to...@apache.org on 2022/07/13 11:16:35 UTC

[lucene-jira-archive] branch parallelize-jira2github created (now 4e4e24c0)

This is an automated email from the ASF dual-hosted git repository.

tomoko pushed a change to branch parallelize-jira2github
in repository https://gitbox.apache.org/repos/asf/lucene-jira-archive.git


      at 4e4e24c0 allow to specify number of workers for jira2github_import

This branch includes the following new commits:

     new 4e4e24c0 allow to specify number of workers for jira2github_import

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[lucene-jira-archive] 01/01: allow to specify number of workers for jira2github_import

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tomoko pushed a commit to branch parallelize-jira2github
in repository https://gitbox.apache.org/repos/asf/lucene-jira-archive.git

commit 4e4e24c0c2b7dd7334a66dc6d85ee656260b9904
Author: Tomoko Uchida <to...@gmail.com>
AuthorDate: Wed Jul 13 20:16:24 2022 +0900

    allow to specify number of workers for jira2github_import
---
 migration/src/common.py             | 52 ++++++++++++++++++++++++++++++++++---
 migration/src/jira2github_import.py | 50 ++++++++++++++++++++++-------------
 2 files changed, 80 insertions(+), 22 deletions(-)
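
For reference, with this change the converter can be invoked with the new
flag like so (the issue range and worker count below are illustrative):

    python src/jira2github_import.py --min 1 --max 100 --num_workers 4
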

diff --git a/migration/src/common.py b/migration/src/common.py
index e4d4727e..5da0536e 100644
--- a/migration/src/common.py
+++ b/migration/src/common.py
@@ -4,7 +4,8 @@ from datetime import datetime
 import functools
 import time
 import os
-import tempfile
+import multiprocessing
+from logging.handlers import QueueHandler
 
 LOG_DIRNAME = "log"
 
@@ -19,25 +20,68 @@ ACCOUNT_MAPPING_FILENAME = "account-map.csv"
 
 ASF_JIRA_BASE_URL = "https://issues.apache.org/jira/browse"
 
+LOGGING_FORMATTER = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
 
 logging.basicConfig(level=logging.DEBUG, handlers=[])
 
 def logging_setup(log_dir: Path, name: str) -> logging.Logger:
     if not log_dir.exists():
         log_dir.mkdir()
-    formatter = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
     file_handler = logging.FileHandler(log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log'))
     file_handler.setLevel(logging.DEBUG)
-    file_handler.setFormatter(formatter)
+    file_handler.setFormatter(LOGGING_FORMATTER)
     console_handler = logging.StreamHandler()
     console_handler.setLevel(logging.INFO)
-    console_handler.setFormatter(formatter)
+    console_handler.setFormatter(LOGGING_FORMATTER)
     logger = logging.getLogger(name)
     logger.addHandler(file_handler)
     logger.addHandler(console_handler)
     return logger
 
 
+# helper to support logging to a single file from multiple processes
+# https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
+def log_listener(log_dir: Path, name: str) -> tuple[multiprocessing.Process, multiprocessing.Queue]:
+
+    def listener_process(queue: multiprocessing.Queue, path: Path):
+        file_handler = logging.FileHandler(path)
+        file_handler.setLevel(logging.DEBUG)
+        file_handler.setFormatter(LOGGING_FORMATTER)
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.INFO)
+        console_handler.setFormatter(LOGGING_FORMATTER)
+        root = logging.getLogger()
+        root.addHandler(file_handler)
+        root.addHandler(console_handler)
+
+        while True:
+            try:
+                record: logging.LogRecord = queue.get()
+                if record is None:  # sentinel
+                    break
+                logger = logging.getLogger(record.name)
+                logger.handle(record)
+            except Exception:
+                import sys, traceback
+                print('Whoops! Problem:', file=sys.stderr)
+                traceback.print_exc(file=sys.stderr)
+
+    if not log_dir.exists():
+        log_dir.mkdir()
+    path = log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log')
+    queue = multiprocessing.Queue(-1)
+    listener = multiprocessing.Process(target=listener_process, args=(queue, path))
+    return (listener, queue)
+
+
+def logging_setup_worker(name: str, queue: multiprocessing.Queue) -> logging.Logger:
+    logger = logging.getLogger(name)
+    queue_handler = QueueHandler(queue)
+    logger.addHandler(queue_handler)
+    logger.setLevel(logging.DEBUG)
+    return logger
+
+
 def jira_issue_url(issue_id: str) -> str:
     return ASF_JIRA_BASE_URL + f"/{issue_id}"
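
For context, here is one minimal way the two new helpers could be wired
together, following the logging-cookbook pattern the comment above cites
(the demo worker, logger name, and log directory below are illustrative
only, not part of the patch):

    import multiprocessing
    from pathlib import Path

    from common import log_listener, logging_setup_worker

    def demo(queue: multiprocessing.Queue):
        # each worker process attaches a QueueHandler and ships records to the queue
        logger = logging_setup_worker("demo", queue)
        logger.info("hello from a worker process")

    if __name__ == "__main__":
        # the listener process owns the file/console handlers and drains the queue
        listener, queue = log_listener(Path("log"), "demo")
        listener.start()
        workers = [multiprocessing.Process(target=demo, args=(queue,)) for _ in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        queue.put_nowait(None)  # sentinel: tells the listener loop to exit
        listener.join()
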
 
diff --git a/migration/src/jira2github_import.py b/migration/src/jira2github_import.py
index 233ccd15..86d9792c 100644
--- a/migration/src/jira2github_import.py
+++ b/migration/src/jira2github_import.py
@@ -1,11 +1,12 @@
 #
 # Convert Jira issues to GitHub issues for Import Issues API (https://gist.github.com/jonmagic/5282384165e0f86ef105)
 # Usage:
-#   python src/jira2github_import.py --issues <jira issue number list>
-#   python src/jira2github_import.py --min <min issue number> --max <max issue number>
+#   python src/jira2github_import.py --issues <jira issue number list> [--num_workers <# worker processes>]
+#   python src/jira2github_import.py --min <min issue number> --max <max issue number> [--num_workers <# worker processes>]
 #
 
 import argparse
+from logging import Logger
 from pathlib import Path
 import json
 import sys
@@ -13,30 +14,26 @@ from urllib.parse import quote
 import dateutil.parser
 import os
 import traceback
+import multiprocessing
 
-from common import LOG_DIRNAME, JIRA_DUMP_DIRNAME, GITHUB_IMPORT_DATA_DIRNAME, MAPPINGS_DATA_DIRNAME, ACCOUNT_MAPPING_FILENAME, ISSUE_TYPE_TO_LABEL_MAP, COMPONENT_TO_LABEL_MAP, \
-    logging_setup, jira_issue_url, jira_dump_file, jira_issue_id, github_data_file, make_github_title, read_account_map
+from common import *
 from jira_util import *
 
-log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
-logger = logging_setup(log_dir, "jira2github_import")
+#log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
+#logger = logging_setup(log_dir, "jira2github_import")
 
 
 def attachment_url(issue_num: int, filename: str, att_repo: str, att_branch: str) -> str:
     return f"https://raw.githubusercontent.com/{att_repo}/{att_branch}/attachments/{jira_issue_id(issue_num)}/{quote(filename)}"
 
 
-#def may_markup(gh_account: str) -> bool:
-#    return gh_account if gh_account in ["@mocobeta", "@dweiss"] else f"`{gh_account}`"
-
-
 def jira_timestamp_to_github_timestamp(ts: str) -> str:
     # convert Jira timestamp format to GitHub acceptable format
     # e.g., "2006-06-06T06:24:38.000+0000" -> "2006-06-06T06:24:38Z"
     return ts[:-9] + "Z"
 
 
-def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str) -> bool:
+def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str, logger: Logger) -> bool:
     jira_id = jira_issue_id(num)
     dump_file = jira_dump_file(dump_dir, num)
     if not dump_file.exists():
@@ -73,7 +70,6 @@ def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[
         for (filename, cnt) in attachments:
             attachment_list_items.append(f"[{filename}]({attachment_url(num, filename, att_repo, att_branch)})" + (f" (versions: {cnt})" if cnt > 1 else ""))
             att_replace_map[filename] = attachment_url(num, filename, att_repo, att_branch)
-            print(f'{jira_id}: attachments: {attachment_list_items}')
 
         # embed github issue number next to linked issue keys
         linked_issues_list_items = []
@@ -212,11 +208,12 @@ if __name__ == "__main__":
     parser.add_argument('--issues', type=int, required=False, nargs='*', help='Jira issue number list to be downloaded')
     parser.add_argument('--min', type=int, dest='min', required=False, default=1, help='Minimum Jira issue number to be converted')
     parser.add_argument('--max', type=int, dest='max', required=False, help='Maximum Jira issue number to be converted')
+    parser.add_argument('--num_workers', type=int, dest='num_workers', required=False, default=1, help='Number of worker processes')
     args = parser.parse_args()
 
     dump_dir = Path(__file__).resolve().parent.parent.joinpath(JIRA_DUMP_DIRNAME)
     if not dump_dir.exists():
-        logger.error(f"Jira dump dir not exists: {dump_dir}")
+        print(f"Jira dump dir does not exist: {dump_dir}")
         sys.exit(1)
 
     mappings_dir = Path(__file__).resolve().parent.parent.joinpath(MAPPINGS_DATA_DIRNAME)
@@ -237,14 +234,31 @@ if __name__ == "__main__":
             issues.extend(list(range(args.min, args.max + 1)))
         else:
             issues.append(args.min)
+    num_workers = args.num_workers
+
+    log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
+    name = "jira2github_import"
+    (listener, queue) = log_listener(log_dir, name)
+    listener.start()
+    logger = logging_setup_worker(name, queue)
+
+    logger.info(f"Converting Jira issues to GitHub issues in {output_dir}. num_workers={num_workers}")
 
-    logger.info(f"Converting Jira issues to GitHub issues in {output_dir}")
-    for num in issues:
+    def worker(num):
         try:
-            convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch)
+            convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch, logger)
         except Exception as e:
             logger.error(traceback.format_exc(limit=100))
             logger.error(f"Failed to convert Jira issue. An error '{str(e)}' occurred; skipped {jira_issue_id(num)}.")
-    
-    logger.info("Done.")
 
+    results = []
+    with multiprocessing.Pool(num_workers) as pool:
+        for num in issues:
+            result = pool.apply_async(worker, (num,))
+            results.append(result)
+        for res in results:
+            res.get()
+
+    logger.info("Done.")
+    queue.put_nowait(None)
+    listener.join()
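
One portability note on the hunk above: `worker` is defined inside the
`if __name__ == "__main__":` block and captures `logger` from the enclosing
scope. That works under the fork start method (the Linux default), but under
spawn (the Windows and macOS default) pool children re-import the module
without executing the main block, so `worker` cannot be resolved. Below is a
spawn-safe sketch reusing the helpers from common.py; the `init_worker` name
and the explicit argument plumbing are illustrative, not part of the patch:

    import logging
    import multiprocessing
    import traceback

    def init_worker(queue):
        # runs once in each pool worker: route this process's records to the listener
        logging_setup_worker("jira2github_import", queue)

    def worker(num, dump_dir, output_dir, account_map, att_repo, att_branch):
        logger = logging.getLogger("jira2github_import")
        try:
            convert_issue(num, dump_dir, output_dir, account_map, att_repo, att_branch, logger)
        except Exception as e:
            logger.error(traceback.format_exc(limit=100))
            logger.error(f"Failed to convert Jira issue. An error '{e}' occurred; skipped {jira_issue_id(num)}.")

    # in the __main__ block, after listener.start():
    #     with multiprocessing.Pool(num_workers, initializer=init_worker, initargs=(queue,)) as pool:
    #         results = [pool.apply_async(worker, (num, dump_dir, output_dir, account_map,
    #                                              github_att_repo, github_att_branch)) for num in issues]
    #         for res in results:
    #             res.get()

If passing the raw multiprocessing.Queue through initargs runs into pickling
restrictions on a given platform, the logging cookbook's pool example uses a
multiprocessing.Manager().Queue() instead.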