Posted to commits@lucene.apache.org by to...@apache.org on 2022/07/13 11:16:36 UTC
[lucene-jira-archive] 01/01: allow to specify number of workers for jira2github_import
This is an automated email from the ASF dual-hosted git repository.
tomoko pushed a commit to branch parallelize-jira2github
in repository https://gitbox.apache.org/repos/asf/lucene-jira-archive.git
commit 4e4e24c0c2b7dd7334a66dc6d85ee656260b9904
Author: Tomoko Uchida <to...@gmail.com>
AuthorDate: Wed Jul 13 20:16:24 2022 +0900
allow to specify number of workers for jira2github_import
---
migration/src/common.py | 52 ++++++++++++++++++++++++++++++++++---
migration/src/jira2github_import.py | 50 ++++++++++++++++++++++-------------
2 files changed, 80 insertions(+), 22 deletions(-)
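[Editorial note] The change threads a worker count from the command line through to a
multiprocessing.Pool and routes per-worker logging through a queue to a single listener
process. Assuming the flag as registered by argparse in the second file (the updated usage
comment spells it --num-workers, but the parser defines --num_workers), a run over a range
of issues would look roughly like:

    python src/jira2github_import.py --min 1 --max 1000 --num_workers 4

The issue range and worker count above are illustrative only.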
diff --git a/migration/src/common.py b/migration/src/common.py
index e4d4727e..5da0536e 100644
--- a/migration/src/common.py
+++ b/migration/src/common.py
@@ -4,7 +4,8 @@ from datetime import datetime
import functools
import time
import os
-import tempfile
+import multiprocessing
+from logging.handlers import QueueHandler
LOG_DIRNAME = "log"
@@ -19,25 +20,68 @@ ACCOUNT_MAPPING_FILENAME = "account-map.csv"
ASF_JIRA_BASE_URL = "https://issues.apache.org/jira/browse"
+LOGGING_FOMATTER = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
logging.basicConfig(level=logging.DEBUG, handlers=[])
def logging_setup(log_dir: Path, name: str) -> logging.Logger:
if not log_dir.exists():
log_dir.mkdir()
- formatter = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
file_handler = logging.FileHandler(log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log'))
file_handler.setLevel(logging.DEBUG)
- file_handler.setFormatter(formatter)
+ file_handler.setFormatter(LOGGING_FOMATTER)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
- console_handler.setFormatter(formatter)
+ console_handler.setFormatter(LOGGING_FOMATTER)
logger = logging.getLogger(name)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
+# helper to support logging to a single file from multiple processes
+# https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
+def log_listener(log_dir: Path, name: str) -> tuple[multiprocessing.Process, multiprocessing.Queue]:
+
+ def listener_process(queue: multiprocessing.Queue, path: Path):
+ file_handler = logging.FileHandler(path)
+ file_handler.setLevel(logging.DEBUG)
+ file_handler.setFormatter(LOGGING_FOMATTER)
+ console_handler = logging.StreamHandler()
+ console_handler.setLevel(logging.INFO)
+ console_handler.setFormatter(LOGGING_FOMATTER)
+ root = logging.getLogger()
+ root.addHandler(file_handler)
+ root.addHandler(console_handler)
+
+ while True:
+ try:
+ record: logging.LogRecord = queue.get()
+ if record is None: # sentinel
+ break
+ logger = logging.getLogger(record.name)
+ logger.handle(record)
+ except Exception:
+ import sys, traceback
+ print('Whoops! Problem:', file=sys.stderr)
+ traceback.print_exc(file=sys.stderr)
+
+ if not log_dir.exists():
+ log_dir.mkdir()
+ path = log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log')
+ queue = multiprocessing.Queue(-1)
+ listener = multiprocessing.Process(target=listener_process, args=(queue, path))
+ return (listener, queue)
+
+
+def logging_setup_worker(name: str, queue: multiprocessing.Queue) -> logging.Logger:
+ logger = logging.getLogger(name)
+ queue_handler = QueueHandler(queue)
+ logger.addHandler(queue_handler)
+ logger.setLevel(logging.DEBUG)
+ return logger
+
+
def jira_issue_url(issue_id: str) -> str:
return ASF_JIRA_BASE_URL + f"/{issue_id}"
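[Editorial note] The two helpers added above follow the queue-based recipe from the Python
logging cookbook linked in the comment: a dedicated listener process owns the file and
console handlers, while each worker only attaches a QueueHandler, so records from every
process end up in one log file. Below is a minimal, self-contained sketch of that pattern;
the file name, logger names, and per-item work are placeholders, not taken from the patch.

    import logging
    import multiprocessing
    from logging.handlers import QueueHandler

    def listener_process(queue: multiprocessing.Queue) -> None:
        # The listener owns the real handlers; workers never write the file directly.
        handler = logging.FileHandler("demo.log")  # placeholder path
        handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s:%(processName)s: %(message)s"))
        logging.getLogger().addHandler(handler)
        while True:
            record = queue.get()
            if record is None:  # sentinel: parent signals that all workers are done
                break
            logging.getLogger(record.name).handle(record)

    def worker_process(queue: multiprocessing.Queue, n: int) -> None:
        # Workers attach only a QueueHandler; every record travels over the queue.
        logger = logging.getLogger("demo-worker")
        logger.addHandler(QueueHandler(queue))
        logger.setLevel(logging.DEBUG)
        logger.info("converted issue %d", n)

    if __name__ == "__main__":
        queue = multiprocessing.Queue(-1)
        listener = multiprocessing.Process(target=listener_process, args=(queue,))
        listener.start()
        workers = [multiprocessing.Process(target=worker_process, args=(queue, n)) for n in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        queue.put_nowait(None)  # stop the listener once all workers have finished
        listener.join()

In the patch itself, log_listener() builds the queue and listener in the parent,
logging_setup_worker() attaches the QueueHandler once to the parent's logger, and the pool
workers inherit that logger when they are forked.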
diff --git a/migration/src/jira2github_import.py b/migration/src/jira2github_import.py
index 233ccd15..86d9792c 100644
--- a/migration/src/jira2github_import.py
+++ b/migration/src/jira2github_import.py
@@ -1,11 +1,12 @@
#
# Convert Jira issues to GitHub issues for Import Issues API (https://gist.github.com/jonmagic/5282384165e0f86ef105)
# Usage:
-# python src/jira2github_import.py --issues <jira issue number list>
-# python src/jira2github_import.py --min <min issue number> --max <max issue number>
+# python src/jira2github_import.py --issues <jira issue number list> [--num-workers <# worker processes>]
+# python src/jira2github_import.py --min <min issue number> --max <max issue number> [--num-workers <# worker processes>]
#
import argparse
+from logging import Logger
from pathlib import Path
import json
import sys
@@ -13,30 +14,26 @@ from urllib.parse import quote
import dateutil.parser
import os
import traceback
+import multiprocessing
-from common import LOG_DIRNAME, JIRA_DUMP_DIRNAME, GITHUB_IMPORT_DATA_DIRNAME, MAPPINGS_DATA_DIRNAME, ACCOUNT_MAPPING_FILENAME, ISSUE_TYPE_TO_LABEL_MAP, COMPONENT_TO_LABEL_MAP, \
- logging_setup, jira_issue_url, jira_dump_file, jira_issue_id, github_data_file, make_github_title, read_account_map
+from common import *
from jira_util import *
-log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
-logger = logging_setup(log_dir, "jira2github_import")
+#log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
+#logger = logging_setup(log_dir, "jira2github_import")
def attachment_url(issue_num: int, filename: str, att_repo: str, att_branch: str) -> str:
return f"https://raw.githubusercontent.com/{att_repo}/{att_branch}/attachments/{jira_issue_id(issue_num)}/{quote(filename)}"
-#def may_markup(gh_account: str) -> bool:
-# return gh_account if gh_account in ["@mocobeta", "@dweiss"] else f"`{gh_account}`"
-
-
def jira_timestamp_to_github_timestamp(ts: str) -> str:
# convert Jira timestamp format to GitHub acceptable format
# e.g., "2006-06-06T06:24:38.000+0000" -> "2006-06-06T06:24:38Z"
return ts[:-9] + "Z"
-def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str) -> bool:
+def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str, logger: Logger) -> bool:
jira_id = jira_issue_id(num)
dump_file = jira_dump_file(dump_dir, num)
if not dump_file.exists():
@@ -73,7 +70,6 @@ def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[
for (filename, cnt) in attachments:
attachment_list_items.append(f"[{filename}]({attachment_url(num, filename, att_repo, att_branch)})" + (f" (versions: {cnt})" if cnt > 1 else ""))
att_replace_map[filename] = attachment_url(num, filename, att_repo, att_branch)
- print(f'{jira_id}: attachments: {attachment_list_items}')
# embed github issue number next to linked issue keys
linked_issues_list_items = []
@@ -212,11 +208,12 @@ if __name__ == "__main__":
parser.add_argument('--issues', type=int, required=False, nargs='*', help='Jira issue number list to be downloaded')
parser.add_argument('--min', type=int, dest='min', required=False, default=1, help='Minimum Jira issue number to be converted')
parser.add_argument('--max', type=int, dest='max', required=False, help='Maximum Jira issue number to be converted')
+ parser.add_argument('--num_workers', type=int, dest='num_workers', required=False, default=1, help='Number of worker processes')
args = parser.parse_args()
dump_dir = Path(__file__).resolve().parent.parent.joinpath(JIRA_DUMP_DIRNAME)
if not dump_dir.exists():
- logger.error(f"Jira dump dir not exists: {dump_dir}")
+ print(f"Jira dump dir not exists: {dump_dir}")
sys.exit(1)
mappings_dir = Path(__file__).resolve().parent.parent.joinpath(MAPPINGS_DATA_DIRNAME)
@@ -237,14 +234,31 @@ if __name__ == "__main__":
issues.extend(list(range(args.min, args.max + 1)))
else:
issues.append(args.min)
+ num_workers = args.num_workers
+
+ log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
+ name = "jira2github_import"
+ (listener, queue) = log_listener(log_dir, name)
+ listener.start()
+ logger = logging_setup_worker(name, queue)
+
+ logger.info(f"Converting Jira issues to GitHub issues in {output_dir}. num_workers={num_workers}")
- logger.info(f"Converting Jira issues to GitHub issues in {output_dir}")
- for num in issues:
+ def worker(num):
try:
- convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch)
+ convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch, logger)
except Exception as e:
logger.error(traceback.format_exc(limit=100))
logger.error(f"Failed to convert Jira issue. An error '{str(e)}' occurred; skipped {jira_issue_id(num)}.")
-
- logger.info("Done.")
+ results = []
+ with multiprocessing.Pool(num_workers) as pool:
+ for num in issues:
+ result = pool.apply_async(worker, (num,))
+ results.append(result)
+ for res in results:
+ res.get()
+
+ logger.info("Done.")
+ queue.put_nowait(None)
+ listener.join()
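[Editorial note] A minimal sketch of the fan-out used above: apply_async schedules one task
per issue, and the AsyncResult objects are drained afterwards so that exceptions raised in a
worker surface in the parent. convert_one below is a stand-in for convert_issue, and the
issue numbers and worker count are illustrative.

    import multiprocessing
    import traceback

    def convert_one(num: int) -> int:
        # placeholder for convert_issue(num, dump_dir, output_dir, ...)
        return num

    if __name__ == "__main__":
        issues = range(1, 9)  # illustrative issue numbers
        results = []
        with multiprocessing.Pool(processes=4) as pool:
            for num in issues:
                results.append(pool.apply_async(convert_one, (num,)))
            for res in results:
                try:
                    res.get()  # re-raises any exception thrown inside the worker
                except Exception:
                    traceback.print_exc()

One design note: the patch defines its worker closure inside the __main__ block and lets it
capture logger, dump_dir, and the other globals from the parent, which works under the
default fork start method on Linux; under the spawn start method a module-level function
such as the stand-in above would be needed.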