You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tvm.apache.org by wu...@apache.org on 2021/09/08 17:08:32 UTC
[tvm] branch main updated: [AutoTVM] Use popenpool in
local_executor (#8851)
This is an automated email from the ASF dual-hosted git repository.
wuwei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tvm.git
The following commit(s) were added to refs/heads/main by this push:
new 1f2fdbf [AutoTVM] Use popenpool in local_executor (#8851)
1f2fdbf is described below
commit 1f2fdbf6c25e7117804b774cccd9d10b7da72b93
Author: Yuanjing Shi <yj...@shingjan.me>
AuthorDate: Wed Sep 8 10:08:15 2021 -0700
[AutoTVM] Use popenpool in local_executor (#8851)
* use popenpool in local_executor
* move auto_tvm_common to tvm.testing
* refactor
* nit
* remove LocalFutureNoFork
* exception handling
* handling two exceptions
* handling error
* add initiazlier
---
python/tvm/autotvm/measure/__init__.py | 1 -
python/tvm/autotvm/measure/local_executor.py | 157 ---------------------
python/tvm/autotvm/measure/measure_methods.py | 86 +++++------
python/tvm/testing/__init__.py | 4 +-
.../tvm/testing/autotvm.py | 1 +
python/tvm/testing/popen_pool.py | 16 +++
tests/python/contrib/test_popen_pool.py | 36 +++++
tests/python/unittest/test_autotvm_database.py | 2 +-
tests/python/unittest/test_autotvm_executor.py | 69 ---------
tests/python/unittest/test_autotvm_index_tuner.py | 2 +-
tests/python/unittest/test_autotvm_measure.py | 9 +-
tests/python/unittest/test_autotvm_record.py | 2 +-
.../python/unittest/test_autotvm_xgboost_model.py | 2 +-
13 files changed, 112 insertions(+), 275 deletions(-)
diff --git a/python/tvm/autotvm/measure/__init__.py b/python/tvm/autotvm/measure/__init__.py
index c4c0dc9..10b0843 100644
--- a/python/tvm/autotvm/measure/__init__.py
+++ b/python/tvm/autotvm/measure/__init__.py
@@ -31,4 +31,3 @@ from .measure_methods import (
request_remote,
)
from .executor import Executor
-from .local_executor import LocalExecutor
diff --git a/python/tvm/autotvm/measure/local_executor.py b/python/tvm/autotvm/measure/local_executor.py
deleted file mode 100644
index a9aeb790..0000000
--- a/python/tvm/autotvm/measure/local_executor.py
+++ /dev/null
@@ -1,157 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Local based implementation of the executor using multiprocessing"""
-
-import signal
-
-from multiprocessing import Process, Queue
-
-try:
- from queue import Empty
-except ImportError:
- from Queue import Empty
-
-try:
- import psutil
-except ImportError:
- psutil = None
-
-from . import executor
-
-
-def kill_child_processes(parent_pid, sig=signal.SIGTERM):
- """kill all child processes recursively"""
- try:
- parent = psutil.Process(parent_pid)
- children = parent.children(recursive=True)
- except psutil.NoSuchProcess:
- return
- for process in children:
- try:
- process.send_signal(sig)
- except psutil.NoSuchProcess:
- return
-
-
-def _execute_func(func, queue, args, kwargs):
- """execute function and return the result or exception to a queue"""
- try:
- res = func(*args, **kwargs)
- except Exception as exc: # pylint: disable=broad-except
- res = exc
- queue.put(res)
-
-
-def call_with_timeout(queue, timeout, func, args, kwargs):
- """A wrapper to support timeout of a function call"""
-
- # start a new process for timeout (cannot use thread because we have c function)
- p = Process(target=_execute_func, args=(func, queue, args, kwargs))
- p.start()
- p.join(timeout=timeout)
-
- queue.put(executor.TimeoutError())
-
- kill_child_processes(p.pid)
- p.terminate()
- p.join()
-
-
-class LocalFuture(executor.Future):
- """Local wrapper for the future
-
- Parameters
- ----------
- process: multiprocessing.Process
- process for running this task
- queue: multiprocessing.Queue
- queue for receiving the result of this task
- """
-
- def __init__(self, process, queue):
- self._done = False
- self._process = process
- self._queue = queue
-
- def done(self):
- self._done = self._done or not self._queue.empty()
- return self._done
-
- def get(self, timeout=None):
- try:
- res = self._queue.get(block=True, timeout=timeout)
- except Empty:
- raise executor.TimeoutError()
- if self._process.is_alive():
- kill_child_processes(self._process.pid)
- self._process.terminate()
- self._process.join()
- self._queue.close()
- self._queue.join_thread()
- self._done = True
- del self._queue
- del self._process
- return res
-
-
-class LocalFutureNoFork(executor.Future):
- """Local wrapper for the future.
- This is a none-fork version of LocalFuture.
- Use this for the runtime that does not support fork (like cudnn)
- """
-
- def __init__(self, result):
- self._result = result
-
- def done(self):
- return True
-
- def get(self, timeout=None):
- return self._result
-
-
-class LocalExecutor(executor.Executor):
- """Local executor that runs workers on the same machine with multiprocessing.
-
- Parameters
- ----------
- timeout: float, optional
- timeout of a job. If time is out. A TimeoutError will be returned (not raised)
- do_fork: bool, optional
- For some runtime systems that do not support fork after initialization
- (e.g. cuda runtime, cudnn). Set this to False if you have used these runtime
- before submitting jobs.
- """
-
- def __init__(self, timeout=None, do_fork=True):
- self.timeout = timeout or executor.Executor.DEFAULT_TIMEOUT
- self.do_fork = do_fork
-
- if self.do_fork:
- if not psutil:
- raise RuntimeError(
- "Python package psutil is missing. " "please try `pip install psutil`"
- )
-
- def submit(self, func, *args, **kwargs):
- if not self.do_fork:
- return LocalFutureNoFork(func(*args, **kwargs))
-
- queue = Queue(2) # Size of 2 to avoid a race condition with size 1.
- process = Process(target=call_with_timeout, args=(queue, self.timeout, func, args, kwargs))
- process.start()
- return LocalFuture(process, queue)
diff --git a/python/tvm/autotvm/measure/measure_methods.py b/python/tvm/autotvm/measure/measure_methods.py
index eab6822..42e046a 100644
--- a/python/tvm/autotvm/measure/measure_methods.py
+++ b/python/tvm/autotvm/measure/measure_methods.py
@@ -38,7 +38,9 @@ import tvm._ffi
import tvm.ir.transform
from tvm import nd
from tvm import rpc as _rpc
+from tvm.autotvm.env import AutotvmGlobalScope, reset_global_scope
from tvm.contrib import ndk, nvcc, stackvm, tar
+from tvm.contrib.popen_pool import PopenPoolExecutor
from tvm.driver import build
from tvm.error import TVMError
from tvm.target import Target
@@ -46,7 +48,6 @@ from tvm.target import Target
from ..env import AutotvmGlobalScope
from ..task.space import InstantiationError
from ..utils import get_const_tuple
-from .local_executor import LocalExecutor
from .measure import Builder, MeasureErrorNo, MeasureResult, Runner
logger = logging.getLogger("autotvm")
@@ -98,7 +99,9 @@ class LocalBuilder(Builder):
else:
raise ValueError("Invalid build_func" + build_func)
self.build_func = _WrappedBuildFunc(build_func)
- self.executor = LocalExecutor(timeout=timeout)
+ self.executor = PopenPoolExecutor(
+ timeout=timeout, initializer=reset_global_scope, initargs=(AutotvmGlobalScope.current,)
+ )
self.tmp_dir = tempfile.mkdtemp()
def build(self, measure_inputs):
@@ -114,53 +117,52 @@ class LocalBuilder(Builder):
futures.append(ret)
for future in futures:
- res = future.get()
-
- if isinstance(res, Exception):
- # timeout or fleet error, return MeasureResult directly
- results.append(
- MeasureResult(
- (res,), MeasureErrorNo.BUILD_TIMEOUT, self.timeout, time.time()
- )
- )
- elif res.error is not None:
- # instantiation error
- if isinstance(res.error, InstantiationError):
- results.append(
- MeasureResult(
+ try:
+ res = future.result()
+ if res.error is not None:
+ # instantiation error
+ if isinstance(res.error, InstantiationError):
+ res = MeasureResult(
(res.error,),
MeasureErrorNo.INSTANTIATION_ERROR,
res.time_cost,
time.time(),
)
- )
- else:
- if "InstantiationError" in str(res.error):
- msg = str(res.error)
- try:
- msg = msg.split("\n")[-2].split(": ")[1]
- except Exception: # pylint: disable=broad-except
- pass
- results.append(
- MeasureResult(
+
+ else:
+ if "InstantiationError" in str(res.error):
+ msg = str(res.error)
+ try:
+ msg = msg.split("\n")[-2].split(": ")[1]
+ except Exception: # pylint: disable=broad-except
+ pass
+ res = MeasureResult(
(InstantiationError(msg),),
MeasureErrorNo.INSTANTIATION_ERROR,
res.time_cost,
time.time(),
)
- )
- else: # tvm error
- results.append(
- MeasureResult(
+
+ else: # tvm error
+ res = MeasureResult(
(res.error,),
MeasureErrorNo.COMPILE_HOST,
res.time_cost,
time.time(),
)
- )
- else:
- # return BuildResult
- results.append(res)
+ except TimeoutError as ex:
+ res = MeasureResult(
+ (ex,), MeasureErrorNo.BUILD_TIMEOUT, self.timeout, time.time()
+ )
+ except ChildProcessError as ex:
+ res = MeasureResult(
+ (ex,),
+ MeasureErrorNo.RUNTIME_DEVICE,
+ self.timeout,
+ time.time(),
+ )
+
+ results.append(res)
return results
@@ -242,7 +244,11 @@ class RPCRunner(Runner):
self.cooldown_interval = cooldown_interval
self.module_loader = module_loader
- self.executor = LocalExecutor(timeout=timeout * (self.n_parallel + 1))
+ self.executor = PopenPoolExecutor(
+ timeout=timeout * (self.n_parallel + 1),
+ initializer=reset_global_scope,
+ initargs=(AutotvmGlobalScope.current,),
+ )
@property
def ref_input(self):
@@ -337,15 +343,15 @@ class RPCRunner(Runner):
futures.append(ret)
for future in futures:
- res = future.get()
- if isinstance(res, Exception): # executor error or timeout
+ try:
+ res = future.result()
+ results.append(res)
+ except Exception as ex: # pylint: disable=broad-except
results.append(
MeasureResult(
- (str(res),), MeasureErrorNo.RUN_TIMEOUT, self.timeout, time.time()
+ (str(ex),), MeasureErrorNo.RUN_TIMEOUT, self.timeout, time.time()
)
)
- else:
- results.append(res)
return results
diff --git a/python/tvm/testing/__init__.py b/python/tvm/testing/__init__.py
index 75349d8..d848467 100644
--- a/python/tvm/testing/__init__.py
+++ b/python/tvm/testing/__init__.py
@@ -25,6 +25,8 @@ from ._ffi_api import test_wrap_callback, test_raise_error_callback, test_check_
from ._ffi_api import ErrorTest, FrontendTestModule, identity_cpp
from .popen_pool import initializer, after_initializer, register_ffi, call_cpp_ffi
-from .popen_pool import call_py_ffi, call_cpp_py_ffi
+from .popen_pool import call_py_ffi, call_cpp_py_ffi, fast_summation, slow_summation
+from .popen_pool import timeout_job
from . import auto_scheduler
+from . import autotvm
diff --git a/tests/python/unittest/test_autotvm_common.py b/python/tvm/testing/autotvm.py
similarity index 97%
rename from tests/python/unittest/test_autotvm_common.py
rename to python/tvm/testing/autotvm.py
index 60f7d8b..6f7bb13 100644
--- a/tests/python/unittest/test_autotvm_common.py
+++ b/python/tvm/testing/autotvm.py
@@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+# pylint: disable=invalid-name, missing-function-docstring, missing-class-docstring
"""Common utilities for testing autotvm"""
import time
diff --git a/python/tvm/testing/popen_pool.py b/python/tvm/testing/popen_pool.py
index 20345a2..b646d7a 100644
--- a/python/tvm/testing/popen_pool.py
+++ b/python/tvm/testing/popen_pool.py
@@ -16,6 +16,7 @@
# under the License.
# pylint: disable=invalid-name, missing-function-docstring
"""Common functions for popen_pool test cases"""
+import time
import tvm
TEST_GLOBAL_STATE_1 = 0
@@ -57,3 +58,18 @@ def call_cpp_ffi(arg):
def call_cpp_py_ffi(arg):
return tvm.testing.identity_cpp(arg)
+
+
+def fast_summation(n):
+ return n * (n + 1) // 2
+
+
+def slow_summation(n):
+ r = 0
+ for i in range(0, n + 1):
+ r += i
+ return r
+
+
+def timeout_job(n):
+ time.sleep(n * 1.5)
diff --git a/tests/python/contrib/test_popen_pool.py b/tests/python/contrib/test_popen_pool.py
index 9ebe4c1..b3a91e1 100644
--- a/tests/python/contrib/test_popen_pool.py
+++ b/tests/python/contrib/test_popen_pool.py
@@ -27,6 +27,9 @@ from tvm.testing import (
call_py_ffi,
call_cpp_ffi,
call_cpp_py_ffi,
+ fast_summation,
+ slow_summation,
+ timeout_job,
)
@@ -104,8 +107,41 @@ def test_popen_ffi():
assert proc.recv() == initargs[0]
+def test_popen_pool_executor_async():
+ pool = PopenPoolExecutor()
+ f1 = pool.submit(slow_summation, 9999999)
+ f2 = pool.submit(fast_summation, 9999999)
+ t1 = 0
+ t2 = 0
+ while True:
+ if t1 == 0 and f1.done():
+ t1 = time.time()
+ if t2 == 0 and f2.done():
+ t2 = time.time()
+ if t1 != 0 and t2 != 0:
+ break
+ assert t2 < t1, "Expected fast async job to finish first!"
+ assert f1.result() == f2.result()
+
+
+def test_popen_pool_executor_timeout():
+ timeout = 0.5
+
+ pool = PopenPoolExecutor(timeout=timeout)
+
+ f1 = pool.submit(timeout_job, timeout)
+ while not f1.done():
+ pass
+ try:
+ res = f1.result()
+ except Exception as ex:
+ assert isinstance(ex, TimeoutError)
+
+
if __name__ == "__main__":
test_popen_worker()
test_popen_pool_executor()
test_popen_initializer()
test_popen_ffi()
+ test_popen_pool_executor_async()
+ test_popen_pool_executor_timeout()
diff --git a/tests/python/unittest/test_autotvm_database.py b/tests/python/unittest/test_autotvm_database.py
index 197243e..d598002 100644
--- a/tests/python/unittest/test_autotvm_database.py
+++ b/tests/python/unittest/test_autotvm_database.py
@@ -21,7 +21,7 @@ import logging
from tvm.autotvm import database
from tvm.autotvm.record import encode, MeasureResult
-from test_autotvm_common import get_sample_records
+from tvm.testing.autotvm import get_sample_records
def test_save_load():
diff --git a/tests/python/unittest/test_autotvm_executor.py b/tests/python/unittest/test_autotvm_executor.py
deleted file mode 100644
index 9757576..0000000
--- a/tests/python/unittest/test_autotvm_executor.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Test local executor"""
-import time
-
-from tvm.autotvm.measure import LocalExecutor, executor
-
-
-def slow(n):
- r = 0
- for i in range(0, n + 1):
- r += i
- return r
-
-
-def fast(n):
- return n * (n + 1) // 2
-
-
-def test_local_measure_async():
- ex = LocalExecutor()
- f1 = ex.submit(slow, 9999999)
- f2 = ex.submit(fast, 9999999)
- t1 = 0
- t2 = 0
- while True:
- if t1 == 0 and f1.done():
- t1 = time.time()
- if t2 == 0 and f2.done():
- t2 = time.time()
- if t1 != 0 and t2 != 0:
- break
- assert t2 < t1, "Expected fast async job to finish first!"
- assert f1.get() == f2.get()
-
-
-def timeout_job(n):
- time.sleep(n * 1.5)
-
-
-def test_timeout():
- timeout = 0.5
-
- ex = LocalExecutor(timeout=timeout)
-
- f1 = ex.submit(timeout_job, timeout)
- while not f1.done():
- pass
- res = f1.get()
- assert isinstance(res, executor.TimeoutError)
-
-
-if __name__ == "__main__":
- test_local_measure_async()
- test_timeout()
diff --git a/tests/python/unittest/test_autotvm_index_tuner.py b/tests/python/unittest/test_autotvm_index_tuner.py
index c433d8f..be89ee2 100644
--- a/tests/python/unittest/test_autotvm_index_tuner.py
+++ b/tests/python/unittest/test_autotvm_index_tuner.py
@@ -17,7 +17,7 @@
"""Test index based tuners"""
import multiprocessing
-from test_autotvm_common import DummyRunner, get_sample_task
+from tvm.testing.autotvm import DummyRunner, get_sample_task
from tvm import autotvm
from tvm.autotvm.tuner import GridSearchTuner, RandomTuner
diff --git a/tests/python/unittest/test_autotvm_measure.py b/tests/python/unittest/test_autotvm_measure.py
index a89c69c..3ef5cbd 100644
--- a/tests/python/unittest/test_autotvm_measure.py
+++ b/tests/python/unittest/test_autotvm_measure.py
@@ -17,13 +17,14 @@
"""Test builder and runner"""
import logging
import multiprocessing
-import time
+import concurrent
import numpy as np
import tvm
from tvm import te
-from test_autotvm_common import DummyRunner, bad_matmul, get_sample_task
+from tvm.autotvm.measure import executor
+from tvm.testing.autotvm import DummyRunner, bad_matmul, get_sample_task
from tvm import autotvm
from tvm.autotvm.measure.measure import MeasureErrorNo, MeasureResult
from tvm.autotvm import measure
@@ -76,7 +77,9 @@ def test_task_runner_with_ref_input():
self.ran_dummy_executor = True
sig = Signature.from_callable(func)
assert sig.bind(*args, **kwargs).arguments["ref_input"] == refinp
- return measure.local_executor.LocalFutureNoFork(None)
+ dummy_future = concurrent.futures.Future()
+ dummy_future.set_result(None)
+ return dummy_future
runner.executor = DummyExecutor()
runner.run([None], [None])
diff --git a/tests/python/unittest/test_autotvm_record.py b/tests/python/unittest/test_autotvm_record.py
index 51cc907..65739df 100644
--- a/tests/python/unittest/test_autotvm_record.py
+++ b/tests/python/unittest/test_autotvm_record.py
@@ -25,7 +25,7 @@ from tvm import autotvm
from tvm.autotvm.measure import MeasureInput, MeasureResult, MeasureErrorNo
from tvm.autotvm.record import encode, decode, ApplyHistoryBest, measure_str_key
-from test_autotvm_common import get_sample_task
+from tvm.testing.autotvm import get_sample_task
def test_load_dump():
diff --git a/tests/python/unittest/test_autotvm_xgboost_model.py b/tests/python/unittest/test_autotvm_xgboost_model.py
index 445cff8..baecdac 100644
--- a/tests/python/unittest/test_autotvm_xgboost_model.py
+++ b/tests/python/unittest/test_autotvm_xgboost_model.py
@@ -25,7 +25,7 @@ from tvm import autotvm
from tvm.autotvm import MeasureInput, MeasureResult
from tvm.autotvm.tuner.xgboost_cost_model import XGBoostCostModel
-from test_autotvm_common import get_sample_task, get_sample_records
+from tvm.testing.autotvm import get_sample_task, get_sample_records
def test_fit():