You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tvm.apache.org by ju...@apache.org on 2022/08/02 02:32:01 UTC
[tvm] branch main updated: [Pylint] Making hexagon tests pylint compliant Part 2 of N (#12176)
This is an automated email from the ASF dual-hosted git repository.
junrushao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tvm.git
The following commit(s) were added to refs/heads/main by this push:
new 4a6d655561 [Pylint] Making hexagon tests pylint compliant Part 2 of N (#12176)
4a6d655561 is described below
commit 4a6d6555618ae35bff06276a0aa8b403c34e9641
Author: Anirudh Sundar <qu...@quicinc.com>
AuthorDate: Tue Aug 2 08:01:56 2022 +0530
[Pylint] Making hexagon tests pylint compliant Part 2 of N (#12176)
Second set of **hexagon tests** modified to be pylint compliant, as part of tracking issue #11414. The files covered by this patch are:
* [X] tests/python/contrib/test_hexagon/test_autotvm.py
* [X] tests/python/contrib/test_hexagon/test_cache_read_write.py
* [X] tests/python/contrib/test_hexagon/test_launcher.py
* [X] tests/python/contrib/test_hexagon/test_maxpool2d_blocked.py
* [X] tests/python/contrib/test_hexagon/test_models.py
* [X] tests/python/contrib/test_hexagon/test_run_unit_tests.py
* [X] tests/python/contrib/test_hexagon/test_thread_pool.py
* [X] tests/python/contrib/test_hexagon/test_usmp.py
---
tests/lint/pylint.sh | 8 +
tests/python/contrib/test_hexagon/test_autotvm.py | 38 +++--
.../contrib/test_hexagon/test_cache_read_write.py | 100 +++++++------
tests/python/contrib/test_hexagon/test_launcher.py | 161 +++++++++++++--------
.../contrib/test_hexagon/test_maxpool2d_blocked.py | 86 +++++------
tests/python/contrib/test_hexagon/test_models.py | 15 +-
.../contrib/test_hexagon/test_run_unit_tests.py | 11 +-
.../contrib/test_hexagon/test_thread_pool.py | 13 +-
tests/python/contrib/test_hexagon/test_usmp.py | 22 ++-
9 files changed, 265 insertions(+), 189 deletions(-)
diff --git a/tests/lint/pylint.sh b/tests/lint/pylint.sh
index 0ead015f93..997afd1c00 100755
--- a/tests/lint/pylint.sh
+++ b/tests/lint/pylint.sh
@@ -32,3 +32,11 @@ python3 -m pylint tests/python/contrib/test_hexagon/conv2d/test_conv2d_blocked.p
python3 -m pylint tests/python/contrib/test_hexagon/conv2d/test_conv2d_conv2d.py --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/contrib/test_hexagon/infrastructure.py --rcfile="$(dirname "$0")"/pylintrc
python3 -m pylint tests/python/contrib/test_hexagon/test_2d_physical_buffers.py --rcfile="$(dirname "$0")"/pylintrc
+python3 -m pylint tests/python/contrib/test_hexagon/test_autotvm.py --rcfile="$(dirname "$0")"/pylintrc
+python3 -m pylint tests/python/contrib/test_hexagon/test_cache_read_write.py --rcfile="$(dirname "$0")"/pylintrc
+python3 -m pylint tests/python/contrib/test_hexagon/test_launcher.py --rcfile="$(dirname "$0")"/pylintrc
+python3 -m pylint tests/python/contrib/test_hexagon/test_maxpool2d_blocked.py --rcfile="$(dirname "$0")"/pylintrc
+python3 -m pylint tests/python/contrib/test_hexagon/test_models.py --rcfile="$(dirname "$0")"/pylintrc
+python3 -m pylint tests/python/contrib/test_hexagon/test_run_unit_tests.py --rcfile="$(dirname "$0")"/pylintrc
+python3 -m pylint tests/python/contrib/test_hexagon/test_thread_pool.py --rcfile="$(dirname "$0")"/pylintrc
+python3 -m pylint tests/python/contrib/test_hexagon/test_usmp.py --rcfile="$(dirname "$0")"/pylintrc
diff --git a/tests/python/contrib/test_hexagon/test_autotvm.py b/tests/python/contrib/test_hexagon/test_autotvm.py
index 7f13a70ca2..513d5bdbab 100644
--- a/tests/python/contrib/test_hexagon/test_autotvm.py
+++ b/tests/python/contrib/test_hexagon/test_autotvm.py
@@ -15,40 +15,46 @@
# specific language governing permissions and limitations
# under the License.
+""" Minimal example of tuning on hexagon. """
+
import contextlib
import os
import sys
+
import pytest
-import numpy as np
import tvm
import tvm.testing
-from tvm import tir, te, TVMError
-from tvm.script import tir as T
-from tvm import autotvm
+from tvm import autotvm, te
+from tvm.autotvm.tuner import GATuner, XGBTuner
@autotvm.template("demo_template")
def demo_template():
- M, N, K = [1024] * 3
- A = te.placeholder((M, K), dtype="float32")
- B = te.placeholder((N, K), dtype="float32")
+ """Initial demo template"""
+ size_m, size_n, size_k = [1024] * 3
+ input1 = te.placeholder((size_m, size_k), dtype="float32")
+ input2 = te.placeholder((size_n, size_k), dtype="float32")
k = te.reduce_axis((0, 1024), name="k")
- C = te.compute((M, N), lambda i, j: te.sum(A[i, k] * B[j, k], axis=[k]))
+ output = te.compute(
+ (size_m, size_n), lambda i, j: te.sum(input1[i, k] * input2[j, k], axis=[k])
+ )
- s = te.create_schedule(C.op)
+ s = te.create_schedule(output.op)
cfg = autotvm.get_config()
- m_iter, n_iter = s[C].op.axis
- (k_iter,) = s[C].op.reduce_axis
+ _, _ = s[output].op.axis
+ (k_iter,) = s[output].op.reduce_axis
cfg.define_split("k_split", k_iter, num_outputs=2)
- ko, ki = cfg["k_split"].apply(s, C, k_iter)
+ _, _ = cfg["k_split"].apply(s, output, k_iter)
- return s, [A, B, C]
+ return s, [input1, input2, output]
class HexagonModuleLoader:
+ """HexagonModuleLoader"""
+
def __init__(self, hexagon_session, pre_load_function=None) -> None:
self.pre_load_function = pre_load_function
self.hexagon_session = hexagon_session
@@ -74,8 +80,7 @@ def tune_tasks(
log_filename="tuning.log",
use_transfer_learning=True,
):
- from tvm.autotvm.tuner import XGBTuner
- from tvm.autotvm.tuner import GATuner
+ """Tune tasks with different tuners"""
tmp_log_file = log_filename + ".tmp"
if os.path.exists(tmp_log_file):
@@ -83,7 +88,7 @@ def tune_tasks(
for i, tsk in enumerate(reversed(tasks)):
prefix = "[Task %2d/%2d] " % (i + 1, len(tasks))
- if tuner == "xgb" or tuner == "xgb-rank":
+ if tuner in ("xgb", "xgb-rank"):
tuner_obj = XGBTuner(tsk, loss_type="rank")
elif tuner == "xgb_knob":
tuner_obj = XGBTuner(tsk, loss_type="rank", feature_type="knob")
@@ -118,6 +123,7 @@ def tune_tasks(
@pytest.mark.skip(reason="AutoTVM tuning is not yet enabled on Hexagon")
@tvm.testing.requires_hexagon
def test_autotvm(hexagon_session):
+ """Top level test function for testing autotvm"""
logfilename = "./hexagon.autotvm.log"
options = {
diff --git a/tests/python/contrib/test_hexagon/test_cache_read_write.py b/tests/python/contrib/test_hexagon/test_cache_read_write.py
index 03bae1110f..896db8b59c 100644
--- a/tests/python/contrib/test_hexagon/test_cache_read_write.py
+++ b/tests/python/contrib/test_hexagon/test_cache_read_write.py
@@ -15,17 +15,18 @@
# specific language governing permissions and limitations
# under the License.
-import pytest
+""" Lower cache_read and cache_write to Hexagon DMA via tensorize """
+
import numpy as np
-from tvm.contrib.hexagon.session import Session
import tvm.testing
from tvm import te, tir
-from tvm.script import tir as T
from tvm.contrib.hexagon.session import Session
+from tvm.script import tir as T
def intrin_mem_copy(shape, dtype, dst_scope, src_scope):
+ """Define and return tensor intrinsic for mem copy"""
src = te.placeholder(shape=shape, dtype=dtype, name="src")
dst = te.compute(shape, lambda i: src[i], name="dst")
size = shape[0] * np.dtype(dtype).itemsize
@@ -49,15 +50,15 @@ def intrin_mem_copy(shape, dtype, dst_scope, src_scope):
zero_indices = [0 for _ in shape]
def intrin_func(ins, outs):
- ib = tvm.tir.ir_builder.create()
+ ir_builder = tvm.tir.ir_builder.create()
_src = ins[0]
_dst = outs[0]
- dst_handle = ib.buffer_ptr(dst_buffer)
- src_handle = ib.buffer_ptr(src_buffer)
+ dst_handle = ir_builder.buffer_ptr(dst_buffer)
+ src_handle = ir_builder.buffer_ptr(src_buffer)
- ib.emit(
+ ir_builder.emit(
tvm.tir.call_intrin(
"handle",
"tir.mem_copy",
@@ -66,56 +67,61 @@ def intrin_mem_copy(shape, dtype, dst_scope, src_scope):
size,
)
)
- return ib.get()
+ return ir_builder.get()
return te.decl_tensor_intrin(dst.op, intrin_func, binds={src: src_buffer, dst: dst_buffer})
-def verify(hexagon_session: Session, s, x, y, z, size):
- print(tvm.lower(s, [x, y, z]))
+def verify(hexagon_session: Session, schedule, x_tensor, y_tensor, z_tensor, size):
+ """Verify correctness with reference from numpy"""
+ print(tvm.lower(schedule, [x_tensor, y_tensor, z_tensor]))
target_hexagon = tvm.target.hexagon("v68", link_params=True)
func = tvm.build(
- s, [x, y, z], tvm.target.Target(target_hexagon, host=target_hexagon), name="dmacpy"
+ schedule,
+ [x_tensor, y_tensor, z_tensor],
+ tvm.target.Target(target_hexagon, host=target_hexagon),
+ name="dmacpy",
)
mod = hexagon_session.load_module(func)
- xt = tvm.nd.array(
- np.random.randint(low=-128, high=127, size=size, dtype=x.dtype),
+ x_array = tvm.nd.array(
+ np.random.randint(low=-128, high=127, size=size, dtype=x_tensor.dtype),
device=hexagon_session.device,
)
- yt = tvm.nd.array(
- np.random.randint(low=-128, high=127, size=size, dtype=y.dtype),
+ y_array = tvm.nd.array(
+ np.random.randint(low=-128, high=127, size=size, dtype=y_tensor.dtype),
device=hexagon_session.device,
)
- zt = tvm.nd.array(
- np.random.randint(low=-128, high=127, size=size, dtype=z.dtype),
+ z_array = tvm.nd.array(
+ np.random.randint(low=-128, high=127, size=size, dtype=z_tensor.dtype),
device=hexagon_session.device,
)
- mod["dmacpy"](xt, yt, zt)
+ mod["dmacpy"](x_array, y_array, z_array)
- ref = xt.numpy() + yt.numpy()
- np.testing.assert_equal(zt.numpy(), ref)
+ ref = x_array.numpy() + y_array.numpy()
+ np.testing.assert_equal(z_array.numpy(), ref)
@tvm.testing.requires_hexagon
def test_cache_read_write(hexagon_session: Session):
+ """Test cache_read and cache_write to global.vtcm for hexagon"""
size = 128
outer_shape = (size,)
factor = 16
inner_shape = (factor,)
dtype = "int8"
- x = te.placeholder(shape=outer_shape, dtype=dtype, name="x")
- y = te.placeholder(shape=outer_shape, dtype=dtype, name="y")
- z = te.compute(outer_shape, lambda i: x[i] + y[i], name="z")
- s = te.create_schedule(z.op)
+ x_tensor = te.placeholder(shape=outer_shape, dtype=dtype, name="x")
+ y_tensor = te.placeholder(shape=outer_shape, dtype=dtype, name="y")
+ z_tensor = te.compute(outer_shape, lambda i: x_tensor[i] + y_tensor[i], name="z")
+ s = te.create_schedule(z_tensor.op)
- x_vtcm = s.cache_read(x, "global.vtcm", [z])
- y_vtcm = s.cache_read(y, "global.vtcm", [z])
- z_vtcm = s.cache_write(z, "global.vtcm")
+ x_vtcm = s.cache_read(x_tensor, "global.vtcm", [z_tensor])
+ y_vtcm = s.cache_read(y_tensor, "global.vtcm", [z_tensor])
+ z_vtcm = s.cache_write(z_tensor, "global.vtcm")
- zouter, zinner = s[z_vtcm].split(z_vtcm.op.axis[0], factor=factor)
+ zouter, _ = s[z_vtcm].split(z_vtcm.op.axis[0], factor=factor)
s[x_vtcm].compute_at(s[z_vtcm], zouter)
s[y_vtcm].compute_at(s[z_vtcm], zouter)
@@ -130,10 +136,10 @@ def test_cache_read_write(hexagon_session: Session):
mem_copy_write = intrin_mem_copy(outer_shape, dtype, "global", "global.vtcm")
- (cache_write_z,) = s[z].op.axis
- s[z].tensorize(cache_write_z, mem_copy_write)
+ (cache_write_z,) = s[z_tensor].op.axis
+ s[z_tensor].tensorize(cache_write_z, mem_copy_write)
- verify(hexagon_session, s, x, y, z, size)
+ verify(hexagon_session, s, x_tensor, y_tensor, z_tensor, size)
def layout_transform_2d(n):
@@ -142,24 +148,25 @@ def layout_transform_2d(n):
@tvm.testing.requires_hexagon
def test_cache_read_write_2d(hexagon_session: Session):
+ """Test 2D cache_read and cache_write to global.vtcm for hexagon"""
size = 128
outer_shape = (size,)
factor = 16
inner_shape = (factor,)
dtype = "int8"
- x = te.placeholder(shape=outer_shape, dtype=dtype, name="x")
- y = te.placeholder(shape=outer_shape, dtype=dtype, name="y")
- z = te.compute(outer_shape, lambda i: x[i] + y[i], name="z")
- s = te.create_schedule(z.op)
+ x_tensor = te.placeholder(shape=outer_shape, dtype=dtype, name="x")
+ y_tensor = te.placeholder(shape=outer_shape, dtype=dtype, name="y")
+ z_tensor = te.compute(outer_shape, lambda i: x_tensor[i] + y_tensor[i], name="z")
+ s = te.create_schedule(z_tensor.op)
- x_vtcm = s.cache_read(x, "global.vtcm", [z])
- y_vtcm = s.cache_read(y, "global.vtcm", [z])
- z_vtcm = s.cache_write(z, "global.vtcm")
+ x_vtcm = s.cache_read(x_tensor, "global.vtcm", [z_tensor])
+ y_vtcm = s.cache_read(y_tensor, "global.vtcm", [z_tensor])
+ z_vtcm = s.cache_write(z_tensor, "global.vtcm")
layout_x_vtcm = s[x_vtcm].transform_layout(layout_transform_2d)
layout_y_vtcm = s[y_vtcm].transform_layout(layout_transform_2d)
- layout_z_vtcm = s[z_vtcm].transform_layout(layout_transform_2d)
+ _ = s[z_vtcm].transform_layout(layout_transform_2d)
mem_copy_read = intrin_mem_copy(inner_shape, dtype, "global.vtcm", "global")
s[x_vtcm].tensorize(layout_x_vtcm[1], mem_copy_read)
@@ -169,31 +176,32 @@ def test_cache_read_write_2d(hexagon_session: Session):
# on `z_vtcm` above therefore we must call `split` to modify the loop schedule
# over `z` to match the layout of `z_vtcm` such that we can accurately write
# `z_vtcm` back to `z` using memory copy intrinsic
- zouter, zinner = s[z].split(z.op.axis[0], factor=factor)
+ _, zinner = s[z_tensor].split(z_tensor.op.axis[0], factor=factor)
mem_copy_write = intrin_mem_copy(inner_shape, dtype, "global", "global.vtcm")
- s[z].tensorize(zinner, mem_copy_write)
+ s[z_tensor].tensorize(zinner, mem_copy_write)
- verify(hexagon_session, s, x, y, z, size)
+ verify(hexagon_session, s, x_tensor, y_tensor, z_tensor, size)
@T.prim_func
-def scale_by_two(A: T.Buffer[(8192,), "int8"], C: T.Buffer[(8192,), "int8"]):
+def scale_by_two(buffer_a: T.Buffer[(8192,), "int8"], buffer_c: T.Buffer[(8192,), "int8"]):
for i in T.serial(
0,
8192,
):
with T.block("C"):
- C[i] = A[i] * T.int8(2)
+ buffer_c[i] = buffer_a[i] * T.int8(2)
def test_vtcm_lowering():
+ """Test lowering with vtcm mem scope"""
mod = tvm.IRModule.from_expr(scale_by_two.with_attr("global_symbol", "main"))
sch = tir.Schedule(mod, debug_mask="all")
block_c = sch.get_block("C")
(flat,) = sch.get_loops(block_c)
- o, i, ii, iii = sch.split(flat, factors=[8, 4, 2, 128])
+ outer, _, _, _ = sch.split(flat, factors=[8, 4, 2, 128])
cache_block = sch.cache_read(block_c, 0, storage_scope="global.vtcm")
- sch.compute_at(cache_block, o)
+ sch.compute_at(cache_block, outer)
lowered = tvm.lower(sch.mod["main"])
def ir_module_has_allocate_nodes(irmod):
diff --git a/tests/python/contrib/test_hexagon/test_launcher.py b/tests/python/contrib/test_hexagon/test_launcher.py
index aae2e598f6..9321ddf71d 100644
--- a/tests/python/contrib/test_hexagon/test_launcher.py
+++ b/tests/python/contrib/test_hexagon/test_launcher.py
@@ -15,115 +15,151 @@
# specific language governing permissions and limitations
# under the License.
+""" Test rpc based launcher for hexagon """
+
import numpy as np
import tvm.testing
-from tvm import te
-from tvm import relay
-from tvm.relay.backend import Executor, Runtime
+from tvm import relay, te
from tvm.contrib.hexagon.session import Session
+from tvm.relay.backend import Executor, Runtime
@tvm.testing.requires_hexagon
def test_add(hexagon_session: Session):
+ """Test simple add"""
dtype = "int8"
- A = tvm.te.placeholder((2,), dtype=dtype)
- B = tvm.te.placeholder((1,), dtype=dtype)
- C = tvm.te.compute(A.shape, lambda i: A[i] + B[0], name="C")
- sched = tvm.te.create_schedule(C.op)
+ placeholder_a = tvm.te.placeholder((2,), dtype=dtype)
+ placeholder_b = tvm.te.placeholder((1,), dtype=dtype)
+ compute_c = tvm.te.compute(
+ placeholder_a.shape, lambda i: placeholder_a[i] + placeholder_b[0], name="C"
+ )
+ sched = tvm.te.create_schedule(compute_c.op)
target_hexagon = tvm.target.hexagon("v68", link_params=True)
func = tvm.build(
- sched, [A, B, C], tvm.target.Target(target_hexagon, host=target_hexagon), name="add"
+ sched,
+ [placeholder_a, placeholder_b, compute_c],
+ tvm.target.Target(target_hexagon, host=target_hexagon),
+ name="add",
)
mod = hexagon_session.load_module(func)
- A_data = tvm.nd.array(np.array([2, 3], dtype=dtype), device=hexagon_session.device)
- assert (A_data.numpy() == np.array([2, 3])).all()
- B_data = tvm.nd.array(np.array([4], dtype=dtype), device=hexagon_session.device)
- assert (B_data.numpy() == np.array([4])).all()
- C_data = tvm.nd.array(np.array([0, 0], dtype=dtype), device=hexagon_session.device)
- assert (C_data.numpy() == np.array([0, 0])).all()
- mod["add"](A_data, B_data, C_data)
- assert (C_data.numpy() == np.array([6, 7])).all()
+ a_data = tvm.nd.array(np.array([2, 3], dtype=dtype), device=hexagon_session.device)
+ assert (a_data.numpy() == np.array([2, 3])).all()
+ b_data = tvm.nd.array(np.array([4], dtype=dtype), device=hexagon_session.device)
+ assert (b_data.numpy() == np.array([4])).all()
+ c_data = tvm.nd.array(np.array([0, 0], dtype=dtype), device=hexagon_session.device)
+ assert (c_data.numpy() == np.array([0, 0])).all()
+ mod["add"](a_data, b_data, c_data)
+ assert (c_data.numpy() == np.array([6, 7])).all()
@tvm.testing.requires_hexagon
def test_add_vtcm(hexagon_session: Session):
+ """Test add on VTCM"""
dtype = "int8"
- A = tvm.te.placeholder((2,), dtype=dtype)
- B = tvm.te.placeholder((1,), dtype=dtype)
- C = tvm.te.compute(A.shape, lambda i: A[i] + B[0], name="C")
- sched = tvm.te.create_schedule(C.op)
+ placeholder_a = tvm.te.placeholder((2,), dtype=dtype)
+ placeholder_b = tvm.te.placeholder((1,), dtype=dtype)
+ compute_c = tvm.te.compute(
+ placeholder_a.shape, lambda i: placeholder_a[i] + placeholder_b[0], name="C"
+ )
+ sched = tvm.te.create_schedule(compute_c.op)
target_hexagon = tvm.target.hexagon("v68", link_params=True)
func = tvm.build(
- sched, [A, B, C], tvm.target.Target(target_hexagon, host=target_hexagon), name="add"
+ sched,
+ [placeholder_a, placeholder_b, compute_c],
+ tvm.target.Target(target_hexagon, host=target_hexagon),
+ name="add",
)
mod = hexagon_session.load_module(func)
- A_data = tvm.nd.empty(A.shape, A.dtype, hexagon_session.device, "global.vtcm")
- A_data.copyfrom(np.array([2, 3]))
+ a_data = tvm.nd.empty(
+ placeholder_a.shape, placeholder_a.dtype, hexagon_session.device, "global.vtcm"
+ )
+ a_data.copyfrom(np.array([2, 3]))
- B_data = tvm.nd.empty(B.shape, B.dtype, hexagon_session.device, "global.vtcm")
- B_data.copyfrom(np.array([4]))
+ b_data = tvm.nd.empty(
+ placeholder_b.shape, placeholder_b.dtype, hexagon_session.device, "global.vtcm"
+ )
+ b_data.copyfrom(np.array([4]))
- C_data = tvm.nd.empty(C.shape, C.dtype, hexagon_session.device, "global.vtcm")
- C_data.copyfrom(np.array([0, 0]))
+ c_data = tvm.nd.empty(compute_c.shape, compute_c.dtype, hexagon_session.device, "global.vtcm")
+ c_data.copyfrom(np.array([0, 0]))
- mod["add"](A_data, B_data, C_data)
- result = C_data.numpy()
+ mod["add"](a_data, b_data, c_data)
+ result = c_data.numpy()
assert (result == np.array([6, 7])).all()
class TestMatMul:
- M = tvm.testing.parameter(32)
- N = tvm.testing.parameter(32)
- K = tvm.testing.parameter(32)
+ """Test matmul class"""
+
+ size_m = tvm.testing.parameter(32)
+ size_n = tvm.testing.parameter(32)
+ size_k = tvm.testing.parameter(32)
@tvm.testing.requires_hexagon
- def test_matmul(self, hexagon_session, M, N, K):
- X = te.placeholder((M, K), dtype="float32")
- Y = te.placeholder((K, N), dtype="float32")
- k1 = te.reduce_axis((0, K), name="k1")
- Z = te.compute((M, N), lambda i, j: te.sum(X[i, k1] * Y[k1, j], axis=[k1]))
- schedule = te.create_schedule(Z.op)
+ def test_matmul(self, hexagon_session, size_m, size_n, size_k):
+ """Test matmul"""
+ placeholder_x = te.placeholder((size_m, size_k), dtype="float32")
+ placeholder_y = te.placeholder((size_k, size_n), dtype="float32")
+ reduce_k1 = te.reduce_axis((0, size_k), name="k1")
+ compute_z = te.compute(
+ (size_m, size_n),
+ lambda i, j: te.sum(
+ placeholder_x[i, reduce_k1] * placeholder_y[reduce_k1, j], axis=[reduce_k1]
+ ),
+ )
+ schedule = te.create_schedule(compute_z.op)
target_hexagon = tvm.target.hexagon("v68", link_params=True)
func = tvm.build(
- schedule, [X, Y, Z], tvm.target.Target(target_hexagon, host=target_hexagon)
+ schedule,
+ [placeholder_x, placeholder_y, compute_z],
+ tvm.target.Target(target_hexagon, host=target_hexagon),
)
mod = hexagon_session.load_module(func)
- x = np.random.uniform(size=[i.value for i in X.shape]).astype(X.dtype)
- y = np.random.uniform(size=[i.value for i in Y.shape]).astype(Y.dtype)
- z = np.zeros([i.value for i in Z.shape], dtype=Z.dtype)
+ x_data = np.random.uniform(size=[i.value for i in placeholder_x.shape]).astype(
+ placeholder_x.dtype
+ )
+ y_data = np.random.uniform(size=[i.value for i in placeholder_y.shape]).astype(
+ placeholder_y.dtype
+ )
+ z_data = np.zeros([i.value for i in compute_z.shape], dtype=compute_z.dtype)
- xt = tvm.nd.array(x, device=hexagon_session.device)
- yt = tvm.nd.array(y, device=hexagon_session.device)
- zt = tvm.nd.array(z, device=hexagon_session.device)
- mod(xt, yt, zt)
+ x_array = tvm.nd.array(x_data, device=hexagon_session.device)
+ y_array = tvm.nd.array(y_data, device=hexagon_session.device)
+ z_array = tvm.nd.array(z_data, device=hexagon_session.device)
+ mod(x_array, y_array, z_array)
target_llvm = tvm.target.Target("llvm")
- mod = tvm.build(schedule, [X, Y, Z], tvm.target.Target(target_llvm, host=target_llvm))
+ mod = tvm.build(
+ schedule,
+ [placeholder_x, placeholder_y, compute_z],
+ tvm.target.Target(target_llvm, host=target_llvm),
+ )
device = tvm.cpu(0)
- xtcpu = tvm.nd.array(x, device)
- ytcpu = tvm.nd.array(y, device)
- ztcpu = tvm.nd.array(z, device)
+ xtcpu = tvm.nd.array(x_data, device)
+ ytcpu = tvm.nd.array(y_data, device)
+ ztcpu = tvm.nd.array(z_data, device)
mod(xtcpu, ytcpu, ztcpu)
- tvm.testing.assert_allclose(zt.numpy(), ztcpu.numpy(), rtol=1e-4)
+ tvm.testing.assert_allclose(z_array.numpy(), ztcpu.numpy(), rtol=1e-4)
@tvm.testing.requires_hexagon
def test_graph_executor(hexagon_session: Session):
+ """Test graph executor"""
dtype = "float32"
data = relay.var("data", relay.TensorType((1, 64, 64, 3), dtype))
weight = relay.var("weight", relay.TensorType((5, 5, 3, 8), dtype))
- y = relay.nn.conv2d(
+ conv2d_op = relay.nn.conv2d(
data,
weight,
padding=(2, 2),
@@ -132,7 +168,7 @@ def test_graph_executor(hexagon_session: Session):
kernel_layout="HWIO",
out_dtype="float32",
)
- f = relay.Function([data, weight], y)
+ f = relay.Function([data, weight], conv2d_op)
relay_mod = tvm.IRModule.from_expr(f)
relay_mod = relay.transform.InferType()(relay_mod)
@@ -176,6 +212,7 @@ def test_graph_executor(hexagon_session: Session):
@tvm.testing.requires_hexagon
def test_graph_executor_multiple_conv2d(hexagon_session: Session):
+ """Test multiple conv2d nodes with graph_executor"""
dtype = "float32"
input_shape = (1, 8, 8, 3)
w1_shape = (5, 5, 3, 1)
@@ -183,7 +220,7 @@ def test_graph_executor_multiple_conv2d(hexagon_session: Session):
data = relay.var("data", relay.TensorType(input_shape, dtype))
weight1 = relay.var("weight1", relay.TensorType(w1_shape, dtype))
weight2 = relay.var("weight2", relay.TensorType(w2_shape, dtype))
- y1 = relay.nn.conv2d(
+ conv2d_op1 = relay.nn.conv2d(
data,
weight1,
padding=(2, 2),
@@ -192,8 +229,8 @@ def test_graph_executor_multiple_conv2d(hexagon_session: Session):
kernel_layout="HWIO",
out_dtype="float32",
)
- y2 = relay.nn.conv2d(
- y1,
+ conv2d_op2 = relay.nn.conv2d(
+ conv2d_op1,
weight2,
padding=(2, 2),
kernel_size=(5, 5),
@@ -201,7 +238,7 @@ def test_graph_executor_multiple_conv2d(hexagon_session: Session):
kernel_layout="HWIO",
out_dtype="float32",
)
- f = relay.Function([data, weight1, weight2], y2)
+ f = relay.Function([data, weight1, weight2], conv2d_op2)
relay_mod = tvm.IRModule.from_expr(f)
relay_mod = relay.transform.InferType()(relay_mod)
@@ -253,6 +290,7 @@ def test_graph_executor_multiple_conv2d(hexagon_session: Session):
@tvm.testing.requires_hexagon
def test_aot_executor(hexagon_session: Session, aot_host_target, aot_target):
+ """Test AOT executor"""
dtype = "float32"
input_shape = (1, 128, 128, 3)
w_shape = (5, 5, 3, 8)
@@ -312,6 +350,7 @@ def test_aot_executor(hexagon_session: Session, aot_host_target, aot_target):
@tvm.testing.requires_hexagon
def test_aot_executor_multiple_conv2d(hexagon_session: Session, aot_host_target, aot_target):
+ """Test multiple conv2d nodes with AOT executor"""
dtype = "float32"
input_shape = (1, 8, 8, 3)
w1_shape = (5, 5, 3, 1)
@@ -319,7 +358,7 @@ def test_aot_executor_multiple_conv2d(hexagon_session: Session, aot_host_target,
data = relay.var("data", relay.TensorType(input_shape, dtype))
weight1 = relay.var("weight1", relay.TensorType(w1_shape, dtype))
weight2 = relay.var("weight2", relay.TensorType(w2_shape, dtype))
- y1 = relay.nn.conv2d(
+ conv2d_op1 = relay.nn.conv2d(
data,
weight1,
padding=(2, 2),
@@ -328,8 +367,8 @@ def test_aot_executor_multiple_conv2d(hexagon_session: Session, aot_host_target,
kernel_layout="HWIO",
out_dtype="float32",
)
- y2 = relay.nn.conv2d(
- y1,
+ conv2d_op2 = relay.nn.conv2d(
+ conv2d_op1,
weight2,
padding=(2, 2),
kernel_size=(5, 5),
@@ -337,7 +376,7 @@ def test_aot_executor_multiple_conv2d(hexagon_session: Session, aot_host_target,
kernel_layout="HWIO",
out_dtype="float32",
)
- f = relay.Function([data, weight1, weight2], y2)
+ f = relay.Function([data, weight1, weight2], conv2d_op2)
relay_mod = tvm.IRModule.from_expr(f)
relay_mod = relay.transform.InferType()(relay_mod)
diff --git a/tests/python/contrib/test_hexagon/test_maxpool2d_blocked.py b/tests/python/contrib/test_hexagon/test_maxpool2d_blocked.py
index 23f7d6ed27..dbf3f3d790 100644
--- a/tests/python/contrib/test_hexagon/test_maxpool2d_blocked.py
+++ b/tests/python/contrib/test_hexagon/test_maxpool2d_blocked.py
@@ -15,22 +15,17 @@
# specific language governing permissions and limitations
# under the License.
-import sys
+"""Contrib tests for blocked conv2d and maxpool2d"""
+
+import numpy as np
import tvm
import tvm.testing
-from tvm import te
-from tvm import topi
+from tvm import te, topi
from tvm.topi import testing
-from .infrastructure import (
- ceildiv,
- build_and_run,
- get_block_shape,
- get_packed_shape,
-)
-import numpy as np
-import pytest
+from .infrastructure import build_and_run, get_block_shape, get_packed_shape
+
# Blocked layout: NHWC8h8w32c :: [N, H//8, W//8, C//32, 8h, 8w, 32c]
def maxpool2d_logical(
@@ -47,7 +42,7 @@ def maxpool2d_logical(
activation is nhwc8h8w32c.
"""
- block_H, block_W, block_C = get_block_shape()
+ block_h, block_w, block_c = get_block_shape()
shape = get_packed_shape(shape_nhwc)
logical_output_shape = (
shape_nhwc[0],
@@ -57,60 +52,66 @@ def maxpool2d_logical(
)
output_shape = get_packed_shape(logical_output_shape)
- N, H, W, C = shape_nhwc
- X = te.placeholder(shape_nhwc, dtype=dtype)
+ _, height, width, _ = shape_nhwc
+ placeholder_x = te.placeholder(shape_nhwc, dtype=dtype)
# Combination of padding required by maxpool operator and padding to evenly divisible
# number of blocks. Note that this padding should be inlined in the schedule so
# as to avoid input copying.
- pad_h = (block_H - ((H + padding[1]) % block_H)) % block_H
- pad_w = (block_W - ((W + padding[3]) % block_W)) % block_W
- X_pad = topi.nn.pad(X, [0, padding[0], padding[2], 0], [0, pad_h, pad_w, 0], pad_value=0)
+ pad_h = (block_h - ((height + padding[1]) % block_h)) % block_h
+ pad_w = (block_w - ((width + padding[3]) % block_w)) % block_w
+ x_pad = topi.nn.pad(
+ placeholder_x, [0, padding[0], padding[2], 0], [0, pad_h, pad_w, 0], pad_value=0
+ )
# Calculate packed layout
- X_packed = te.compute(
+ x_packed = te.compute(
shape,
- lambda n, ho, wo, co, hi, wi, ci: X_pad[
- n, ho * block_H + hi, wo * block_W + wi, co * block_C + ci
+ lambda n, ho, wo, co, hi, wi, ci: x_pad[
+ n, ho * block_h + hi, wo * block_w + wi, co * block_c + ci
],
)
- rh = te.reduce_axis((0, window_shape[0]), name="rh")
- rw = te.reduce_axis((0, window_shape[1]), name="rw")
+ reduce_h = te.reduce_axis((0, window_shape[0]), name="rh")
+ reduce_w = te.reduce_axis((0, window_shape[1]), name="rw")
- def compute(n, ho, wo, co, hi, wi, ci):
+ def compute(batch, h_outer, w_outer, c_outer, h_inner, w_inner, c_inner):
# Construct blockized strided maxpool height indices
- h = ho * block_H + hi
- h_contig = h * stride[0] + rh
- h_block_id = h_contig // block_H
- h_block_offset = h_contig % block_H
+ h = h_outer * block_h + h_inner
+ h_contig = h * stride[0] + reduce_h
+ h_block_id = h_contig // block_h
+ h_block_offset = h_contig % block_h
# Construct blockized strided maxpool width indices
- w = wo * block_W + wi
- w_contig = w * stride[1] + rw
- w_block_id = w_contig // block_W
- w_block_offset = w_contig % block_W
+ w_idx = w_outer * block_w + w_inner
+ w_contig = w_idx * stride[1] + reduce_w
+ w_block_id = w_contig // block_w
+ w_block_offset = w_contig % block_w
return te.max(
- X_packed[n, h_block_id, w_block_id, co, h_block_offset, w_block_offset, ci],
- axis=[rh, rw],
+ x_packed[
+ batch, h_block_id, w_block_id, c_outer, h_block_offset, w_block_offset, c_inner
+ ],
+ axis=[reduce_h, reduce_w],
)
- Y = te.compute(output_shape, compute)
- s = te.create_schedule(Y.op)
+ compute_y = te.compute(output_shape, compute)
+ schedule = te.create_schedule(compute_y.op)
# Ensure the padding and array packing is performed inline
- s[X_pad].compute_inline()
- s[X_packed].compute_inline()
+ schedule[x_pad].compute_inline()
+ schedule[x_packed].compute_inline()
binds = {}
if storage_scope and storage_scope != "global":
with tvm.transform.PassContext():
- Xb = tvm.tir.decl_buffer(shape, name="Xb", dtype=dtype, scope=storage_scope)
- Yb = tvm.tir.decl_buffer(output_shape, name="Yb", dtype=dtype, scope=storage_scope)
- binds = {X: Xb, Y: Yb}
+ x_buffer = tvm.tir.decl_buffer(shape, name="Xb", dtype=dtype, scope=storage_scope)
+ y_buffer = tvm.tir.decl_buffer(
+ output_shape, name="Yb", dtype=dtype, scope=storage_scope
+ )
+ binds = {placeholder_x: x_buffer, compute_y: y_buffer}
- return (s, [X, Y], binds)
+ return (schedule, [placeholder_x, compute_y], binds)
class BaseMaxPooling:
@@ -124,8 +125,11 @@ class BaseMaxPooling:
class TestMaxPooling(BaseMaxPooling):
+ """Test MaxPool class"""
+
@tvm.testing.parametrize_targets("llvm")
def test_maxpool(self, shape_nhwc, window_size, stride, pad, dtype, target):
+ """Test blocked maxpool"""
inputs = [np.random.uniform(0, 255, size=shape_nhwc).astype(dtype)]
ref_output = testing.poolnd_python(
inputs[0],
diff --git a/tests/python/contrib/test_hexagon/test_models.py b/tests/python/contrib/test_hexagon/test_models.py
index 4fdc951574..78d7116d08 100644
--- a/tests/python/contrib/test_hexagon/test_models.py
+++ b/tests/python/contrib/test_hexagon/test_models.py
@@ -15,21 +15,22 @@
# specific language governing permissions and limitations
# under the License.
-import sys
-import pytest
+"""Test mobilenet model with both graph and aot executor"""
+
import numpy as np
+import pytest
import tvm.testing
from tvm import relay
-from tvm.relay.backend import Executor, Runtime
from tvm.contrib.hexagon.session import Session
+from tvm.relay.backend import Executor, Runtime
def get_mobilenet():
"""Download and import mobilenet model with ONNX"""
import onnx # pylint: disable=import-outside-toplevel
- model_url = "https://github.com/onnx/models/raw/main/vision/classification/mobilenet/model/mobilenetv2-7.onnx"
+ model_url = "https://github.com/onnx/models/raw/main/vision/classification/mobilenet/model/mobilenetv2-7.onnx" # pylint: disable=line-too-long
model_path = tvm.contrib.download.download_testdata(
model_url, "mobilenetv2-7.onnx", module="onnx"
)
@@ -38,6 +39,7 @@ def get_mobilenet():
@tvm.testing.requires_hexagon
def test_mobilenet(hexagon_session: Session):
+ """Test mobilenet with graph executor"""
dtype = "float32"
onnx_model = get_mobilenet()
@@ -83,11 +85,10 @@ def test_mobilenet(hexagon_session: Session):
tvm.testing.assert_allclose(hexagon_output, expected_output, rtol=1e-4, atol=1e-5)
-enable_usmp = tvm.testing.parameter(False, True)
-
-
+@pytest.mark.parametrize("enable_usmp", [False, True])
@tvm.testing.requires_hexagon
def test_mobilenet_aot(hexagon_session: Session, aot_host_target, aot_target, enable_usmp):
+ """Test mobilenet with aot executor"""
if hexagon_session._launcher._serial_number == "simulator":
pytest.skip(msg="Skip on simulator due to long runtime.")
diff --git a/tests/python/contrib/test_hexagon/test_run_unit_tests.py b/tests/python/contrib/test_hexagon/test_run_unit_tests.py
index 6a60b8fa81..fd75775a01 100644
--- a/tests/python/contrib/test_hexagon/test_run_unit_tests.py
+++ b/tests/python/contrib/test_hexagon/test_run_unit_tests.py
@@ -15,8 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-import os
-import pytest
+""" capture gtest output and return over FFI """
+
import numpy as np
import tvm
@@ -29,11 +29,16 @@ from tvm.contrib.hexagon.session import Session
# pytest -sv <this file> --gtests_args="--gtest_filter=*foo* --gtest_repeat=2"
@tvm.testing.requires_hexagon
def test_run_unit_tests(hexagon_session: Session, gtest_args):
+ """Try running gtest unit tests and capture output and error code"""
try:
func = hexagon_session._rpc.get_function("hexagon.run_unit_tests")
except:
print(
- "This test requires TVM Runtime to be built with a Hexagon gtest version using Hexagon API cmake flag -DUSE_HEXAGON_GTEST=/path/to/hexagon/sdk/utils/googletest/gtest"
+ (
+ "This test requires TVM Runtime to be built with a Hexagon gtest "
+ "version using Hexagon API cmake flag "
+ "-DUSE_HEXAGON_GTEST=/path/to/hexagon/sdk/utils/googletest/gtest"
+ )
)
raise
diff --git a/tests/python/contrib/test_hexagon/test_thread_pool.py b/tests/python/contrib/test_hexagon/test_thread_pool.py
index fa53cdc068..c943dbbb22 100644
--- a/tests/python/contrib/test_hexagon/test_thread_pool.py
+++ b/tests/python/contrib/test_hexagon/test_thread_pool.py
@@ -15,20 +15,23 @@
# specific language governing permissions and limitations
# under the License.
+"""Add hexagon thread pool test"""
+
import numpy as np
import tvm
import tvm.contrib.hexagon
-from tvm.contrib.hexagon.session import Session
import tvm.script
import tvm.testing
-from tvm import te
-
+from tvm.contrib.hexagon.session import Session
from tvm.script import tir as T
@tvm.script.ir_module
class ElemwiseSumIRModule:
+ """IRModule definition for elementwise sum"""
+
+ # pylint: disable=no-self-argument,invalid-name,missing-function-docstring
@T.prim_func
def elemwise_sum_serial(a: T.handle, b: T.handle, c: T.handle, n: T.int32):
T.func_attr({"global_symbol": "elemwise_sum_serial", "tir.noalias": True})
@@ -51,6 +54,8 @@ class ElemwiseSumIRModule:
vi = T.axis.spatial(n, i)
C[vi] = A[vi] + B[vi]
+ # pylint: enable=no-self-argument,invalid-name,missing-function-docstring
+
def generate_add_test_data(hexagon_session: Session, n=128 * 1024):
a = tvm.nd.array(np.random.uniform(size=n).astype("float32"), hexagon_session.device)
@@ -67,6 +72,7 @@ def benchmark_func(mod, name, args, hexagon_session):
@tvm.testing.requires_hexagon
def test_speedup(hexagon_session: Session, capsys):
+ """Test speedup"""
target_hexagon = tvm.target.hexagon("v68", link_params=True)
func = tvm.build(
ElemwiseSumIRModule, target=tvm.target.Target(target_hexagon, host=target_hexagon)
@@ -82,6 +88,7 @@ def test_speedup(hexagon_session: Session, capsys):
@tvm.testing.requires_hexagon
def test_elemwise_sum_parallel(hexagon_session: Session):
+ """Test parallel elementwise sum"""
target_hexagon = tvm.target.hexagon("v68", link_params=True)
func = tvm.build(
ElemwiseSumIRModule, target=tvm.target.Target(target_hexagon, host=target_hexagon)
diff --git a/tests/python/contrib/test_hexagon/test_usmp.py b/tests/python/contrib/test_hexagon/test_usmp.py
index e9d07d1344..d56531b254 100644
--- a/tests/python/contrib/test_hexagon/test_usmp.py
+++ b/tests/python/contrib/test_hexagon/test_usmp.py
@@ -15,23 +15,21 @@
# specific language governing permissions and limitations
# under the License.
-import sys
-import pytest
-import numpy as np
+"""USMP tests"""
+import numpy as np
+import pytest
import tvm.testing
-from tvm import te
from tvm import relay
-from tvm.relay.backend import Executor, Runtime
from tvm.contrib.hexagon.session import Session
+from tvm.relay.backend import Executor, Runtime
from tvm.testing.usmp import is_tvm_backendallocworkspace_calls
-usmp_enabled = tvm.testing.parameter(False, True)
-
-
+@pytest.mark.parametrize("usmp_enabled", [False, True])
@tvm.testing.requires_hexagon
def test_conv2d(hexagon_session: Session, aot_host_target, aot_target, usmp_enabled):
+ """Try conv2d on AOT target with usmp_enabled and check for TVMBackendAllocWorkspace calls"""
dtype = "float32"
input_shape = (1, 8, 8, 3)
w1_shape = (5, 5, 3, 1)
@@ -39,7 +37,7 @@ def test_conv2d(hexagon_session: Session, aot_host_target, aot_target, usmp_enab
data = relay.var("data", relay.TensorType(input_shape, dtype))
weight1 = relay.var("weight1", relay.TensorType(w1_shape, dtype))
weight2 = relay.var("weight2", relay.TensorType(w2_shape, dtype))
- y1 = relay.nn.conv2d(
+ output1 = relay.nn.conv2d(
data,
weight1,
padding=(2, 2),
@@ -48,8 +46,8 @@ def test_conv2d(hexagon_session: Session, aot_host_target, aot_target, usmp_enab
kernel_layout="HWIO",
out_dtype="float32",
)
- y2 = relay.nn.conv2d(
- y1,
+ output2 = relay.nn.conv2d(
+ output1,
weight2,
padding=(2, 2),
kernel_size=(5, 5),
@@ -57,7 +55,7 @@ def test_conv2d(hexagon_session: Session, aot_host_target, aot_target, usmp_enab
kernel_layout="HWIO",
out_dtype="float32",
)
- f = relay.Function([data, weight1, weight2], y2)
+ f = relay.Function([data, weight1, weight2], output2)
relay_mod = tvm.IRModule.from_expr(f)
relay_mod = relay.transform.InferType()(relay_mod)