You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@lucene.apache.org by to...@apache.org on 2022/07/14 02:26:48 UTC
[lucene-jira-archive] branch main updated: Allow to specify number of worker processes for jira2github_import.py (#41)
This is an automated email from the ASF dual-hosted git repository.
tomoko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/lucene-jira-archive.git
The following commit(s) were added to refs/heads/main by this push:
new 5cc53ad8 Allow to specify number of worker processes for jira2github_import.py (#41)
5cc53ad8 is described below
commit 5cc53ad8d166ff5823235e2d82f0fcd997429d92
Author: Tomoko Uchida <to...@gmail.com>
AuthorDate: Thu Jul 14 11:26:43 2022 +0900
Allow to specify number of worker processes for jira2github_import.py (#41)
* allow specifying the number of workers for jira2github_import
* remove obsolete code
* try to make it work on Windows (not tested...)
* properly name the function that is passed to Pool.apply_async() as 'task'
---
migration/src/common.py | 53 ++++++++++++++++++++++++++++++++++---
migration/src/jira2github_import.py | 51 ++++++++++++++++++++++-------------
2 files changed, 82 insertions(+), 22 deletions(-)
diff --git a/migration/src/common.py b/migration/src/common.py
index e4d4727e..ee37a6a5 100644
--- a/migration/src/common.py
+++ b/migration/src/common.py
@@ -4,7 +4,8 @@ from datetime import datetime
import functools
import time
import os
-import tempfile
+import multiprocessing
+from logging.handlers import QueueHandler
LOG_DIRNAME = "log"
@@ -19,25 +20,69 @@ ACCOUNT_MAPPING_FILENAME = "account-map.csv"
ASF_JIRA_BASE_URL = "https://issues.apache.org/jira/browse"
+LOGGING_FOMATTER = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
logging.basicConfig(level=logging.DEBUG, handlers=[])
def logging_setup(log_dir: Path, name: str) -> logging.Logger:
if not log_dir.exists():
log_dir.mkdir()
- formatter = logging.Formatter("[%(asctime)s] %(levelname)s:%(module)s: %(message)s")
file_handler = logging.FileHandler(log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log'))
file_handler.setLevel(logging.DEBUG)
- file_handler.setFormatter(formatter)
+ file_handler.setFormatter(LOGGING_FOMATTER)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
- console_handler.setFormatter(formatter)
+ console_handler.setFormatter(LOGGING_FOMATTER)
logger = logging.getLogger(name)
+ logger.handlers = [] # clear current handlers
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
+# helper to support logging to a single file from multiple processes
+# https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
+def log_listener(log_dir: Path, name: str) -> tuple[multiprocessing.Process, multiprocessing.Queue]:
+
+ def listener_process(queue: multiprocessing.Queue, path: Path):
+ file_handler = logging.FileHandler(path)
+ file_handler.setLevel(logging.DEBUG)
+ file_handler.setFormatter(LOGGING_FOMATTER)
+ console_handler = logging.StreamHandler()
+ console_handler.setLevel(logging.INFO)
+ console_handler.setFormatter(LOGGING_FOMATTER)
+ root = logging.getLogger()
+ root.addHandler(file_handler)
+ root.addHandler(console_handler)
+
+ while True:
+ try:
+ record: logging.LogRecord = queue.get()
+ if record is None: # sentinel
+ break
+ logger = logging.getLogger(record.name)
+ logger.handle(record)
+ except Exception:
+ import sys, traceback
+ print('Whoops! Problem:', file=sys.stderr)
+ traceback.print_exc(file=sys.stderr)
+
+ if not log_dir.exists():
+ log_dir.mkdir()
+ path = log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log')
+ queue = multiprocessing.Queue(-1)
+ listener = multiprocessing.Process(target=listener_process, args=(queue, path))
+ return (listener, queue)
+
+
+def logging_setup_worker(queue: multiprocessing.Queue):
+ logger = logging.getLogger()
+ queue_handler = QueueHandler(queue)
+ logger.handlers = [] # clear current handlers
+ logger.addHandler(queue_handler)
+ logger.setLevel(logging.DEBUG)
+
+
def jira_issue_url(issue_id: str) -> str:
return ASF_JIRA_BASE_URL + f"/{issue_id}"
diff --git a/migration/src/jira2github_import.py b/migration/src/jira2github_import.py
index caff59b5..9262d3e3 100644
--- a/migration/src/jira2github_import.py
+++ b/migration/src/jira2github_import.py
@@ -1,11 +1,12 @@
#
# Convert Jira issues to GitHub issues for Import Issues API (https://gist.github.com/jonmagic/5282384165e0f86ef105)
# Usage:
-# python src/jira2github_import.py --issues <jira issue number list>
-# python src/jira2github_import.py --min <min issue number> --max <max issue number>
+# python src/jira2github_import.py --issues <jira issue number list> [--num-workers <# worker processes>]
+# python src/jira2github_import.py --min <min issue number> --max <max issue number> [--num-workers <# worker processes>]
#
import argparse
+from logging import Logger
from pathlib import Path
import json
import sys
@@ -13,30 +14,23 @@ from urllib.parse import quote
import dateutil.parser
import os
import traceback
+import multiprocessing
-from common import LOG_DIRNAME, JIRA_DUMP_DIRNAME, GITHUB_IMPORT_DATA_DIRNAME, MAPPINGS_DATA_DIRNAME, ACCOUNT_MAPPING_FILENAME, ISSUE_TYPE_TO_LABEL_MAP, COMPONENT_TO_LABEL_MAP, \
- logging_setup, jira_issue_url, jira_dump_file, jira_issue_id, github_data_file, make_github_title, read_account_map
+from common import *
from jira_util import *
-log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
-logger = logging_setup(log_dir, "jira2github_import")
-
def attachment_url(issue_num: int, filename: str, att_repo: str, att_branch: str) -> str:
return f"https://raw.githubusercontent.com/{att_repo}/{att_branch}/attachments/{jira_issue_id(issue_num)}/{quote(filename)}"
-#def may_markup(gh_account: str) -> bool:
-# return gh_account if gh_account in ["@mocobeta", "@dweiss"] else f"`{gh_account}`"
-
-
def jira_timestamp_to_github_timestamp(ts: str) -> str:
# convert Jira timestamp format to GitHub acceptable format
# e.g., "2006-06-06T06:24:38.000+0000" -> "2006-06-06T06:24:38Z"
return ts[:-9] + "Z"
-def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str) -> bool:
+def convert_issue(num: int, dump_dir: Path, output_dir: Path, account_map: dict[str, str], att_repo: str, att_branch: str, logger: Logger) -> bool:
jira_id = jira_issue_id(num)
dump_file = jira_dump_file(dump_dir, num)
if not dump_file.exists():
@@ -213,11 +207,12 @@ if __name__ == "__main__":
parser.add_argument('--issues', type=int, required=False, nargs='*', help='Jira issue number list to be downloaded')
parser.add_argument('--min', type=int, dest='min', required=False, default=1, help='Minimum Jira issue number to be converted')
parser.add_argument('--max', type=int, dest='max', required=False, help='Maximum Jira issue number to be converted')
+ parser.add_argument('--num_workers', type=int, dest='num_workers', required=False, default=1, help='Number of worker processes')
args = parser.parse_args()
dump_dir = Path(__file__).resolve().parent.parent.joinpath(JIRA_DUMP_DIRNAME)
if not dump_dir.exists():
- logger.error(f"Jira dump dir not exists: {dump_dir}")
+ print(f"Jira dump dir not exists: {dump_dir}")
sys.exit(1)
mappings_dir = Path(__file__).resolve().parent.parent.joinpath(MAPPINGS_DATA_DIRNAME)
@@ -238,14 +233,34 @@ if __name__ == "__main__":
issues.extend(list(range(args.min, args.max + 1)))
else:
issues.append(args.min)
+ num_workers = args.num_workers
+
+ log_dir = Path(__file__).resolve().parent.parent.joinpath(LOG_DIRNAME)
+ name = "jira2github_import"
+ (listener, queue) = log_listener(log_dir, name)
+ listener.start()
+ logging_setup_worker(queue)
+ logger = logging.getLogger(name)
- logger.info(f"Converting Jira issues to GitHub issues in {output_dir}")
- for num in issues:
+ logger.info(f"Converting Jira issues to GitHub issues in {output_dir}. num_workers={num_workers}")
+
+ def task(num):
+ logger = logging.getLogger(name)
try:
- convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch)
+ convert_issue(num, dump_dir, output_dir, account_map, github_att_repo, github_att_branch, logger)
except Exception as e:
logger.error(traceback.format_exc(limit=100))
logger.error(f"Failed to convert Jira issue. An error '{str(e)}' occurred; skipped {jira_issue_id(num)}.")
-
- logger.info("Done.")
+ results = []
+ # Try to support Windows: The worker configuration is done at the start of the worker process run.
+ with multiprocessing.Pool(num_workers, initializer=logging_setup_worker, initargs=(queue,)) as pool:
+ for num in issues:
+ result = pool.apply_async(task, (num,))
+ results.append(result)
+ for res in results:
+ res.get()
+
+ logger.info("Done.")
+ queue.put_nowait(None)
+ listener.join()