You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mynewt.apache.org by ja...@apache.org on 2022/07/27 08:48:20 UTC
[mynewt-nimble] 02/02: tools/hci_throughput: fix coding style - PEP8
This is an automated email from the ASF dual-hosted git repository.
janc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mynewt-nimble.git
commit c28ca3608259ea0840fdbdfc99ea0b09616bd8e5
Author: Jakub <ja...@codecoup.pl>
AuthorDate: Wed Jul 27 10:26:41 2022 +0200
tools/hci_throughput: fix coding style - PEP8
Fixed coding style to conform with PEP8.
autopep8 --in-place --aggressive --aggressive --experimental
---
tools/hci_throughput/check_addr.py | 36 +++--
tools/hci_throughput/hci.py | 170 ++++++++++++++++-------
tools/hci_throughput/hci_commands.py | 215 +++++++++++++++++++-----------
tools/hci_throughput/hci_device.py | 82 ++++++------
tools/hci_throughput/hci_socket.py | 36 +++--
tools/hci_throughput/main.py | 15 ++-
tools/hci_throughput/throughput.py | 71 ++++++----
tools/hci_throughput/transport_factory.py | 5 +-
tools/hci_throughput/util.py | 13 +-
9 files changed, 409 insertions(+), 234 deletions(-)
diff --git a/tools/hci_throughput/check_addr.py b/tools/hci_throughput/check_addr.py
index 7af76a11..45691318 100644
--- a/tools/hci_throughput/check_addr.py
+++ b/tools/hci_throughput/check_addr.py
@@ -27,6 +27,7 @@ import traceback
import util
import transport_factory
+
def parse_arguments():
parser = argparse.ArgumentParser(
description='Check HCI device address type and address',
@@ -34,11 +35,16 @@ def parse_arguments():
sudo python check_addr.py -i 0 1 2')
parser.add_argument('-i', '--indexes', type=str, nargs='*',
help='specify hci adapters indexes', default=0)
- parser.add_argument('-t', '--transport_directory', type=str, nargs='*',
- help='specify hci transport directory path', default="default")
+ parser.add_argument(
+ '-t',
+ '--transport_directory',
+ type=str,
+ nargs='*',
+ help='specify hci transport directory path',
+ default="default")
try:
args = parser.parse_args()
- if (type(args.transport_directory) == list):
+ if (isinstance(args.transport_directory, list)):
args.transport_directory = args.transport_directory.pop()
else:
args.transport_directory = args.transport_directory
@@ -47,6 +53,7 @@ def parse_arguments():
print(traceback.format_exc())
return args
+
async def main(dev: hci_commands.HCI_Commands):
result = tuple()
task = asyncio.create_task(dev.rx_buffer_q_wait())
@@ -54,24 +61,31 @@ async def main(dev: hci_commands.HCI_Commands):
await dev.cmd_read_bd_addr()
if hci.bdaddr != '00:00:00:00:00:00':
- logging.info("Type public: %s, address: %s", hci.PUBLIC_ADDRESS_TYPE, hci.bdaddr)
+ logging.info("Type public: %s, address: %s",
+ hci.PUBLIC_ADDRESS_TYPE, hci.bdaddr)
result = (0, hci.bdaddr)
print("Public address: ", result)
else:
await dev.cmd_vs_read_static_addr()
if hci.static_addr != '00:00:00:00:00:00':
- logging.info("Type static random: %s, address: %s", hci.STATIC_RANDOM_ADDRESS_TYPE, hci.static_addr)
+ logging.info("Type static random: %s, address: %s",
+ hci.STATIC_RANDOM_ADDRESS_TYPE, hci.static_addr)
result = (1, hci.static_addr)
print("Static random address: ", result)
else:
addr = hci.gen_static_rand_addr()
- logging.info("Type static random: %s, generated address: %s", hci.STATIC_RANDOM_ADDRESS_TYPE, addr)
+ logging.info("Type static random: %s, generated address: %s",
+ hci.STATIC_RANDOM_ADDRESS_TYPE, addr)
result = (1, addr)
print("Generated static random address: ", result)
task.cancel()
return result
-def check_addr(device_indexes: list, addresses: list, transport_directory: str) -> list:
+
+def check_addr(
+ device_indexes: list,
+ addresses: list,
+ transport_directory: str) -> list:
util.configure_logging(f"log/check_addr.log", clear_log_file=True)
logging.info(f"Devices indexes: {device_indexes}")
@@ -80,9 +94,8 @@ def check_addr(device_indexes: list, addresses: list, transport_directory: str)
asyncio.set_event_loop(loop)
loop.set_debug(True)
- transport = transport_factory.TransportFactory(device_index=str(index),
- asyncio_loop=loop,
- transport_directory=transport_directory)
+ transport = transport_factory.TransportFactory(device_index=str(
+ index), asyncio_loop=loop, transport_directory=transport_directory)
bt_dev = hci_commands.HCI_Commands(send=transport.send,
rx_buffer_q=transport.rx_buffer_q,
@@ -104,7 +117,8 @@ if __name__ == '__main__':
args = parse_arguments()
print(args)
addresses = []
- addresses = check_addr(args.indexes, addresses, args.transport_directory)
+ addresses = check_addr(args.indexes, addresses,
+ args.transport_directory)
print(addresses)
except Exception as e:
print(traceback.format_exc())
diff --git a/tools/hci_throughput/hci.py b/tools/hci_throughput/hci.py
index 6ed45625..d0a0b8a1 100644
--- a/tools/hci_throughput/hci.py
+++ b/tools/hci_throughput/hci.py
@@ -88,7 +88,7 @@ LE_FEATURE_CODED_PHY = ctypes.c_uint64(0x0800).value
############
# GLOBAL VAR
############
-num_of_bytes_to_send = None # based on supported_max_tx_octets
+num_of_bytes_to_send = None # based on supported_max_tx_octets
num_of_packets_to_send = None
events_list = []
@@ -103,44 +103,53 @@ max_data_len = None
phy = None
ev_num_comp_pkts = None
num_of_completed_packets_cnt = 0
-num_of_completed_packets_time = 0
+num_of_completed_packets_time = 0
read_local_commands = None
le_read_local_supported_features = None
############
# FUNCTIONS
############
+
+
def get_opcode(ogf: int, ocf: int):
- return ((ocf & 0x03ff)|(ogf << 10))
+ return ((ocf & 0x03ff) | (ogf << 10))
+
def get_ogf_ocf(opcode: int):
ogf = opcode >> 10
ocf = opcode & 0x03ff
return ogf, ocf
+
def cmd_addr_to_ba(addr_str: str):
return unhexlify("".join(addr_str.split(':')))[::-1]
+
def ba_addr_to_str(addr_ba: bytearray):
addr_str = addr_ba.hex().upper()
- return ':'.join(addr_str[i:i+2] for i in range(len(addr_str), -2, -2))[1:]
+ return ':'.join(addr_str[i:i + 2]
+ for i in range(len(addr_str), -2, -2))[1:]
+
def gen_static_rand_addr():
while True:
- x = [random.randint(0,1) for _ in range(0,48)]
+ x = [random.randint(0, 1) for _ in range(0, 48)]
if 0 in x[:-2] and 1 in x[:-2]:
x[0] = 1
x[1] = 1
break
- addr_int = int("".join([str(x[i]) for i in range(0,len(x))]), 2)
+ addr_int = int("".join([str(x[i]) for i in range(0, len(x))]), 2)
addr_hex = "{0:0{1}x}".format(addr_int, 12)
- addr = ":".join(addr_hex[i:i+2] for i in range(0, len(addr_hex), 2))
+ addr = ":".join(addr_hex[i:i + 2] for i in range(0, len(addr_hex), 2))
return addr.upper()
############
# GLOBAL VAR CLASSES
############
+
+
@dataclass
class Suggested_Dflt_Data_Length():
status: int
@@ -150,11 +159,16 @@ class Suggested_Dflt_Data_Length():
def __init__(self):
self.set()
- def set(self, status=0, suggested_max_tx_octets=0, suggested_max_tx_time=0):
+ def set(
+ self,
+ status=0,
+ suggested_max_tx_octets=0,
+ suggested_max_tx_time=0):
self.status = status
self.suggested_max_tx_octets = suggested_max_tx_octets
self.suggested_max_tx_time = suggested_max_tx_time
+
@dataclass
class Max_Data_Length():
status: int
@@ -174,6 +188,7 @@ class Max_Data_Length():
self.supported_max_rx_octets = supported_max_rx_octets
self.supported_max_rx_time = supported_max_rx_time
+
@dataclass
class LE_Read_Buffer_Size:
status: int
@@ -194,6 +209,7 @@ class LE_Read_Buffer_Size:
self.iso_data_packet_len = iso_data_packet_len
self.total_num_iso_data_packets = total_num_iso_data_packets
+
@dataclass
class LE_Read_PHY:
status: int
@@ -210,6 +226,7 @@ class LE_Read_PHY:
self.tx_phy = tx_phy
self.rx_phy = rx_phy
+
@dataclass
class Read_Local_Commands:
status: int
@@ -218,10 +235,11 @@ class Read_Local_Commands:
def __init__(self):
self.set()
- def set(self, rcv_bytes = bytes(65)):
+ def set(self, rcv_bytes=bytes(65)):
self.status = int(rcv_bytes[0])
self.supported_commands = rcv_bytes[1:]
+
@dataclass
class LE_Read_Local_Supported_Features:
status: int
@@ -230,14 +248,16 @@ class LE_Read_Local_Supported_Features:
def __init__(self):
self.set()
- def set(self, rcv_bytes = bytes(9)):
+ def set(self, rcv_bytes=bytes(9)):
self.status = int(rcv_bytes[0])
- self.le_features = ctypes.c_uint64.from_buffer_copy(rcv_bytes[1:]).value
+ self.le_features = ctypes.c_uint64.from_buffer_copy(
+ rcv_bytes[1:]).value
############
# EVENTS
############
+
@dataclass
class HCI_Ev_Disconn_Complete:
status: int
@@ -252,6 +272,7 @@ class HCI_Ev_Disconn_Complete:
self.connection_handle = connection_handle
self.reason = reason
+
@dataclass
class HCI_Ev_Cmd_Complete:
num_hci_command_packets: int
@@ -266,6 +287,7 @@ class HCI_Ev_Cmd_Complete:
self.opcode = opcode
self.return_parameters = return_parameters
+
@dataclass
class HCI_Ev_Cmd_Status:
status: int
@@ -275,11 +297,12 @@ class HCI_Ev_Cmd_Status:
def __init__(self):
self.set()
- def set(self, status = 0, num_hci_cmd_packets=0, opcode=0):
+ def set(self, status=0, num_hci_cmd_packets=0, opcode=0):
self.status = status
self.num_hci_command_packets = num_hci_cmd_packets
self.opcode = opcode
+
@dataclass
class HCI_Ev_LE_Meta:
subevent_code: int
@@ -327,6 +350,7 @@ class HCI_Ev_LE_Enhanced_Connection_Complete(HCI_Ev_LE_Meta):
self.supervision_timeout = supervision_timeout
self.central_clock_accuracy = central_clock_accuracy
+
@dataclass
class HCI_Ev_LE_Data_Length_Change(HCI_Ev_LE_Meta):
conn_handle: int
@@ -341,13 +365,14 @@ class HCI_Ev_LE_Data_Length_Change(HCI_Ev_LE_Meta):
def set(self, subevent_code=0, conn_handle=0, max_tx_octets=0,
max_tx_time=0, max_rx_octets=0, max_rx_time=0, triggered=0):
- super().set(subevent_code)
- self.conn_handle = conn_handle
- self.max_tx_octets = max_tx_octets
- self.max_tx_time = max_tx_time
- self.max_rx_octets = max_rx_octets
- self.max_rx_time = max_rx_time
- self.triggered = triggered
+ super().set(subevent_code)
+ self.conn_handle = conn_handle
+ self.max_tx_octets = max_tx_octets
+ self.max_tx_time = max_tx_time
+ self.max_rx_octets = max_rx_octets
+ self.max_rx_time = max_rx_time
+ self.triggered = triggered
+
@dataclass
class HCI_Ev_LE_PHY_Update_Complete(HCI_Ev_LE_Meta):
@@ -367,6 +392,7 @@ class HCI_Ev_LE_PHY_Update_Complete(HCI_Ev_LE_Meta):
self.tx_phy = tx_phy
self.rx_phy = rx_phy
+
@dataclass
class HCI_Number_Of_Completed_Packets:
num_handles: int
@@ -381,6 +407,7 @@ class HCI_Number_Of_Completed_Packets:
self.connection_handle = connection_handle
self.num_completed_packets = num_completed_packets
+
class HCI_Ev_LE_Chan_Sel_Alg(HCI_Ev_LE_Meta):
connection_handle: int
algorithm: int
@@ -396,6 +423,8 @@ class HCI_Ev_LE_Chan_Sel_Alg(HCI_Ev_LE_Meta):
############
# PARAMETERS
############
+
+
@dataclass
class HCI_Advertising:
advertising_interval_min: int
@@ -411,9 +440,9 @@ class HCI_Advertising:
def __init__(self):
self.set()
- def set(self, advertising_interval_min=0, advertising_interval_max=0, \
- advertising_type=0, own_address_type=0, peer_address_type=0, \
- peer_address='00:00:00:00:00:00', advertising_channel_map=0, \
+ def set(self, advertising_interval_min=0, advertising_interval_max=0,
+ advertising_type=0, own_address_type=0, peer_address_type=0,
+ peer_address='00:00:00:00:00:00', advertising_channel_map=0,
advertising_filter_policy=0):
self.advertising_interval_min = advertising_interval_min
self.advertising_interval_max = advertising_interval_max
@@ -423,13 +452,20 @@ class HCI_Advertising:
self.peer_address = peer_address
self.advertising_channel_map = advertising_channel_map
self.advertising_filter_policy = advertising_filter_policy
- self.ba_full_message = bytearray(struct.pack('<HHBBBBB',
- advertising_interval_min, advertising_interval_max,
- advertising_type, own_address_type, peer_address_type,
- advertising_channel_map, advertising_filter_policy))
+ self.ba_full_message = bytearray(
+ struct.pack(
+ '<HHBBBBB',
+ advertising_interval_min,
+ advertising_interval_max,
+ advertising_type,
+ own_address_type,
+ peer_address_type,
+ advertising_channel_map,
+ advertising_filter_policy))
peer_addr_ba = cmd_addr_to_ba(peer_address)
self.ba_full_message[7:7] = peer_addr_ba
+
@dataclass
class HCI_Scan:
le_scan_type: int
@@ -449,9 +485,15 @@ class HCI_Scan:
self.le_scan_window = le_scan_window
self.own_address_type = own_address_type
self.scanning_filter_policy = scanning_filter_policy
- self.ba_full_message = bytearray(struct.pack('<BHHBB',le_scan_type,
- le_scan_interval, le_scan_window, own_address_type,
- scanning_filter_policy))
+ self.ba_full_message = bytearray(
+ struct.pack(
+ '<BHHBB',
+ le_scan_type,
+ le_scan_interval,
+ le_scan_window,
+ own_address_type,
+ scanning_filter_policy))
+
@dataclass
class HCI_Connect:
@@ -472,11 +514,11 @@ class HCI_Connect:
def __init__(self):
self.set()
- def set(self, le_scan_interval=0, le_scan_window=0, \
- initiator_filter_policy=0, peer_address_type=0, \
- peer_address='00:00:00:00:00:00', own_address_type=0, \
- connection_interval_min=0, connection_interval_max=0, \
- max_latency=0, supervision_timeout=0, min_ce_length=0, \
+ def set(self, le_scan_interval=0, le_scan_window=0,
+ initiator_filter_policy=0, peer_address_type=0,
+ peer_address='00:00:00:00:00:00', own_address_type=0,
+ connection_interval_min=0, connection_interval_max=0,
+ max_latency=0, supervision_timeout=0, min_ce_length=0,
max_ce_length=0):
self.le_scan_interval = le_scan_interval
self.le_scan_window = le_scan_window
@@ -490,17 +532,28 @@ class HCI_Connect:
self.supervision_timeout = supervision_timeout
self.min_ce_length = min_ce_length
self.max_ce_length = max_ce_length
- self.ba_full_message = bytearray(struct.pack('<HHBBBHHHHHH',
- le_scan_interval, le_scan_window, initiator_filter_policy,
- peer_address_type, own_address_type, connection_interval_min,
- connection_interval_max, max_latency,supervision_timeout,
- min_ce_length, max_ce_length))
+ self.ba_full_message = bytearray(
+ struct.pack(
+ '<HHBBBHHHHHH',
+ le_scan_interval,
+ le_scan_window,
+ initiator_filter_policy,
+ peer_address_type,
+ own_address_type,
+ connection_interval_min,
+ connection_interval_max,
+ max_latency,
+ supervision_timeout,
+ min_ce_length,
+ max_ce_length))
peer_addr_ba = cmd_addr_to_ba(peer_address)
self.ba_full_message[6:6] = peer_addr_ba
############
# RX / TX
############
+
+
@dataclass
class HCI_Receive:
packet_type: int
@@ -508,9 +561,10 @@ class HCI_Receive:
def __init__(self):
self.set()
- def set(self,packet_type=0):
+ def set(self, packet_type=0):
self.packet_type = packet_type
+
@dataclass
class HCI_Recv_Event_Packet(HCI_Receive):
ev_code: int
@@ -521,7 +575,7 @@ class HCI_Recv_Event_Packet(HCI_Receive):
def __init__(self):
self.set()
- def set(self,packet_type=0, ev_code=0, packet_len=0,
+ def set(self, packet_type=0, ev_code=0, packet_len=0,
recv_data=bytearray(256)):
super().set(packet_type)
self.ev_code = ev_code
@@ -529,6 +583,7 @@ class HCI_Recv_Event_Packet(HCI_Receive):
self.recv_data = recv_data
self.recv_data = recv_data[:packet_len]
+
@dataclass
class HCI_Recv_ACL_Data_Packet(HCI_Receive):
connection_handle: int
@@ -549,6 +604,7 @@ class HCI_Recv_ACL_Data_Packet(HCI_Receive):
self.data_total_len = total_data_len
self.data = data
+
@dataclass
class HCI_Recv_L2CAP_Data:
pdu_length: int
@@ -563,6 +619,7 @@ class HCI_Recv_L2CAP_Data:
self.channel_id = channel_id
self.data = data
+
@dataclass
class HCI_Cmd_Send:
packet_type: int
@@ -582,10 +639,15 @@ class HCI_Cmd_Send:
self.opcode = get_opcode(ogf, ocf)
self.packet_len = len(data)
self.data = data
- self.ba_full_message = bytearray(struct.pack('<BHB',
- self.packet_type, self.opcode, self.packet_len))
+ self.ba_full_message = bytearray(
+ struct.pack(
+ '<BHB',
+ self.packet_type,
+ self.opcode,
+ self.packet_len))
self.ba_full_message.extend(self.data)
+
@dataclass
class HCI_ACL_Data_Send:
packet_type: int
@@ -606,14 +668,17 @@ class HCI_ACL_Data_Send:
self.bc_flag = bc_flag
self.data_total_length = len(data)
self.data = data
- self.ba_full_message = bytearray(struct.pack('<BHH',
- self.packet_type,
- ((self.connection_handle & 0x0eff) |
- (self.pb_flag << 12) |
- (self.bc_flag << 14)),
- self.data_total_length))
+ self.ba_full_message = bytearray(
+ struct.pack(
+ '<BHH',
+ self.packet_type,
+ ((self.connection_handle & 0x0eff) | (
+ self.pb_flag << 12) | (
+ self.bc_flag << 14)),
+ self.data_total_length))
self.ba_full_message.extend(self.data)
+
@dataclass
class L2CAP_Data_Send:
pdu_length: int
@@ -632,6 +697,9 @@ class L2CAP_Data_Send:
self.channel_id = channel_id
self.data = data
fmt_conf = "<HH"
- self.ba_full_message = bytearray(struct.pack(fmt_conf,
- self.pdu_length, self.channel_id))
+ self.ba_full_message = bytearray(
+ struct.pack(
+ fmt_conf,
+ self.pdu_length,
+ self.channel_id))
self.ba_full_message.extend(data)
diff --git a/tools/hci_throughput/hci_commands.py b/tools/hci_throughput/hci_commands.py
index a6040ef0..83e6284f 100644
--- a/tools/hci_throughput/hci_commands.py
+++ b/tools/hci_throughput/hci_commands.py
@@ -24,10 +24,12 @@ import hci
import sys
import time
+
async def wait_ev(ev):
while ev.is_set() == False:
await asyncio.sleep(0.000001)
+
async def wait_for_event(ev, timeout):
try:
await asyncio.wait_for(wait_ev(ev), timeout)
@@ -35,6 +37,7 @@ async def wait_for_event(ev, timeout):
logging.error(f"Timeout waiting for event: {e}")
sys.exit()
+
class HCI_Commands():
def __init__(self, send=None, rx_buffer_q=None,
asyncio_loop=None, tp=None, device_mode="rx"):
@@ -61,7 +64,6 @@ class HCI_Commands():
self.loop = asyncio_loop
self.device_mode = device_mode
-
async def rx_buffer_q_wait(self):
try:
logging.debug("%s", self.rx_buffer_q_wait.__name__)
@@ -75,13 +77,13 @@ class HCI_Commands():
except asyncio.CancelledError:
logging.critical("rx_buffer_q_wait task canceled")
-
""" 7.3 Controller & Baseband commands """
async def cmd_set_event_mask(self, mask: int = 0x00001fffffffffff):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_HOST_CTL, hci.OCF_SET_EVENT_MASK,
struct.pack('<Q', mask))
- logging.debug("%s %s", self.cmd_set_event_mask.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_set_event_mask.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -94,12 +96,13 @@ class HCI_Commands():
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
-
""" 7.4 Informational parameters """
async def cmd_read_local_supported_cmds(self):
async with self.async_sem_cmd:
- self.hci_send_cmd.set(hci.OGF_INFO_PARAM, hci.OCF_READ_LOCAL_COMMANDS)
- logging.debug("%s %s", self.cmd_read_local_supported_cmds.__name__, self.hci_send_cmd)
+ self.hci_send_cmd.set(hci.OGF_INFO_PARAM,
+ hci.OCF_READ_LOCAL_COMMANDS)
+ logging.debug("%s %s", self.cmd_read_local_supported_cmds.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -107,34 +110,40 @@ class HCI_Commands():
async def cmd_read_bd_addr(self):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_INFO_PARAM, hci.OCF_READ_BD_ADDR)
- logging.debug("%s %s", self.cmd_read_bd_addr.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_read_bd_addr.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
-
""" 7.8 LE Controller Commands """
async def cmd_le_set_event_mask(self, mask: int = 0x000000000000001f):
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_EVENT_MASK,
struct.pack('<Q', mask))
- logging.debug("%s %s", self.cmd_le_set_event_mask.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_le_set_event_mask.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_read_buffer_size(self):
async with self.async_sem_cmd:
- self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_BUFFER_SIZE_V1)
- logging.debug("%s %s", self.cmd_le_read_buffer_size.__name__, self.hci_send_cmd)
+ self.hci_send_cmd.set(hci.OGF_LE_CTL,
+ hci.OCF_LE_READ_BUFFER_SIZE_V1)
+ logging.debug("%s %s", self.cmd_le_read_buffer_size.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_read_local_supported_features(self):
async with self.async_sem_cmd:
- self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_LOCAL_SUPPORTED_FEATURES)
- logging.debug("%s %s", self.cmd_le_read_local_supported_features.__name__, self.hci_send_cmd)
+ self.hci_send_cmd.set(hci.OGF_LE_CTL,
+ hci.OCF_LE_READ_LOCAL_SUPPORTED_FEATURES)
+ logging.debug("%s %s",
+ self.cmd_le_read_local_supported_features.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -145,7 +154,8 @@ class HCI_Commands():
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_SET_RANDOM_ADDRESS,
addr_ba)
- logging.debug("%s %s", self.cmd_le_set_random_addr.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_le_set_random_addr.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -155,7 +165,8 @@ class HCI_Commands():
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_SET_ADVERTISING_PARAMETERS,
adv_params.ba_full_message)
- logging.debug("%s %s", self.cmd_le_set_advertising_params.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_le_set_advertising_params.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -165,17 +176,21 @@ class HCI_Commands():
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_SET_ADVERTISE_ENABLE,
- struct.pack('<B',adv_en))
- logging.debug("%s %s", self.cmd_le_set_advertising_enable.__name__, self.hci_send_cmd)
+ struct.pack('<B', adv_en))
+ logging.debug("%s %s", self.cmd_le_set_advertising_enable.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
async def cmd_le_set_scan_params(self, scan_params: hci.HCI_Scan):
async with self.async_sem_cmd:
- self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_SCAN_PARAMETERS,
- scan_params.ba_full_message)
- logging.debug("%s %s", self.cmd_le_set_scan_params.__name__, self.hci_send_cmd)
+ self.hci_send_cmd.set(
+ hci.OGF_LE_CTL,
+ hci.OCF_LE_SET_SCAN_PARAMETERS,
+ scan_params.ba_full_message)
+ logging.debug("%s %s", self.cmd_le_set_scan_params.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -188,7 +203,8 @@ class HCI_Commands():
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_SCAN_ENABLE,
struct.pack('<BB', scan_en, filter_dup))
- logging.debug("%s %s", self.cmd_le_set_scan_enable.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_le_set_scan_enable.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -197,7 +213,8 @@ class HCI_Commands():
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_CREATE_CONN,
con_params.ba_full_message)
- logging.debug("%s %s", self.cmd_le_create_connection.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_le_create_connection.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -218,8 +235,9 @@ class HCI_Commands():
await asyncio.sleep(0.001)
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_DATA_LEN,
struct.pack('<HHH', conn_handle,
- tx_octets, tx_time))
- logging.debug("%s %s", self.cmd_le_set_data_len.__name__, self.hci_send_cmd)
+ tx_octets, tx_time))
+ logging.debug("%s %s", self.cmd_le_set_data_len.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -228,7 +246,10 @@ class HCI_Commands():
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_READ_SUGGESTED_DFLT_DATA_LEN)
- logging.debug("%s %s",self.cmd_le_read_suggested_dflt_data_len.__name__, self.hci_send_cmd)
+ logging.debug(
+ "%s %s",
+ self.cmd_le_read_suggested_dflt_data_len.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -237,7 +258,8 @@ class HCI_Commands():
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL,
hci.OCF_LE_READ_MAX_DATA_LEN)
- logging.debug("%s %s", self.cmd_le_read_max_data_len.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_le_read_max_data_len.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -250,7 +272,8 @@ class HCI_Commands():
await asyncio.sleep(0.001)
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_READ_PHY,
struct.pack('<H', conn_handle))
- logging.debug("%s %s", self.cmd_le_read_phy.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_le_read_phy.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -267,8 +290,9 @@ class HCI_Commands():
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_DFLT_PHY,
struct.pack('<BBB', all_phys,
- tx_phys, rx_phys))
- logging.debug("%s %s", self.cmd_le_set_dflt_phy.__name__, self.hci_send_cmd)
+ tx_phys, rx_phys))
+ logging.debug("%s %s", self.cmd_le_set_dflt_phy.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -292,8 +316,9 @@ class HCI_Commands():
await asyncio.sleep(0.001)
self.hci_send_cmd.set(hci.OGF_LE_CTL, hci.OCF_LE_SET_PHY,
struct.pack('<HBBBH', conn_handle, all_phys,
- tx_phys, rx_phys, phy_options))
- logging.debug("%s %s", self.cmd_le_set_phy.__name__, self.hci_send_cmd)
+ tx_phys, rx_phys, phy_options))
+ logging.debug("%s %s", self.cmd_le_set_phy.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
@@ -302,12 +327,12 @@ class HCI_Commands():
async with self.async_sem_cmd:
self.hci_send_cmd.set(hci.OGF_VENDOR_SPECIFIC,
hci.BLE_HCI_OCF_VS_RD_STATIC_ADDR)
- logging.debug("%s %s", self.cmd_vs_read_static_addr.__name__, self.hci_send_cmd)
+ logging.debug("%s %s", self.cmd_vs_read_static_addr.__name__,
+ self.hci_send_cmd)
await self.send(self.hci_send_cmd.ba_full_message)
await self.async_ev_cmd_end.wait()
self.async_ev_cmd_end.clear()
-
""" Send data """
async def acl_data_send(self, acl_data: hci.HCI_ACL_Data_Send):
async with self.async_sem_cmd:
@@ -316,8 +341,8 @@ class HCI_Commands():
await self.send(self.hci_send_acl_data.ba_full_message)
self.sent_packets_counter += 1
-
""" Parse and process received data"""
+
def parse_ev_disconn_cmp(self, data: bytes):
ev_disconn_cmp = hci.HCI_Ev_Disconn_Complete()
ev_disconn_cmp.set(*struct.unpack('<BHB', bytes(data[:4])))
@@ -369,11 +394,11 @@ class HCI_Commands():
def process_returned_parameters(self):
def status() -> int:
- current_ev_name = type(self.hci_recv_ev_packet.current_event).__name__
+ current_ev_name = type(
+ self.hci_recv_ev_packet.current_event).__name__
if current_ev_name == type(hci.HCI_Ev_Cmd_Complete()).__name__:
- return struct.unpack_from("<B",
- self.hci_recv_ev_packet.current_event.return_parameters,
- offset=0)[0]
+ return struct.unpack_from(
+ "<B", self.hci_recv_ev_packet.current_event.return_parameters, offset=0)[0]
elif current_ev_name == type(hci.HCI_Ev_Cmd_Status()).__name__:
return self.hci_recv_ev_packet.current_event.status
else:
@@ -391,11 +416,12 @@ class HCI_Commands():
elif ogf == hci.OGF_INFO_PARAM:
if ocf == hci.OCF_READ_LOCAL_COMMANDS:
hci.read_local_commands = hci.Read_Local_Commands()
- hci.read_local_commands.set(bytes(current_ev.return_parameters))
+ hci.read_local_commands.set(
+ bytes(current_ev.return_parameters))
return status()
elif ocf == hci.OCF_READ_BD_ADDR:
hci.bdaddr = hci.ba_addr_to_str(
- bytes(current_ev.return_parameters[1:7]))
+ bytes(current_ev.return_parameters[1:7]))
return status()
elif ogf == hci.OGF_LE_CTL:
@@ -409,7 +435,8 @@ class HCI_Commands():
return hci.le_read_buffer_size.status
elif ocf == hci.OCF_LE_READ_LOCAL_SUPPORTED_FEATURES:
hci.le_read_local_supported_features = hci.LE_Read_Local_Supported_Features()
- hci.le_read_local_supported_features.set(current_ev.return_parameters)
+ hci.le_read_local_supported_features.set(
+ current_ev.return_parameters)
return status()
elif ocf == hci.OCF_LE_SET_RANDOM_ADDRESS:
return status()
@@ -429,18 +456,22 @@ class HCI_Commands():
hci.suggested_dflt_data_len = hci.Suggested_Dflt_Data_Length()
hci.suggested_dflt_data_len.set(*struct.unpack("<BHH",
current_ev.return_parameters))
- logging.info(f"Suggested Deafult Data Len: {hci.suggested_dflt_data_len}")
+ logging.info(
+ f"Suggested Deafult Data Len: {hci.suggested_dflt_data_len}")
return status()
elif ocf == hci.OCF_LE_READ_MAX_DATA_LEN:
hci.max_data_len = hci.Max_Data_Length()
hci.max_data_len.set(*struct.unpack("<BHHHH",
current_ev.return_parameters))
logging.info(f"Suggested Max Data Len: {hci.max_data_len}")
- if (hci.num_of_bytes_to_send > hci.max_data_len.supported_max_tx_octets - hci.L2CAP_HDR_BYTES):
- logging.critical(f"Number of data bytes to send + 4 bytes of L2CAP header: {hci.num_of_bytes_to_send + 4} "
- f"exceeds allowed value of: {hci.max_data_len.supported_max_tx_octets}. Closing.")
- raise SystemExit(f"Number of data bytes to send + 4 bytes of L2CAP header: {hci.num_of_bytes_to_send + 4} "
- f"exceeds allowed value of: {hci.max_data_len.supported_max_tx_octets}. Closing.")
+ if (hci.num_of_bytes_to_send >
+ hci.max_data_len.supported_max_tx_octets - hci.L2CAP_HDR_BYTES):
+ logging.critical(
+ f"Number of data bytes to send + 4 bytes of L2CAP header: {hci.num_of_bytes_to_send + 4} "
+ f"exceeds allowed value of: {hci.max_data_len.supported_max_tx_octets}. Closing.")
+ raise SystemExit(
+ f"Number of data bytes to send + 4 bytes of L2CAP header: {hci.num_of_bytes_to_send + 4} "
+ f"exceeds allowed value of: {hci.max_data_len.supported_max_tx_octets}. Closing.")
return status()
elif ocf == hci.OCF_LE_READ_PHY:
@@ -456,9 +487,10 @@ class HCI_Commands():
elif ogf == hci.OGF_VENDOR_SPECIFIC:
if ocf == hci.BLE_HCI_OCF_VS_RD_STATIC_ADDR:
- if type(current_ev).__name__ == type(hci.HCI_Ev_Cmd_Complete()).__name__:
+ if type(current_ev).__name__ == type(
+ hci.HCI_Ev_Cmd_Complete()).__name__:
hci.static_addr = hci.ba_addr_to_str(
- bytes(current_ev.return_parameters[1:7]))
+ bytes(current_ev.return_parameters[1:7]))
logging.info(f"Received rd static addr: {hci.static_addr}")
elif type(current_ev).__name__ == type(hci.HCI_Ev_Cmd_Status()).__name__:
logging.info(f"Rd static addr status: {current_ev.status}")
@@ -484,39 +516,43 @@ class HCI_Commands():
l2cap_data = buffer[5:]
hci_recv_acl_data_packet.set(
- packet_type=packet_type,
- connection_handle=handle,
- pb_flag=pb_flag,
- bc_flag=bc_flag,
- total_data_len=data_len,
- data=l2cap_data)
+ packet_type=packet_type,
+ connection_handle=handle,
+ pb_flag=pb_flag,
+ bc_flag=bc_flag,
+ total_data_len=data_len,
+ data=l2cap_data)
return hci_recv_acl_data_packet
def parse_subevent(self, subev_code: int):
if subev_code == hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP:
self.hci_recv_ev_packet.current_event = \
- self.parse_subev_le_enhcd_conn_cmp(self.hci_recv_ev_packet.recv_data)
+ self.parse_subev_le_enhcd_conn_cmp(
+ self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP,
self.hci_recv_ev_packet.current_event))
return hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP
elif subev_code == hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE:
self.hci_recv_ev_packet.current_event = \
- self.parse_subev_le_data_len_change(self.hci_recv_ev_packet.recv_data)
+ self.parse_subev_le_data_len_change(
+ self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE,
self.hci_recv_ev_packet.current_event))
return hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE
elif subev_code == hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP:
self.hci_recv_ev_packet.current_event = \
- self.parse_subev_le_phy_update_cmp(self.hci_recv_ev_packet.recv_data)
+ self.parse_subev_le_phy_update_cmp(
+ self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP,
self.hci_recv_ev_packet.current_event))
return hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP
elif subev_code == hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG:
self.hci_recv_ev_packet.current_event = \
- self.parse_subev_le_chan_sel_alg(self.hci_recv_ev_packet.recv_data)
+ self.parse_subev_le_chan_sel_alg(
+ self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG,
self.hci_recv_ev_packet.current_event))
return hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG
@@ -526,26 +562,26 @@ class HCI_Commands():
def parse_event(self, buffer: bytes):
self.hci_recv_ev_packet.set(*struct.unpack('<BBB', bytes(buffer[:3])),
- buffer[3:])
+ buffer[3:])
if self.hci_recv_ev_packet.ev_code == hci.HCI_EV_CODE_DISCONN_CMP:
self.hci_recv_ev_packet.current_event = \
self.parse_ev_disconn_cmp(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_EV_CODE_DISCONN_CMP,
- self.hci_recv_ev_packet.current_event))
+ self.hci_recv_ev_packet.current_event))
return hci.HCI_EV_CODE_DISCONN_CMP
elif self.hci_recv_ev_packet.ev_code == hci.HCI_EV_CODE_CMD_CMP:
self.hci_recv_ev_packet.current_event = \
self.parse_ev_cmd_cmp(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_EV_CODE_CMD_CMP,
- self.hci_recv_ev_packet.current_event))
+ self.hci_recv_ev_packet.current_event))
return hci.HCI_EV_CODE_CMD_CMP
elif self.hci_recv_ev_packet.ev_code == hci.HCI_EV_CODE_CMD_STATUS:
self.hci_recv_ev_packet.current_event = \
self.parse_ev_cmd_stat(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_EV_CODE_CMD_STATUS,
- self.hci_recv_ev_packet.current_event))
+ self.hci_recv_ev_packet.current_event))
return hci.HCI_EV_CODE_CMD_STATUS
elif self.hci_recv_ev_packet.ev_code == hci.HCI_EV_CODE_LE_META_EVENT:
@@ -557,7 +593,7 @@ class HCI_Commands():
self.hci_recv_ev_packet.current_event = \
self.parse_num_comp_pkts(self.hci_recv_ev_packet.recv_data)
hci.events_list.append((hci.HCI_EV_NUM_COMP_PKTS,
- self.hci_recv_ev_packet.current_event))
+ self.hci_recv_ev_packet.current_event))
return hci.HCI_EV_NUM_COMP_PKTS
else:
@@ -567,31 +603,41 @@ class HCI_Commands():
event_code = self.parse_event(buffer)
curr_ev = self.hci_recv_ev_packet.current_event
if event_code == hci.HCI_EV_CODE_DISCONN_CMP:
- logging.debug("Received code: %s - HCI_EV_CODE_DISCONN_CMP", event_code)
- logging.debug("Status: %s for event: %s - HCI_EV_CODE_DISCONN_CMP",
- curr_ev.status, self.hci_recv_ev_packet.current_event)
+ logging.debug("Received code: %s - HCI_EV_CODE_DISCONN_CMP",
+ event_code)
+ logging.debug(
+ "Status: %s for event: %s - HCI_EV_CODE_DISCONN_CMP",
+ curr_ev.status,
+ self.hci_recv_ev_packet.current_event)
if curr_ev.reason == hci.CONN_FAILED_TO_BE_ESTABLISHED:
- logging.error(f"Connection failed to be established. Exiting...")
- raise Exception("Connection failed to be established. Exiting...")
+ logging.error(
+ f"Connection failed to be established. Exiting...")
+ raise Exception(
+ "Connection failed to be established. Exiting...")
if curr_ev.reason == hci.CONN_TIMEOUT:
logging.error(f"Connection timeout. Exiting...")
raise Exception("Connection timeout. Exiting...")
elif event_code == hci.HCI_EV_CODE_CMD_CMP:
- logging.debug("Received code: %s - HCI_EV_CODE_CMD_CMP", event_code)
+ logging.debug("Received code: %s - HCI_EV_CODE_CMD_CMP",
+ event_code)
sent_opcode = self.hci_send_cmd.opcode
recv_opcode = curr_ev.opcode
if sent_opcode == recv_opcode:
status = self.process_returned_parameters()
if status != 0:
- logging.error("Status: %s for event: %s - HCI_EV_CODE_CMD_CMP", status, curr_ev)
+ logging.error(
+ "Status: %s for event: %s - HCI_EV_CODE_CMD_CMP",
+ status,
+ curr_ev)
self.async_ev_cmd_end.set()
elif event_code == hci.HCI_EV_CODE_CMD_STATUS:
- logging.debug("Received code: %s - HCI_EV_CODE_CMD_STATUS", event_code)
+ logging.debug(
+ "Received code: %s - HCI_EV_CODE_CMD_STATUS", event_code)
sent_opcode = self.hci_send_cmd.opcode
recv_opcode = curr_ev.opcode
@@ -602,32 +648,42 @@ class HCI_Commands():
self.async_ev_cmd_end.set()
elif event_code == hci.HCI_EV_CODE_LE_META_EVENT:
- logging.debug("Received code: %s - HCI_EV_CODE_LE_META_EVENT", event_code)
+ logging.debug(
+ "Received code: %s - HCI_EV_CODE_LE_META_EVENT", event_code)
subev_code = self.parse_subevent(curr_ev.subevent_code)
if subev_code == hci.HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP:
- logging.debug("Received subev code: %s - HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP", subev_code)
+ logging.debug(
+ "Received subev code: %s - HCI_SUBEV_CODE_LE_ENHANCED_CONN_CMP",
+ subev_code)
hci.conn_handle = self.hci_recv_ev_packet.current_event.connection_handle
if self.async_ev_connected.is_set() == False:
logging.info("Connection established. Event received.")
self.async_ev_connected.set()
elif subev_code == hci.HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE:
- logging.debug("Received subev code: %s - HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE", subev_code)
+ logging.debug(
+ "Received subev code: %s - HCI_SUBEV_CODE_LE_DATA_LEN_CHANGE",
+ subev_code)
self.async_ev_set_data_len.set()
elif subev_code == hci.HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP:
- logging.debug("Received subev code: %s - HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP", subev_code)
+ logging.debug(
+ "Received subev code: %s - HCI_SUBEV_CODE_LE_PHY_UPDATE_CMP",
+ subev_code)
self.async_ev_update_phy.set()
elif subev_code == hci.HCI_SUBEV_CODE_LE_CHAN_SEL_ALG:
- logging.debug("Received subev code: %s - HCI_SUBEV_CODE_LE_CHAN_SEL_ALG", subev_code)
+ logging.debug(
+ "Received subev code: %s - HCI_SUBEV_CODE_LE_CHAN_SEL_ALG",
+ subev_code)
elif subev_code < 0:
logging.warning(f"Unknown received subevent: {buffer}\n")
elif event_code == hci.HCI_EV_NUM_COMP_PKTS:
- logging.debug("Received code: %s - HCI_EV_NUM_COMP_PKTS", event_code)
+ logging.debug(
+ "Received code: %s - HCI_EV_NUM_COMP_PKTS", event_code)
async with self.async_lock_packets_cnt:
hci.num_of_completed_packets_cnt += curr_ev.num_completed_packets
hci.num_of_completed_packets_time = time.perf_counter()
@@ -637,7 +693,8 @@ class HCI_Commands():
logging.warning(f"Unknown received event: {buffer}\n")
else:
- logging.debug("%s \t%s ", self.handle_event.__name__, self.hci_recv_ev_packet)
+ logging.debug("%s \t%s ", self.handle_event.__name__,
+ self.hci_recv_ev_packet)
def match_recv_l2cap_data(self, buffer: bytes, timestamp: int):
self.expected_recv_data += self.tp.predef_packet_key
diff --git a/tools/hci_throughput/hci_device.py b/tools/hci_throughput/hci_device.py
index 4950bf27..dacb391c 100644
--- a/tools/hci_throughput/hci_device.py
+++ b/tools/hci_throughput/hci_device.py
@@ -35,11 +35,13 @@ show_tp_plots = False
test_dir = None
transport_directory = None
+
class ParentCalledException(KeyboardInterrupt):
""" This exception is raised when e.g. parent process sends signal.
This allows to terminate processes correctly. """
pass
+
def parse_arguments():
parser = argparse.ArgumentParser(
description='HCI device with User Channel Socket',
@@ -65,10 +67,12 @@ def parse_arguments():
help='device own address type, public e.g.: -oat 0')
parser.add_argument('-di', '--dev_idx', type=str, nargs="?",
help='device own hci index, hci0 e.g.: -ohi 0')
- parser.add_argument('-pa', '--peer_addr', type=str, nargs="?",
- help='peer device address, e.g.: -pa 00:00:00:00:00:00')
- parser.add_argument('-pat', '--peer_addr_type', type=int, nargs="?",
- help='peer device own address type, public e.g.: -pat 0')
+ parser.add_argument(
+ '-pa', '--peer_addr', type=str, nargs="?",
+ help='peer device address, e.g.: -pa 00:00:00:00:00:00')
+ parser.add_argument(
+ '-pat', '--peer_addr_type', type=int, nargs="?",
+ help='peer device own address type, public e.g.: -pat 0')
parser.add_argument('-pdi', '--peer_dev_idx', type=str, nargs="?",
help='peer device index, e.g. hci0: -phi 0')
parser.add_argument('-cf', '--config_file', type=str, nargs="?",
@@ -83,8 +87,9 @@ def parse_arguments():
logging.error(traceback.format_exc())
sys.exit()
+
async def set_phy(bt_dev: hci_commands.HCI_Commands, conn_handle, cfg_phy,
- supported_features):
+ supported_features):
def error(info):
print("ERROR: Check log files")
raise Exception(info, ": Unsupported PHY. Closing...")
@@ -113,6 +118,7 @@ async def set_phy(bt_dev: hci_commands.HCI_Commands, conn_handle, cfg_phy,
else:
error("Possible PHY in config.yaml: 1M, 2M, Coded")
+
async def init(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict):
""" init: Assumed to be the same for all devices """
asyncio.create_task(bt_dev.rx_buffer_q_wait())
@@ -130,7 +136,7 @@ async def finish(bt_dev: hci_commands.HCI_Commands, cfg: dict):
logging.info("Received %s good packets", bt_dev.valid_recv_data)
if bt_dev.tp:
if show_tp_plots:
- bt_dev.tp.plot_tp_from_file(sample_time = cfg["tp"]["sample_time"])
+ bt_dev.tp.plot_tp_from_file(sample_time=cfg["tp"]["sample_time"])
if bt_dev.device_mode == "rx":
bt_dev.tp.save_average()
util.copy_log_files_to_test_directory(test_dir)
@@ -156,14 +162,14 @@ async def async_main_rx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict)
############
adv_params = hci.HCI_Advertising()
adv_params.set(
- advertising_interval_min = cfg["adv"]["advertising_interval_min"],
- advertising_interval_max = cfg["adv"]["advertising_interval_max"],
- advertising_type = cfg["adv"]["advertising_type"],
- own_address_type = ini["own_address_type"],
- peer_address_type = ini["peer_address_type"],
- peer_address = cfg["adv"]["peer_address"],
- advertising_channel_map = cfg["adv"]["advertising_channel_map"],
- advertising_filter_policy = cfg["adv"]["advertising_filter_policy"]
+ advertising_interval_min=cfg["adv"]["advertising_interval_min"],
+ advertising_interval_max=cfg["adv"]["advertising_interval_max"],
+ advertising_type=cfg["adv"]["advertising_type"],
+ own_address_type=ini["own_address_type"],
+ peer_address_type=ini["peer_address_type"],
+ peer_address=cfg["adv"]["peer_address"],
+ advertising_channel_map=cfg["adv"]["advertising_channel_map"],
+ advertising_filter_policy=cfg["adv"]["advertising_filter_policy"]
)
await bt_dev.cmd_le_set_advertising_params(adv_params)
await bt_dev.cmd_le_set_advertising_enable(1)
@@ -173,7 +179,6 @@ async def async_main_rx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict)
await bt_dev.cmd_le_set_data_len(hci.conn_handle, tx_octets=0, tx_time=0)
await hci_commands.wait_for_event(bt_dev.async_ev_set_data_len, hci.WAIT_FOR_EVENT_TIMEOUT)
-
logging.debug("Before finish event")
await asyncio.shield(bt_dev.async_ev_recv_data_finish.wait())
logging.debug("after finish event")
@@ -183,23 +188,24 @@ async def async_main_rx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict)
await finish(bt_dev, cfg)
+
async def async_main_tx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict):
await init(bt_dev, ini, cfg)
conn_params = hci.HCI_Connect()
conn_params.set(
- le_scan_interval = cfg["conn"]["le_scan_interval"],
- le_scan_window = cfg["conn"]["le_scan_window"],
- initiator_filter_policy = cfg["conn"]["initiator_filter_policy"],
- peer_address_type = ini['peer_address_type'],
- peer_address = ini['peer_address'],
- own_address_type = ini['own_address_type'],
- connection_interval_min = cfg["conn"]["connection_interval_min"],
- connection_interval_max = cfg["conn"]["connection_interval_max"],
- max_latency = cfg["conn"]["max_latency"],
- supervision_timeout = cfg["conn"]["supervision_timeout"],
- min_ce_length = cfg["conn"]["min_ce_length"],
- max_ce_length = cfg["conn"]["max_ce_length"]
+ le_scan_interval=cfg["conn"]["le_scan_interval"],
+ le_scan_window=cfg["conn"]["le_scan_window"],
+ initiator_filter_policy=cfg["conn"]["initiator_filter_policy"],
+ peer_address_type=ini['peer_address_type'],
+ peer_address=ini['peer_address'],
+ own_address_type=ini['own_address_type'],
+ connection_interval_min=cfg["conn"]["connection_interval_min"],
+ connection_interval_max=cfg["conn"]["connection_interval_max"],
+ max_latency=cfg["conn"]["max_latency"],
+ supervision_timeout=cfg["conn"]["supervision_timeout"],
+ min_ce_length=cfg["conn"]["min_ce_length"],
+ max_ce_length=cfg["conn"]["max_ce_length"]
)
await bt_dev.cmd_le_create_connection(conn_params)
@@ -239,10 +245,11 @@ async def async_main_tx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict)
while sent_packets < hci.num_of_packets_to_send:
if packet_credits > 0 and packets_to_send > 0:
- data, last_value = tp.gen_data(hci.num_of_bytes_to_send, last_value)
+ data, last_value = tp.gen_data(
+ hci.num_of_bytes_to_send, last_value)
l2cap_data.set(channel_id=0x0044, data=data)
- acl_data.set(connection_handle=hci.conn_handle, pb_flag=0b00, bc_flag=0b00,
- data=l2cap_data.ba_full_message)
+ acl_data.set(connection_handle=hci.conn_handle, pb_flag=0b00,
+ bc_flag=0b00, data=l2cap_data.ba_full_message)
await bt_dev.acl_data_send(acl_data)
async with bt_dev.async_lock_packets_cnt:
packets_to_send -= 1
@@ -262,7 +269,6 @@ async def async_main_tx(bt_dev: hci_commands.HCI_Commands, ini: dict, cfg: dict)
packet_credits += hci.num_of_completed_packets_cnt
hci.num_of_completed_packets_cnt = 0
-
for timestamp in tx_sent_timestamps:
bt_dev.tp.append_to_csv_file(*timestamp)
@@ -301,7 +307,8 @@ def parse_cfg_files(args) -> dict:
def signal_handler(signum, frame):
logging.critical(f"Received signal: {signal.Signals(signum).name}")
- raise ParentCalledException(f"Received signal: {signal.Signals(signum).name}")
+ raise ParentCalledException(
+ f"Received signal: {signal.Signals(signum).name}")
def main():
@@ -316,10 +323,10 @@ def main():
loop = asyncio.get_event_loop()
loop.set_debug(True)
- transport = transport_factory.TransportFactory(device_index=ini['dev_index'],
- device_mode=args.mode,
- asyncio_loop=loop,
- transport_directory=transport_directory)
+ transport = transport_factory.TransportFactory(
+ device_index=ini['dev_index'],
+ device_mode=args.mode, asyncio_loop=loop,
+ transport_directory=transport_directory)
signal.signal(signal.SIGTERM, signal_handler)
@@ -335,14 +342,13 @@ def main():
elif args.mode == 'tx':
loop.run_until_complete(async_main_tx(bt_dev, ini, cfg))
-
except Exception as e:
logging.error(traceback.format_exc())
except (KeyboardInterrupt or ParentCalledException):
logging.critical("Hard exit triggered.")
logging.error(traceback.format_exc())
finally:
- if transport != None:
+ if transport is not None:
transport.stop()
sys.exit()
diff --git a/tools/hci_throughput/hci_socket.py b/tools/hci_throughput/hci_socket.py
index 41c95eee..da908b8f 100644
--- a/tools/hci_throughput/hci_socket.py
+++ b/tools/hci_throughput/hci_socket.py
@@ -53,7 +53,10 @@ class HCI_User_Channel_Socket_Error(BaseException):
class HCI_User_Channel_Socket():
def __init__(self, device_index=0, device_mode=None,
asyncio_loop=None):
- logging.debug("Device index: %s, Device address: %s", device_index, device_mode)
+ logging.debug(
+ "Device index: %s, Device address: %s",
+ device_index,
+ device_mode)
self.loop = asyncio_loop
self.libc = ctypes.cdll.LoadLibrary('libc.so.6')
self.rx_buffer_q = multiprocessing.Manager().Queue()
@@ -71,22 +74,28 @@ class HCI_User_Channel_Socket():
new_socket = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_RAW | socket.SOCK_NONBLOCK,
socket.BTPROTO_HCI)
- if new_socket == None:
+ if new_socket is None:
raise HCI_User_Channel_Socket_Error("Socket error. \
Opening socket failed")
new_socket.setblocking(False)
- socket_size = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
+ socket_size = new_socket.getsockopt(
+ socket.SOL_SOCKET, socket.SO_RCVBUF)
logging.info(f"Default socket recv buffer size: {socket_size}")
new_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 500000)
- socket_size = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
+ socket_size = new_socket.getsockopt(
+ socket.SOL_SOCKET, socket.SO_RCVBUF)
logging.info(f"Set socket recv buffer size: {socket_size}")
return new_socket
def socket_bind(self, index):
logging.debug("%s index: %s", self.socket_bind.__name__, index)
# addr: struct sockaddr_hci from /usr/include/bluetooth/hci.h
- addr = struct.pack('HHH', hci.AF_BLUETOOTH, index, hci.HCI_CHANNEL_USER)
- retry_binding=2
+ addr = struct.pack(
+ 'HHH',
+ hci.AF_BLUETOOTH,
+ index,
+ hci.HCI_CHANNEL_USER)
+ retry_binding = 2
for i in range(retry_binding):
try:
bind = self.libc.bind(self.hci_socket.fileno(),
@@ -118,8 +127,8 @@ class HCI_User_Channel_Socket():
cnt += len(buff)
logging.debug(f"Read from buffer {cnt} bytes")
except BlockingIOError:
- logging.info("Buffer empty and ready!")
- return
+ logging.info("Buffer empty and ready!")
+ return
async def send(self, ba_message):
await self.loop.sock_sendall(self.hci_socket, ba_message)
@@ -132,10 +141,11 @@ class HCI_User_Channel_Socket():
logging.info("listener_ev set")
break
buffer = self.hci_socket.recv(SOCKET_RECV_BUFFER_SIZE)
- logging.info(f"Socket recv: {self.counter} th packet with len: {len(buffer)}")
+ logging.info(
+ f"Socket recv: {self.counter} th packet with len: {len(buffer)}")
self.rx_buffer_q.put((buffer, time.perf_counter()))
- recv_at_once +=1
- self.counter +=1
+ recv_at_once += 1
+ self.counter += 1
except BlockingIOError:
if recv_at_once > 1:
@@ -151,8 +161,8 @@ class HCI_User_Channel_Socket():
return self.hci_socket.close()
def start(self):
- self.listener_proc = multiprocessing.Process(target=self.socket_listener,
- daemon=True)
+ self.listener_proc = multiprocessing.Process(
+ target=self.socket_listener, daemon=True)
self.listener_proc.start()
logging.info(f"start listener_proc pid: {self.listener_proc.pid}")
diff --git a/tools/hci_throughput/main.py b/tools/hci_throughput/main.py
index 047f3008..b00780a0 100644
--- a/tools/hci_throughput/main.py
+++ b/tools/hci_throughput/main.py
@@ -30,7 +30,8 @@ import util
import os
import math
-PROCESS_TIMEOUT = 500 # seconds, adjust if necessary
+PROCESS_TIMEOUT = 500 # seconds, adjust if necessary
+
def parse_arguments():
parser = argparse.ArgumentParser(
@@ -97,7 +98,7 @@ def change_config_var(filename: str, group: str, variable: str,
def get_init_dict(filename: str, args_list: list, modes: list, dir: str,
transport_directory: str):
ini = {
- modes[0]:{
+ modes[0]: {
"dev_index": args_list[0][0],
"own_address_type": args_list[0][1],
"own_address": args_list[0][2],
@@ -105,7 +106,7 @@ def get_init_dict(filename: str, args_list: list, modes: list, dir: str,
"peer_address_type": args_list[1][1],
"peer_address": args_list[1][2]
},
- modes[1]:{
+ modes[1]: {
"dev_index": args_list[1][0],
"own_address_type": args_list[1][1],
"own_address": args_list[1][2],
@@ -209,7 +210,7 @@ def main():
init_file = "init.yaml"
cfg_file = args.config_file[0]
- if (type(args.transport_directory) == list):
+ if (isinstance(args.transport_directory, list)):
args.transport_directory = args.transport_directory.pop()
else:
args.transport_directory = args.transport_directory
@@ -224,11 +225,11 @@ def main():
test_dir_path = util.create_test_directory()
init_dict = get_init_dict(filename=init_file, args_list=addr_list,
- modes=args.modes, dir=test_dir_path,
- transport_directory=args.transport_directory)
+ modes=args.modes, dir=test_dir_path,
+ transport_directory=args.transport_directory)
util.copy_config_files_to_test_directory([init_file, cfg_file],
- init_dict["test_dir"])
+ init_dict["test_dir"])
try:
if cfg["flag_testing"]:
diff --git a/tools/hci_throughput/throughput.py b/tools/hci_throughput/throughput.py
index dcc242d2..e2b78b72 100644
--- a/tools/hci_throughput/throughput.py
+++ b/tools/hci_throughput/throughput.py
@@ -24,6 +24,9 @@ import struct
import argparse
import traceback
+data_types = ['kb', 'kB']
+
+
def parse_arguments():
parser = argparse.ArgumentParser(
description='Plot throughput from the csv file.',
@@ -40,19 +43,18 @@ def parse_arguments():
print(traceback.format_exc())
return args
-data_types = ['kb', 'kB']
-
-def gen_data(num_of_bytes_in_packet: int, last_number_from_previous_data_packet: int):
+def gen_data(num_of_bytes_in_packet: int,
+ last_number_from_previous_data_packet: int):
counter = last_number_from_previous_data_packet + 1
rem = num_of_bytes_in_packet % 4
valid_data_len = int((num_of_bytes_in_packet - rem) / 4)
total_data_len = valid_data_len + rem
data = [0] * total_data_len
- for i in range(rem,total_data_len):
+ for i in range(rem, total_data_len):
data[i] = counter
counter += 1
- last_value = data[len(data)-1]
+ last_value = data[len(data) - 1]
if rem:
fmt = "<" + str(rem) + "B" + str(valid_data_len) + "I"
else:
@@ -62,13 +64,22 @@ def gen_data(num_of_bytes_in_packet: int, last_number_from_previous_data_packet:
class Throughput():
- def __init__(self, name="tp_chart", mode="rx", total_packets_number=0, bytes_number_in_packet=0,
- throughput_data_type='kb', flag_plot_packets=True, sample_time=1, test_directory=None):
+ def __init__(
+ self,
+ name="tp_chart",
+ mode="rx",
+ total_packets_number=0,
+ bytes_number_in_packet=0,
+ throughput_data_type='kb',
+ flag_plot_packets=True,
+ sample_time=1,
+ test_directory=None):
self.name = name
self.mode = mode
self.total_packets_number = total_packets_number
self.bytes_number_in_packet = bytes_number_in_packet
- self.predef_packet_key = int((bytes_number_in_packet - (bytes_number_in_packet % 4))/4)
+ self.predef_packet_key = int(
+ (bytes_number_in_packet - (bytes_number_in_packet % 4)) / 4)
self.total_bits_number = bytes_number_in_packet * 8
assert throughput_data_type in data_types
self.throughput_data_type = throughput_data_type
@@ -77,38 +88,45 @@ class Throughput():
self.test_directory = test_directory
if self.test_directory is not None:
- self.csv_file_name = self.test_directory + "/" + time.strftime("%Y_%m_%d_%H_%M_%S_") + self.name + ".csv"
+ self.csv_file_name = self.test_directory + "/" + \
+ time.strftime("%Y_%m_%d_%H_%M_%S_") + self.name + ".csv"
else:
- self.csv_file_name = time.strftime("%Y_%m_%d_%H_%M_%S_") + self.name + ".csv"
+ self.csv_file_name = time.strftime(
+ "%Y_%m_%d_%H_%M_%S_") + self.name + ".csv"
self.clean_csv_file()
def calc_throughput(self, current_num, last_num, current_time, last_time):
if self.throughput_data_type == 'kb':
- return float((((current_num - last_num) * \
- self.total_bits_number) / (current_time-last_time))/1000)
+ return float(
+ (((current_num - last_num) * self.total_bits_number) /
+ (current_time - last_time)) / 1000)
elif self.throughput_data_type == 'kB':
- return float((((current_num - last_num) * \
- self.bytes_number_in_packet) / (current_time-last_time))/1000)
+ return float(
+ (((current_num - last_num) * self.bytes_number_in_packet) /
+ (current_time - last_time)) / 1000)
def clean_csv_file(self):
- file = open(self.csv_file_name, 'w')
- file.write("Time,Packet\n")
+ file = open(self.csv_file_name, 'w')
+ file.write("Time,Packet\n")
- def append_to_csv_file(self, timestamp: float = 0.0, packet_number: int = 0):
+ def append_to_csv_file(
+ self,
+ timestamp: float = 0.0,
+ packet_number: int = 0):
with open(self.csv_file_name, "a") as file:
csv_writer = csv.writer(file)
csv_writer.writerow([timestamp, packet_number])
def get_average(self, packet_numbers, timestamps):
if self.throughput_data_type == 'kb':
- average_tp = ((packet_numbers * self.total_bits_number) \
- / (timestamps[-1] - timestamps[0]))/1000
+ average_tp = ((packet_numbers * self.total_bits_number)
+ / (timestamps[-1] - timestamps[0])) / 1000
elif self.throughput_data_type == 'kB':
- average_tp = ((packet_numbers * self.bytes_number_in_packet) \
- / (timestamps[-1] - timestamps[0]))/1000
+ average_tp = ((packet_numbers * self.bytes_number_in_packet)
+ / (timestamps[-1] - timestamps[0])) / 1000
return average_tp
- def save_average(self, tp_csv_filename = None):
+ def save_average(self, tp_csv_filename=None):
if self.mode == "rx":
timestamps = []
packet_numbers = []
@@ -126,14 +144,15 @@ class Throughput():
packet_numbers.append(float(row[1]))
average_tp = self.get_average(packet_numbers[-1], timestamps)
- print(f"Average rx throughput: {round(average_tp, 3)} {self.throughput_data_type}ps")
+ print(
+ f"Average rx throughput: {round(average_tp, 3)} {self.throughput_data_type}ps")
with open(self.test_directory + "/average_rx_tp.csv", "a") as file:
csv_writer = csv.writer(file)
csv_writer.writerow([average_tp])
def plot_tp_from_file(self, filename: str = None, sample_time: float = 1,
- save_to_file: bool = True):
+ save_to_file: bool = True):
timestamps = []
packet_numbers = []
@@ -158,7 +177,7 @@ class Throughput():
if timestamps[i] - last_time > sample_time:
throughput.append((timestamps[i],
self.calc_throughput(packet_numbers[i],
- last_number,
+ last_number,
timestamps[i],
last_time)))
last_time = timestamps[i]
@@ -194,4 +213,4 @@ class Throughput():
if __name__ == "__main__":
args = parse_arguments()
tp = Throughput(bytes_number_in_packet=247)
- tp.plot_tp_from_file(*args.file, args.samp_t[0], save_to_file=False)
\ No newline at end of file
+ tp.plot_tp_from_file(*args.file, args.samp_t[0], save_to_file=False)
diff --git a/tools/hci_throughput/transport_factory.py b/tools/hci_throughput/transport_factory.py
index 7555c320..9b5f07aa 100644
--- a/tools/hci_throughput/transport_factory.py
+++ b/tools/hci_throughput/transport_factory.py
@@ -27,9 +27,8 @@ class TransportFactory:
def __init__(self, device_index: str = None, device_mode=None,
asyncio_loop=None, transport_directory=None) -> None:
if (device_index.isnumeric()):
- self.transport = hci_socket.HCI_User_Channel_Socket(int(device_index),
- device_mode,
- asyncio_loop)
+ self.transport = hci_socket.HCI_User_Channel_Socket(
+ int(device_index), device_mode, asyncio_loop)
else:
try:
if (transport_directory != "default"):
diff --git a/tools/hci_throughput/util.py b/tools/hci_throughput/util.py
index 6aeed607..472057b1 100644
--- a/tools/hci_throughput/util.py
+++ b/tools/hci_throughput/util.py
@@ -22,6 +22,7 @@ import shutil
import time
import os
+
def create_test_directory():
test_dir_name = "tests/" + time.strftime("%Y_%m_%d_%H_%M_%S")
path = os.path.join(os.getcwd(), test_dir_name)
@@ -32,8 +33,8 @@ def create_test_directory():
def configure_logging(log_filename, clear_log_file=True):
format_template = ("%(asctime)s %(threadName)s %(name)s %(levelname)s "
- "%(filename)-25s %(lineno)-5s "
- "%(funcName)-25s : %(message)s")
+ "%(filename)-25s %(lineno)-5s "
+ "%(funcName)-25s : %(message)s")
logging.basicConfig(format=format_template,
filename=log_filename,
filemode='a',
@@ -53,10 +54,10 @@ def copy_config_files_to_test_directory(files: list, test_directory: str):
def copy_log_files_to_test_directory(dir: str):
- log_files = ["log/log_rx.log", "log/log_tx.log", "log/check_addr.log"]
- for file in log_files:
- shutil.copy(file, dir + "/" + time.strftime("%Y_%m_%d_%H_%M_%S_") +
- file.replace("log/", ""))
+ log_files = ["log/log_rx.log", "log/log_tx.log", "log/check_addr.log"]
+ for file in log_files:
+ shutil.copy(file, dir + "/" + time.strftime("%Y_%m_%d_%H_%M_%S_") +
+ file.replace("log/", ""))
# Running tests as sudo implies root permissions on created directories/files.