You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nuttx.apache.org by ac...@apache.org on 2022/07/23 19:58:08 UTC
[incubator-nuttx-apps] 05/20: system/uorb: listener, has 'top' command.
This is an automated email from the ASF dual-hosted git repository.
acassis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nuttx-apps.git
commit 9f6d3221867b5e81613f8461b16b06be44336302
Author: jihandong <ji...@xiaomi.com>
AuthorDate: Fri Mar 25 21:01:20 2022 +0800
system/uorb: listener, has 'top' command.
Listener can scan device under ORB_SENSOR_PATH, subscribe them and
cotinue printing messages by call their cb.
'-T' can Continuously print updating objects, '-l' means only
print once.
Signed-off-by: jihandong <ji...@xiaomi.com>
Signed-off-by: Jiuzhu Dong <do...@xiaomi.com>
---
system/uorb/Kconfig | 28 +-
system/uorb/Makefile | 9 +-
system/uorb/listener.c | 735 ++++++++++++++++++++++++++++++++++++++++++++++++
system/uorb/uORB/uORB.c | 3 +-
system/uorb/uORB/uORB.h | 5 +-
5 files changed, 762 insertions(+), 18 deletions(-)
diff --git a/system/uorb/Kconfig b/system/uorb/Kconfig
index 028e2f2de..6345f77b1 100644
--- a/system/uorb/Kconfig
+++ b/system/uorb/Kconfig
@@ -18,6 +18,22 @@ config UORB_STACKSIZE
int "stack size"
default DEFAULT_TASK_STACKSIZE
+config UORB_LISTENER
+ bool "uorb listener"
+ default n
+
+config UORB_TESTS
+ bool "uorb unit tests"
+ default n
+
+if UORB_TESTS
+
+config UORB_SRORAGE_DIR
+ string "uorb test result storage dir"
+ default "/data/"
+
+endif # UORB_TESTS
+
config DEBUG_UORB
bool "uorb debug output"
default n
@@ -46,16 +62,4 @@ config UORB_INFO
endif # DEBUG_UORB
-config UORB_TESTS
- bool "uorb unit tests"
- default n
-
-if UORB_TESTS
-
-config UORB_SRORAGE_DIR
- string "uorb test result storage dir"
- default "/data/"
-
-endif # UORB_TESTS
-
endif # UORB
diff --git a/system/uorb/Makefile b/system/uorb/Makefile
index feb09b7da..6f902463d 100644
--- a/system/uorb/Makefile
+++ b/system/uorb/Makefile
@@ -23,10 +23,15 @@ include $(APPDIR)/Make.defs
CSRCS += uORB/uORB.c
CSRCS += $(wildcard sensor/*.c)
+ifneq ($(CONFIG_UORB_LISTENER),)
+MAINSRC += listener.c
+PROGNAME += uorb_listener
+endif
+
ifneq ($(CONFIG_UORB_TESTS),)
CSRCS += test/utility.c
-MAINSRC = test/unit_test.c
-PROGNAME = uorb_unit_test
+MAINSRC += test/unit_test.c
+PROGNAME += uorb_unit_test
endif
PRIORITY = $(CONFIG_UORB_PRIORITY)
diff --git a/system/uorb/listener.c b/system/uorb/listener.c
new file mode 100644
index 000000000..7b3a083ec
--- /dev/null
+++ b/system/uorb/listener.c
@@ -0,0 +1,735 @@
+/****************************************************************************
+ * apps/system/uorb/listener.c
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The
+ * ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ ****************************************************************************/
+
+/****************************************************************************
+ * Included Files
+ ****************************************************************************/
+
+#include <nuttx/list.h>
+
+#include <errno.h>
+#include <dirent.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <unistd.h>
+
+#include <uORB/uORB.h>
+
+/****************************************************************************
+ * Pre-processor Definitions
+ ****************************************************************************/
+
+#define ORB_MAX_PRINT_NAME 32
+#define ORB_TOP_WAIT_TIME 1000
+
+/****************************************************************************
+ * Private Types
+ ****************************************************************************/
+
+struct listen_object_s
+{
+ struct list_node node; /* Node of object info list */
+ struct orb_object object; /* Object id */
+ orb_abstime timestamp; /* Time of lastest generation */
+ unsigned long generation; /* Latest generation */
+};
+
+/****************************************************************************
+ * Private Function Prototypes
+ ****************************************************************************/
+
+static int listener_get_state(FAR struct orb_object *object,
+ FAR struct orb_state *state);
+static int listener_add_object(FAR struct list_node *objlist,
+ FAR struct orb_object *object);
+static void listener_delete_object_list(FAR struct list_node *objlist);
+static int listener_generate_object_list(FAR struct list_node *objlist,
+ FAR const char *filter);
+static int listener_print(FAR const struct orb_metadata *meta, int fd);
+static void listener_monitor(FAR struct list_node *objlist, int nb_objects,
+ int topic_rate, int nb_msgs, int timeout);
+static int listener_update(FAR struct list_node *objlist,
+ FAR struct orb_object *object);
+static void listener_top(FAR struct list_node *objlist,
+ FAR const char *filter,
+ bool only_once);
+
+/****************************************************************************
+ * Private Data
+ ****************************************************************************/
+
+static bool g_should_exit = false;
+
+/****************************************************************************
+ * Private Functions
+ ****************************************************************************/
+
+static void usage(void)
+{
+ uorbinfo_raw("\n\
+Utility to listen on uORB topics and print the data to the console.\n\
+\n\
+The listener can be exited any time by pressing Ctrl+C, Esc, or Q.\n\
+\n\
+We use cmd 'uorb_listener <t1,t2,...> -n 1' to print once snapshot for\n\
+specify topics if command exists or all the topic objects when command\n\
+doesn't exists. If topic isn't updated, the snapshot for this topic is\n\
+empty.\n\
+\n\
+We use cmd 'uorb_listener t1,t2,... [arguments...]' to print the topics\n\
+specified. The following arguments apply to all topics.\n\
+\n\
+We use cmd 'uorb_listener ... -n num(>1) ...' to print all the topic\n\
+objects message until the number of received is equal to the specified\n\
+number.\n\
+\n\
+We can print the messages all the time when we do not specify the number\n\
+of message.\n\
+\n\
+listener <command> [arguments...]\n\
+ Commands:\n\
+\t<topics_name> Topic name. Multi name are separated by ','\n\
+\t[-h ] Listener commands help\n\
+\t[-n <val> ] Number of messages, default: 0\n\
+\t[-r <val> ] Subscription rate (unlimited if 0), default: 0\n\
+\t[-t <val> ] Time of listener, in seconds, default: 5\n\
+\t[-T ] Top, continuously print updating objects\n\
+\t[-l ] Top only execute once.\n\
+ ");
+}
+
+/****************************************************************************
+ * Name: listener_get_state
+ *
+ * Description:
+ * Get object's current state.
+ *
+ * Input Parameters:
+ * object Given object
+ * state Returned state.
+ *
+ * Returned Value:
+ * 0 on success, otherwise negative errno.
+ ****************************************************************************/
+
+static int listener_get_state(FAR struct orb_object *object,
+ FAR struct orb_state *state)
+{
+ int ret;
+ int fd;
+
+ fd = orb_subscribe_multi(object->meta, object->instance);
+ if (fd < 0)
+ {
+ return fd;
+ }
+
+ ret = orb_get_state(fd, state);
+ orb_unsubscribe(fd);
+ if (ret < 0)
+ {
+ return ret;
+ }
+
+ state->nsubscribers--; /* Ignore temp subscriber */
+ return 0;
+}
+
+/****************************************************************************
+ * Name: listener_add_object
+ *
+ * Description:
+ * Alloc object node and add to list.
+ *
+ * Input Parameters:
+ * object Object to add.
+ * objlist List to modify.
+ *
+ * Returned Value:
+ * 0 on success, otherwise return negative errno.
+ ****************************************************************************/
+
+static int listener_add_object(FAR struct list_node *objlist,
+ FAR struct orb_object *object)
+{
+ FAR struct listen_object_s *tmp;
+ struct orb_state state;
+ int ret;
+
+ tmp = malloc(sizeof(struct listen_object_s));
+ if (tmp == NULL)
+ {
+ return -ENOMEM;
+ }
+
+ ret = listener_get_state(object, &state);
+ if (ret < 0)
+ {
+ free(tmp);
+ return ret;
+ }
+
+ tmp->object.meta = object->meta;
+ tmp->object.instance = object->instance;
+ tmp->timestamp = orb_absolute_time();
+ tmp->generation = state.generation;
+
+ list_add_tail(objlist, &tmp->node);
+
+ return 0;
+}
+
+/****************************************************************************
+ * Name: listener_update
+ *
+ * Description:
+ * Update object list, print imformation if given object has new data.
+ *
+ * Input Parameters:
+ * object Object to check state.
+ * objlist List to update.
+ *
+ * Returned Value:
+ * 0 on success.
+ ****************************************************************************/
+
+static int listener_update(FAR struct list_node *objlist,
+ FAR struct orb_object *object)
+{
+ FAR struct listen_object_s *old = NULL;
+ FAR struct listen_object_s *tmp;
+ int ret;
+
+ /* Check wether object already exist in old list */
+
+ list_for_every_entry(objlist, tmp, struct listen_object_s, node)
+ {
+ if (tmp->object.meta == object->meta &&
+ tmp->object.instance == object->instance)
+ {
+ old = tmp;
+ break;
+ }
+ }
+
+ if (old)
+ {
+ /* If object existed in old list, print and update. */
+
+ struct orb_state state;
+ orb_abstime now_time;
+ unsigned long delta_time;
+ unsigned long delta_generation;
+
+ now_time = orb_absolute_time();
+ ret = listener_get_state(object, &state);
+ if (ret < 0)
+ {
+ return ret;
+ }
+
+ delta_time = now_time - old->timestamp;
+ delta_generation = state.generation - old->generation;
+ if (delta_generation && delta_time)
+ {
+ unsigned long frequency;
+
+ frequency = delta_generation * 1000000 / delta_time;
+ uorbinfo_raw("\033[K" "%-*s %2u %4" PRIu32 " %4lu "
+ "%2" PRIu32 " %4u",
+ ORB_MAX_PRINT_NAME,
+ object->meta->o_name,
+ object->instance,
+ state.nsubscribers,
+ frequency,
+ state.queue_size,
+ object->meta->o_size);
+ old->generation = state.generation;
+ old->timestamp = now_time;
+ }
+ }
+ else
+ {
+ /* If object not existed in old list, alloc one */
+
+ ret = listener_add_object(objlist, object);
+ if (ret < 0)
+ {
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
+/****************************************************************************
+ * Name: listener_delete_object_list
+ *
+ * Description:
+ * free object list.
+ *
+ * Input Parameters:
+ * objlist List to free.
+ *
+ * Returned Value:
+ * None.
+ ****************************************************************************/
+
+static void listener_delete_object_list(FAR struct list_node *objlist)
+{
+ FAR struct listen_object_s *tmp;
+ FAR struct listen_object_s *next;
+
+ list_for_every_entry_safe(objlist, tmp, next, struct listen_object_s, node)
+ {
+ list_delete(&tmp->node);
+ free(tmp);
+ }
+
+ list_initialize(objlist);
+}
+
+/****************************************************************************
+ * Name: listener_generate_object_list
+ *
+ * Description:
+ * Update / alloc object list by scan ORB_SENSOR_PATH.
+ *
+ * Input Parameters:
+ * objlist List to update / alloc.
+ * filter Specified topic names.
+ *
+ * Returned Value:
+ * File number under ORB_SENSOR_PATH on success.
+ * Negative errno on failure.
+ ****************************************************************************/
+
+static int listener_generate_object_list(FAR struct list_node *objlist,
+ FAR const char *filter)
+{
+ FAR struct dirent *entry;
+ FAR DIR *dir;
+ int cnt = 0;
+ int ret;
+
+ /* Traverse all objects under ORB_SENSOR_PATH */
+
+ dir = opendir(ORB_SENSOR_PATH);
+ if (!dir)
+ {
+ return ERROR;
+ }
+
+ while ((entry = readdir(dir)))
+ {
+ struct orb_object object;
+ char file_name[PATH_MAX];
+ int len;
+
+ /* Get meta data and instance number through file name */
+
+ if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, ".."))
+ {
+ continue;
+ }
+
+ strlcpy(file_name, entry->d_name, PATH_MAX);
+
+ len = strlen(file_name) - 1;
+ object.instance = file_name[len] - '0';
+ file_name[len] = 0;
+
+ if (filter)
+ {
+ /* Example:
+ * objects: aaa0, aaa1, aaa2, bbb0, bbb1
+ * filter: "aaa bbb1"
+ * Object list we get:
+ * aaa0, aaa1, aaa2, bbb1.
+ */
+
+ FAR const char *str = strstr(filter, file_name);
+ if (!str || (str[len] && str[len] != ',' &&
+ str[len] != object.instance + '0'))
+ {
+ continue;
+ }
+ }
+
+ object.meta = orb_get_meta(entry->d_name);
+ if (!object.meta)
+ {
+ continue;
+ }
+
+ /* Update object infomation to list. */
+
+ ret = listener_update(objlist, &object);
+ if (ret < 0)
+ {
+ uorbinfo_raw("listener %s failed", object.meta->o_name);
+ continue;
+ }
+
+ cnt++;
+ }
+
+ closedir(dir);
+ return cnt;
+}
+
+/****************************************************************************
+ * Name: listener_print
+ *
+ * Description:
+ * Print topic data by its print_message callback.
+ *
+ * Input Parameters:
+ * meta The uORB metadata.
+ * fd Subscriber handle.
+ *
+ * Returned Value:
+ * 0 on success copy, otherwise -1
+ ****************************************************************************/
+
+static int listener_print(FAR const struct orb_metadata *meta, int fd)
+{
+ char buffer[meta->o_size];
+ int ret;
+
+ ret = orb_copy(meta, fd, buffer);
+#ifdef CONFIG_DEBUG_UORB
+ if (ret == OK && meta->o_cb != NULL)
+ {
+ meta->o_cb(meta, buffer);
+ }
+#else
+ uorbinfo_raw("Enable debug uorb to print message");
+#endif
+
+ return ret;
+}
+
+/****************************************************************************
+ * Name: listener_monitor
+ *
+ * Description:
+ * Moniter objects by subscribe and print data.
+ *
+ * Input Parameters:
+ * objlist List of objects to subscribe.
+ * nb_objects Length of objects list.
+ * topic_rate Subscribe frequency.
+ * nb_msgs Subscribe amount of messages.
+ * timeout Maximum poll waiting time , ms.
+ *
+ * Returned Value:
+ * None
+ ****************************************************************************/
+
+static void listener_monitor(FAR struct list_node *objlist, int nb_objects,
+ int topic_rate, int nb_msgs, int timeout)
+{
+ struct pollfd fds[nb_objects];
+ int recv_msgs[nb_objects];
+ int nb_recv_msgs = 0;
+ int i = 0;
+
+ struct listen_object_s *tmp;
+
+ /* Prepare pollfd for all objects */
+
+ list_for_every_entry(objlist, tmp, struct listen_object_s, node)
+ {
+ int fd;
+
+ fd = orb_subscribe_multi(tmp->object.meta, tmp->object.instance);
+ if (fd < 0)
+ {
+ fds[i].fd = -1;
+ fds[i].events = 0;
+ continue;
+ }
+ else
+ {
+ fds[i].fd = fd;
+ fds[i].events = POLLIN;
+ }
+
+ if (nb_msgs == 1)
+ {
+ listener_print(tmp->object.meta, fd);
+ orb_unsubscribe(fd);
+ }
+ else if (topic_rate != 0)
+ {
+ orb_set_frequency(fd, topic_rate);
+ }
+
+ i++;
+ }
+
+ if (nb_msgs == 1)
+ {
+ return;
+ }
+
+ memset(recv_msgs, 0, sizeof(recv_msgs));
+
+ /* Loop poll and print recieved messages */
+
+ while ((!nb_msgs || nb_recv_msgs < nb_msgs) && !g_should_exit)
+ {
+ if (poll(&fds[0], nb_objects, timeout * 1000) > 0)
+ {
+ i = 0;
+ list_for_every_entry(objlist, tmp, struct listen_object_s, node)
+ {
+ if (fds[i].revents & POLLIN)
+ {
+ nb_recv_msgs++;
+ recv_msgs[i]++;
+ if (listener_print(tmp->object.meta, fds[i].fd) != 0)
+ {
+ uorberr("Listener callback failed");
+ }
+
+ if (nb_msgs && nb_recv_msgs >= nb_msgs)
+ {
+ break;
+ }
+ }
+
+ i++;
+ }
+ }
+ else if (errno != EINTR)
+ {
+ uorbinfo_raw("Waited for %d seconds without a message. "
+ "Giving up.", timeout);
+ break;
+ }
+ }
+
+ i = 0;
+ list_for_every_entry(objlist, tmp, struct listen_object_s, node)
+ {
+ if (fds[i].fd < 0)
+ {
+ uorbinfo_raw("Object name:%s%d, subscribe fail",
+ tmp->object.meta->o_name, tmp->object.instance);
+ }
+ else
+ {
+ orb_unsubscribe(fds[i].fd);
+ uorbinfo_raw("Object name:%s%d, recieved:%d",
+ tmp->object.meta->o_name, tmp->object.instance,
+ recv_msgs[i]);
+ }
+
+ i++;
+ }
+
+ uorbinfo_raw("Total number of received Message:%d/%d",
+ nb_recv_msgs, nb_msgs ? nb_msgs : nb_recv_msgs);
+}
+
+/****************************************************************************
+ * Name: listener_top
+ *
+ * Description:
+ * Continuously print updating objects, like the unix 'top' command.
+ * Exited when the user presses the enter key.
+ *
+ * Input Parameters:
+ * objlist List of objects.
+ * filter Specific topic names.
+ * only_once Print only once, then exit.
+ *
+ * Returned Value:
+ * None.
+ ****************************************************************************/
+
+static void listener_top(FAR struct list_node *objlist,
+ FAR const char *filter,
+ bool only_once)
+{
+ bool quit = false;
+ struct pollfd fds;
+
+ fds.fd = STDIN_FILENO;
+ fds.events = POLLIN;
+
+ uorbinfo_raw("\033[2J\n"); /* clear screen */
+
+ do
+ {
+ /* Wait a while, quit if user input some thing */
+
+ if (poll(&fds, 1, ORB_TOP_WAIT_TIME) > 0)
+ {
+ char c;
+
+ if (read(STDIN_FILENO, &c, 1) > 0)
+ {
+ quit = true;
+ break;
+ }
+ }
+
+ /* Then Update object list and print changes. */
+
+ if (!only_once)
+ {
+ uorbinfo_raw("\033[H"); /* move cursor to top left corner */
+ }
+
+ uorbinfo_raw("\033[K" "current objects: %i", list_length(objlist));
+ uorbinfo_raw("\033[K" "%-*s INST #SUB RATE #Q SIZE",
+ ORB_MAX_PRINT_NAME - 2, "NAME");
+
+ if (listener_generate_object_list(objlist, filter) < 0)
+ {
+ uorberr("Failed to update object list");
+ return;
+ }
+
+ if (!only_once)
+ {
+ uorbinfo_raw("\033[0J"); /* Clear the rest of the screen */
+ }
+ }
+ while (!quit && !only_once);
+}
+
+static void exit_handler(int signo)
+{
+ g_should_exit = true;
+}
+
+/****************************************************************************
+ * Public Functions
+ ****************************************************************************/
+
+int main(int argc, FAR char *argv[])
+{
+ struct list_node objlist;
+ FAR struct listen_object_s *tmp;
+ int topic_rate = 0;
+ int nb_msgs = 0;
+ int timeout = 5;
+ bool top = false;
+ bool only_once = false;
+ FAR char *filter = NULL;
+ int ret;
+ int ch;
+
+ g_should_exit = false;
+ if (signal(SIGINT, exit_handler) == SIG_ERR)
+ {
+ return 1;
+ }
+
+ /* Pasrse Argument */
+
+ while ((ch = getopt(argc, argv, "r:n:t:Tlh")) != EOF)
+ {
+ switch (ch)
+ {
+ case 'r':
+ topic_rate = strtol(optarg, NULL, 0);
+ if (topic_rate < 0)
+ {
+ goto error;
+ }
+ break;
+
+ case 'n':
+ nb_msgs = strtol(optarg, NULL, 0);
+ if (nb_msgs < 0)
+ {
+ goto error;
+ }
+ break;
+
+ case 't':
+ timeout = strtol(optarg, NULL, 0);
+ if (timeout < 0)
+ {
+ goto error;
+ }
+ break;
+
+ case 'T':
+ top = true;
+ break;
+
+ case 'l':
+ only_once = true;
+ break;
+
+ case 'h':
+ default:
+ goto error;
+ }
+ }
+
+ if (optind < argc)
+ {
+ filter = argv[optind];
+ }
+
+ /* Alloc list and exec command */
+
+ list_initialize(&objlist);
+ ret = listener_generate_object_list(&objlist, filter);
+ if (ret <= 0)
+ {
+ return 0;
+ }
+
+ if (top)
+ {
+ listener_top(&objlist, filter, only_once);
+ }
+ else
+ {
+ uorbinfo_raw("\nMointor objects num:%d", ret);
+ list_for_every_entry(&objlist, tmp, struct listen_object_s, node)
+ {
+ uorbinfo_raw("object_name:%s, object_instance:%d",
+ tmp->object.meta->o_name,
+ tmp->object.instance);
+ }
+
+ listener_monitor(&objlist, ret, topic_rate, nb_msgs, timeout);
+ }
+
+ listener_delete_object_list(&objlist);
+ return 0;
+
+error:
+ usage();
+ return 1;
+}
diff --git a/system/uorb/uORB/uORB.c b/system/uorb/uORB/uORB.c
index 7c58806d2..9b771e657 100644
--- a/system/uorb/uORB/uORB.c
+++ b/system/uorb/uORB/uORB.c
@@ -200,7 +200,8 @@ int orb_get_state(int fd, FAR struct orb_state *state)
1000000 / tmp.min_interval : 0;
state->min_batch_interval = tmp.min_latency;
state->queue_size = tmp.nbuffer;
- state->enable = tmp.nsubscribers > 0;
+ state->nsubscribers = tmp.nsubscribers;
+ state->generation = tmp.generation;
return ret;
}
diff --git a/system/uorb/uORB/uORB.h b/system/uorb/uORB/uORB.h
index 77e424319..1ba96e66d 100644
--- a/system/uorb/uORB/uORB.h
+++ b/system/uorb/uORB/uORB.h
@@ -60,9 +60,8 @@ struct orb_state
uint32_t queue_size; /* The maximum number of buffered elements,
* if 1, no queuing is is used
*/
- bool enable; /* Indicates whether the current node is
- * subscribed or activated
- */
+ uint32_t nsubscribers; /* Number of subscribers */
+ uint64_t generation; /* Mainline generation */
};
struct orb_object