You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by am...@apache.org on 2019/02/20 16:14:18 UTC

[trafficserver] branch master updated: Refresh mgmt signal logic to remove possible memory issues.

This is an automated email from the ASF dual-hosted git repository.

amc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new d57057d  Refresh mgmt signal logic to remove possible memory issues.
d57057d is described below

commit d57057d3590d73b58bc2b643501f1a584b94f54e
Author: Alan M. Carroll <am...@apache.org>
AuthorDate: Thu Oct 25 17:02:25 2018 -0500

    Refresh mgmt signal logic to remove possible memory issues.
---
 CMakeLists.txt                       |   1 +
 iocore/net/test_I_UDPNet.cc          |   2 +-
 lib/records/I_RecCore.h              |   8 ++-
 lib/records/P_RecMessage.h           |   3 +-
 lib/records/RecLocal.cc              |   6 +-
 lib/records/RecMessage.cc            |   8 +--
 lib/records/RecProcess.cc            |   6 +-
 mgmt/BaseManager.cc                  | 119 ++++++++++++++---------------------
 mgmt/BaseManager.h                   |  72 +++++++++++++++------
 mgmt/LocalManager.cc                 |  25 ++++----
 mgmt/MgmtDefs.h                      |  16 ++---
 mgmt/ProcessManager.cc               |  32 +++++-----
 proxy/logging/LogConfig.cc           |   7 +--
 proxy/logging/LogConfig.h            |   3 +-
 src/traffic_server/HostStatus.cc     |  22 ++++---
 src/traffic_server/traffic_server.cc |  51 +++++++--------
 16 files changed, 197 insertions(+), 184 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 03d296b..bfc012d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -107,6 +107,7 @@ CPP_ADD_SOURCES(mgmt mgmt/api)
 CPP_ADD_SOURCES(mgmt mgmt/utils)
 
 CPP_LIB(records lib/records lib/records)
+CPP_LIB(logging proxy/logging proxy/logging)
 
 CPP_LIB(tsconfig lib/tsconfig lib/tsconfig)
 CPP_LIB(wccp src/wccp include/wccp)
diff --git a/iocore/net/test_I_UDPNet.cc b/iocore/net/test_I_UDPNet.cc
index 5ee79f0..16810cb 100644
--- a/iocore/net/test_I_UDPNet.cc
+++ b/iocore/net/test_I_UDPNet.cc
@@ -362,7 +362,7 @@ StatPagesManager statPagesManager;
 inkcoreapi ProcessManager *pmgmt = nullptr;
 
 int
-BaseManager::registerMgmtCallback(int, MgmtCallback, void *)
+BaseManager::registerMgmtCallback(int, MgmtCallback const &)
 {
   ink_assert(false);
   return 0;
diff --git a/lib/records/I_RecCore.h b/lib/records/I_RecCore.h
index b915767..3bea5c5 100644
--- a/lib/records/I_RecCore.h
+++ b/lib/records/I_RecCore.h
@@ -23,13 +23,15 @@
 
 #pragma once
 
+#include <functional>
+
 #include "tscore/Diags.h"
 
 #include "I_RecDefs.h"
 #include "I_RecAlarms.h"
 #include "I_RecSignals.h"
 #include "I_RecEvents.h"
-#include <functional>
+#include "tscpp/util/MemSpan.h"
 
 struct RecRecord;
 
@@ -305,5 +307,5 @@ RecErrT RecSetSyncRequired(char *name, bool lock = true);
 //------------------------------------------------------------------------
 // Manager Callback
 //------------------------------------------------------------------------
-typedef void *(*RecManagerCb)(void *opaque_cb_data, char *data_raw, int data_len);
-int RecRegisterManagerCb(int _signal, RecManagerCb _fn, void *_data = nullptr);
+using RecManagerCb = std::function<void(ts::MemSpan)>;
+int RecRegisterManagerCb(int _signal, RecManagerCb const &_fn);
diff --git a/lib/records/P_RecMessage.h b/lib/records/P_RecMessage.h
index 24db24c..d41f74c 100644
--- a/lib/records/P_RecMessage.h
+++ b/lib/records/P_RecMessage.h
@@ -24,6 +24,7 @@
 #pragma once
 
 #include "P_RecDefs.h"
+#include "tscpp/util/MemSpan.h"
 
 //-------------------------------------------------------------------------
 // Initialization
@@ -44,7 +45,7 @@ int RecMessageUnmarshalNext(RecMessage *msg, RecMessageItr *itr, RecRecord **rec
 
 int RecMessageSend(RecMessage *msg);
 int RecMessageRegisterRecvCb(RecMessageRecvCb recv_cb, void *cookie);
-void *RecMessageRecvThis(void *cookie, char *data_raw, int data_len);
+void RecMessageRecvThis(ts::MemSpan);
 
 RecMessage *RecMessageReadFromDisk(const char *fpath);
 int RecMessageWriteToDisk(RecMessage *msg, const char *fpath);
diff --git a/lib/records/RecLocal.cc b/lib/records/RecLocal.cc
index ad927e5..34c7215 100644
--- a/lib/records/RecLocal.cc
+++ b/lib/records/RecLocal.cc
@@ -145,7 +145,7 @@ void
 RecMessageInit()
 {
   ink_assert(g_mode_type != RECM_NULL);
-  lmgmt->registerMgmtCallback(MGMT_SIGNAL_LIBRECORDS, RecMessageRecvThis, nullptr);
+  lmgmt->registerMgmtCallback(MGMT_SIGNAL_LIBRECORDS, &RecMessageRecvThis);
   message_initialized_p = true;
 }
 
@@ -207,9 +207,9 @@ RecLocalStart(FileManager *configFiles)
 }
 
 int
-RecRegisterManagerCb(int id, RecManagerCb _fn, void *_data)
+RecRegisterManagerCb(int id, RecManagerCb const &_fn)
 {
-  return lmgmt->registerMgmtCallback(id, _fn, _data);
+  return lmgmt->registerMgmtCallback(id, _fn);
 }
 
 void
diff --git a/lib/records/RecMessage.cc b/lib/records/RecMessage.cc
index 7436490..c18d502 100644
--- a/lib/records/RecMessage.cc
+++ b/lib/records/RecMessage.cc
@@ -31,6 +31,7 @@
 #include "P_RecUtils.h"
 #include "P_RecCore.h"
 #include "tscore/I_Layout.h"
+#include "tscpp/util/MemSpan.h"
 
 static RecMessageRecvCb g_recv_cb = nullptr;
 static void *g_recv_cookie        = nullptr;
@@ -243,12 +244,11 @@ RecMessageRegisterRecvCb(RecMessageRecvCb recv_cb, void *cookie)
 // RecMessageRecvThis
 //-------------------------------------------------------------------------
 
-void *
-RecMessageRecvThis(void * /* cookie */, char *data_raw, int /* data_len */)
+void
+RecMessageRecvThis(ts::MemSpan span)
 {
-  RecMessage *msg = (RecMessage *)data_raw;
+  RecMessage *msg = static_cast<RecMessage *>(span.data());
   g_recv_cb(msg, msg->msg_type, g_recv_cookie);
-  return nullptr;
 }
 
 //-------------------------------------------------------------------------
diff --git a/lib/records/RecProcess.cc b/lib/records/RecProcess.cc
index 08848f4..29fcb07 100644
--- a/lib/records/RecProcess.cc
+++ b/lib/records/RecProcess.cc
@@ -231,7 +231,7 @@ void
 RecMessageInit()
 {
   ink_assert(g_mode_type != RECM_NULL);
-  pmgmt->registerMgmtCallback(MGMT_EVENT_LIBRECORDS, RecMessageRecvThis, nullptr);
+  pmgmt->registerMgmtCallback(MGMT_EVENT_LIBRECORDS, &RecMessageRecvThis);
   message_initialized_p = true;
 }
 
@@ -300,9 +300,9 @@ RecSignalManager(int id, const char *msg, size_t msgsize)
 }
 
 int
-RecRegisterManagerCb(int _signal, RecManagerCb _fn, void *_data)
+RecRegisterManagerCb(int _signal, RecManagerCb const &_fn)
 {
-  return pmgmt->registerMgmtCallback(_signal, _fn, _data);
+  return pmgmt->registerMgmtCallback(_signal, _fn);
 }
 
 //-------------------------------------------------------------------------
diff --git a/mgmt/BaseManager.cc b/mgmt/BaseManager.cc
index abd99ce..86ef100 100644
--- a/mgmt/BaseManager.cc
+++ b/mgmt/BaseManager.cc
@@ -4,103 +4,80 @@
 
   @section license License
 
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
+  Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
+  See the NOTICE file distributed with this work for additional information regarding copyright
+  ownership.  The ASF licenses this file to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance with the License.  You may obtain a
+  copy of the License at
 
       http://www.apache.org/licenses/LICENSE-2.0
 
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
+  Unless required by applicable law or agreed to in writing, software distributed under the License
+  is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+  or implied. See the License for the specific language governing permissions and limitations under
+  the License.
  */
 
 #include "tscore/ink_memory.h"
+#include "tscore/ink_mutex.h"
 #include "BaseManager.h"
 
 BaseManager::BaseManager()
 {
-  /* Setup the event queue and callback tables */
-  mgmt_event_queue = create_queue();
-
-} /* End BaseManager::BaseManager */
+  ink_sem_init(&q_sem, 0);
+}
 
 BaseManager::~BaseManager()
 {
-  while (!queue_is_empty(mgmt_event_queue)) {
-    MgmtMessageHdr *mh = (MgmtMessageHdr *)dequeue(mgmt_event_queue);
-    ats_free(mh);
+  while (!queue.empty()) {
+    ats_free(queue.front());
+    queue.pop();
   }
-  ats_free(mgmt_event_queue);
-
-  for (auto &&it : mgmt_callback_table) {
-    MgmtCallbackList *tmp, *cb_list = it.second;
+}
 
-    for (tmp = cb_list->next; tmp; tmp = cb_list->next) {
-      ats_free(cb_list);
-      cb_list = tmp;
-    }
-    ats_free(cb_list);
-  }
+void
+BaseManager::enqueue(MgmtMessageHdr *mh)
+{
+  std::lock_guard lock(q_mutex);
+  queue.emplace(mh);
+  ink_sem_post(&q_sem);
+}
 
-  return;
-} /* End BaseManager::~BaseManager */
+bool
+BaseManager::queue_empty()
+{
+  std::lock_guard lock(q_mutex);
+  return queue.empty();
+}
 
-/*
- * registerMgmtCallback(...)
- *   Function to register callback's for various management events, such
- * as shutdown, re-init, etc. The following callbacks should be
- * registered:
- *                MGMT_EVENT_SHUTDOWN  (graceful shutdown)
- *                MGMT_EVENT_RESTART   (graceful reboot)
- *                ...
- *
- *   Returns:   -1      on error(invalid event id passed in)
- *               or     value
- */
-int
-BaseManager::registerMgmtCallback(int msg_id, MgmtCallback cb, void *opaque_cb_data)
+MgmtMessageHdr *
+BaseManager::dequeue()
 {
-  MgmtCallbackList *cb_list;
+  MgmtMessageHdr *msg{nullptr};
 
-  if (auto it = mgmt_callback_table.find(msg_id); it != mgmt_callback_table.end()) {
-    cb_list = it->second;
-  } else {
-    cb_list = nullptr;
+  ink_sem_wait(&q_sem);
+  {
+    std::lock_guard lock(q_mutex);
+    msg = queue.front();
+    queue.pop();
   }
+  return msg;
+}
 
-  if (cb_list) {
-    MgmtCallbackList *tmp;
-
-    for (tmp = cb_list; tmp->next; tmp = tmp->next) {
-      ;
-    }
-    tmp->next              = (MgmtCallbackList *)ats_malloc(sizeof(MgmtCallbackList));
-    tmp->next->func        = cb;
-    tmp->next->opaque_data = opaque_cb_data;
-    tmp->next->next        = nullptr;
-  } else {
-    cb_list              = (MgmtCallbackList *)ats_malloc(sizeof(MgmtCallbackList));
-    cb_list->func        = cb;
-    cb_list->opaque_data = opaque_cb_data;
-    cb_list->next        = nullptr;
-    mgmt_callback_table.emplace(msg_id, cb_list);
-  }
+int
+BaseManager::registerMgmtCallback(int msg_id, MgmtCallback const &cb)
+{
+  auto &cb_list{mgmt_callback_table[msg_id]};
+  cb_list.emplace_back(cb);
   return msg_id;
-} /* End BaseManager::registerMgmtCallback */
+}
 
 void
-BaseManager::executeMgmtCallback(int msg_id, char *data_raw, int data_len)
+BaseManager::executeMgmtCallback(int msg_id, ts::MemSpan span)
 {
   if (auto it = mgmt_callback_table.find(msg_id); it != mgmt_callback_table.end()) {
-    for (MgmtCallbackList *cb_list = it->second; cb_list; cb_list = cb_list->next) {
-      (*((MgmtCallback)(cb_list->func)))(cb_list->opaque_data, data_raw, data_len);
+    for (auto &&cb : it->second) {
+      cb(span);
     }
   }
 }
diff --git a/mgmt/BaseManager.h b/mgmt/BaseManager.h
index a254a72..9f72212 100644
--- a/mgmt/BaseManager.h
+++ b/mgmt/BaseManager.h
@@ -23,15 +23,18 @@
 
 #pragma once
 
+#include <list>
+#include <queue>
+#include <mutex>
+#include <unordered_map>
+
 #include "tscore/ink_thread.h"
 #include "tscore/ink_mutex.h"
-#include "tscore/ink_llqueue.h"
+#include "tscpp/util/MemSpan.h"
 
 #include "MgmtDefs.h"
 #include "MgmtMarshall.h"
 
-#include <unordered_map>
-
 /*
  * MgmtEvent defines.
  */
@@ -88,30 +91,61 @@
 
 #define MGMT_SIGNAL_SAC_SERVER_DOWN 400
 
-typedef struct _mgmt_message_hdr_type {
+struct MgmtMessageHdr {
   int msg_id;
   int data_len;
-} MgmtMessageHdr;
-
-typedef struct _mgmt_event_callback_list {
-  MgmtCallback func;
-  void *opaque_data;
-  struct _mgmt_event_callback_list *next;
-} MgmtCallbackList;
+  ts::MemSpan
+  payload()
+  {
+    return {reinterpret_cast<char *>(this) + sizeof(*this), data_len};
+  }
+};
 
 class BaseManager
 {
+  using MgmtCallbackList = std::list<MgmtCallback>;
+
 public:
   BaseManager();
-  ~BaseManager();
 
-  int registerMgmtCallback(int msg_id, MgmtCallback func, void *opaque_callback_data = nullptr);
+  ~BaseManager();
 
-  LLQ *mgmt_event_queue;
-  std::unordered_map<int, MgmtCallbackList *> mgmt_callback_table;
+  /** Associate a callback function @a func with message identifier @a msg_id.
+   *
+   * @param msg_id Message identifier for the callback.
+   * @param func The callback function.
+   * @return @a msg_id on success, -1 on failure.
+   *
+   * @a msg_id should be one of the @c MGMT_EVENT_... values.
+   *
+   * If a management message with @a msg_id is received, the callbacks for that message id
+   * are invoked and passed the message payload (not including the header).
+   */
+  int registerMgmtCallback(int msg_id, MgmtCallback const &func);
+
+  /// Add @a msg to the queue.
+  /// This must be the entire message as read off the wire including the header.
+  void enqueue(MgmtMessageHdr *msg);
+
+  /// Check whether the queue is currently empty.
+  /// @note This does not block on the semaphore.
+  bool queue_empty();
+
+  /// Dequeue a msg.
+  /// This waits on the semaphore for a message to arrive.
+  MgmtMessageHdr *dequeue();
 
 protected:
-  void executeMgmtCallback(int msg_id, char *data_raw, int data_len);
-
-private:
-}; /* End class BaseManager */
+  void executeMgmtCallback(int msg_id, ts::MemSpan span);
+
+  /// The mapping from an event type to a list of callbacks to invoke.
+  std::unordered_map<int, MgmtCallbackList> mgmt_callback_table;
+
+  /// Message queue.
+  // Each entry holds an entire message object, including the header.
+  std::queue<MgmtMessageHdr *> queue;
+  /// Locked access to the queue.
+  std::mutex q_mutex;
+  /// Semaphore to signal queue state.
+  ink_semaphore q_sem;
+};
diff --git a/mgmt/LocalManager.cc b/mgmt/LocalManager.cc
index 0d35524..a9205be 100644
--- a/mgmt/LocalManager.cc
+++ b/mgmt/LocalManager.cc
@@ -579,9 +579,9 @@ LocalManager::handleMgmtMsgFromProcesses(MgmtMessageHdr *mh)
   }
   case MGMT_SIGNAL_LIBRECORDS:
     if (mh->data_len > 0) {
-      executeMgmtCallback(MGMT_SIGNAL_LIBRECORDS, data_raw, mh->data_len);
+      executeMgmtCallback(MGMT_SIGNAL_LIBRECORDS, {data_raw, mh->data_len});
     } else {
-      executeMgmtCallback(MGMT_SIGNAL_LIBRECORDS, nullptr, 0);
+      executeMgmtCallback(MGMT_SIGNAL_LIBRECORDS, {});
     }
     break;
   case MGMT_SIGNAL_CONFIG_FILE_CHILD: {
@@ -755,12 +755,15 @@ void
 LocalManager::signalEvent(int msg_id, const char *data_raw, int data_len)
 {
   MgmtMessageHdr *mh;
+  size_t n = sizeof(MgmtMessageHdr) + data_len;
 
-  mh           = (MgmtMessageHdr *)ats_malloc(sizeof(MgmtMessageHdr) + data_len);
+  mh           = static_cast<MgmtMessageHdr *>(ats_malloc(n));
   mh->msg_id   = msg_id;
   mh->data_len = data_len;
-  memcpy((char *)mh + sizeof(MgmtMessageHdr), data_raw, data_len);
-  ink_assert(enqueue(mgmt_event_queue, mh));
+  auto payload = mh->payload();
+  memcpy(payload.data(), data_raw, data_len);
+  this->enqueue(mh);
+  //  ink_assert(enqueue(mgmt_event_queue, mh));
 
 #if HAVE_EVENTFD
   // we don't care about the actual value of wakeup_fd, so just keep adding 1. just need to
@@ -788,16 +791,16 @@ LocalManager::processEventQueue()
 {
   bool handled_by_mgmt;
 
-  while (!queue_is_empty(mgmt_event_queue)) {
+  while (!this->queue_empty()) {
     handled_by_mgmt = false;
 
-    MgmtMessageHdr *mh = (MgmtMessageHdr *)dequeue(mgmt_event_queue);
-    char *data_raw     = (char *)mh + sizeof(MgmtMessageHdr);
+    MgmtMessageHdr *mh = this->dequeue();
+    auto payload       = mh->payload();
 
     // check if we have a local file update
     if (mh->msg_id == MGMT_EVENT_CONFIG_FILE_UPDATE || mh->msg_id == MGMT_EVENT_CONFIG_FILE_UPDATE_NO_INC_VERSION) {
       // records.config
-      if (!(strcmp(data_raw, REC_CONFIG_FILE))) {
+      if (!(strcmp(payload.begin(), REC_CONFIG_FILE))) {
         bool incVersion = mh->msg_id == MGMT_EVENT_CONFIG_FILE_UPDATE;
         if (RecReadConfigFile(incVersion) != REC_ERR_OKAY) {
           mgmt_elog(errno, "[fileUpdated] Config update failed for records.config\n");
@@ -813,10 +816,10 @@ LocalManager::processEventQueue()
         // Fix INKqa04984
         // If traffic server hasn't completely come up yet,
         // we will hold off until next round.
-        ink_assert(enqueue(mgmt_event_queue, mh));
+        this->enqueue(mh);
         return;
       }
-      Debug("lm", "[TrafficManager] ==> Sending signal event '%d' %s payload=%d", mh->msg_id, data_raw, mh->data_len);
+      Debug("lm", "[TrafficManager] ==> Sending signal event '%d' %s payload=%d", mh->msg_id, payload.begin(), int(payload.size()));
       lmgmt->sendMgmtMsgToProcesses(mh);
     }
     ats_free(mh);
diff --git a/mgmt/MgmtDefs.h b/mgmt/MgmtDefs.h
index ed8f681..6932868 100644
--- a/mgmt/MgmtDefs.h
+++ b/mgmt/MgmtDefs.h
@@ -30,6 +30,7 @@
 #include <string_view>
 
 #include "tscore/ink_defs.h"
+#include "tscpp/util/MemSpan.h"
 
 typedef int64_t MgmtIntCounter;
 typedef int64_t MgmtInt;
@@ -37,20 +38,19 @@ typedef int8_t MgmtByte;
 typedef float MgmtFloat;
 typedef char *MgmtString;
 
-typedef enum {
+enum MgmtType {
   MGMT_INVALID  = -1,
   MGMT_INT      = 0,
   MGMT_FLOAT    = 1,
   MGMT_STRING   = 2,
   MGMT_COUNTER  = 3,
   MGMT_TYPE_MAX = 4,
-} MgmtType;
+};
 
-/*
- * MgmtCallback
- *   Management Callback functions.
- */
-using MgmtCallback = void *(*)(void *opaque_cb_data, char *data_raw, int data_len);
+/// Management callback signature.
+/// The memory span is the message payload for the callback.
+/// This can be a lambda, which should be used if additional context information is needed.
+using MgmtCallback = std::function<void(ts::MemSpan)>;
 
 //-------------------------------------------------------------------------
 // API conversion functions.
@@ -142,4 +142,4 @@ inline MgmtConverter::MgmtConverter(MgmtInt (*_load_int)(void *), void (*_store_
 {
 }
 
-#define LM_CONNECTION_SERVER "processerver.sock"
+constexpr std::string_view LM_CONNECTION_SERVER{"processerver.sock"};
diff --git a/mgmt/ProcessManager.cc b/mgmt/ProcessManager.cc
index 4f52c9e..ad0c3f7 100644
--- a/mgmt/ProcessManager.cc
+++ b/mgmt/ProcessManager.cc
@@ -122,7 +122,7 @@ ProcessManager::stop()
   poll_thread = ink_thread_null();
 
   while (!queue_is_empty(mgmt_signal_queue)) {
-    char *sig = (char *)dequeue(mgmt_signal_queue);
+    char *sig = (char *)::dequeue(mgmt_signal_queue);
     ats_free(sig);
   }
 
@@ -244,7 +244,7 @@ ProcessManager::signalManager(int msg_id, const char *data_raw, int data_len)
   mh->data_len = data_len;
   memcpy((char *)mh + sizeof(MgmtMessageHdr), data_raw, data_len);
 
-  ink_release_assert(enqueue(mgmt_signal_queue, mh));
+  ink_release_assert(::enqueue(mgmt_signal_queue, mh));
 
 #if HAVE_EVENTFD
   // we don't care about the actual value of wakeup_fd, so just keep adding 1. just need to
@@ -265,7 +265,7 @@ int
 ProcessManager::processSignalQueue()
 {
   while (!queue_is_empty(mgmt_signal_queue)) {
-    MgmtMessageHdr *mh = (MgmtMessageHdr *)dequeue(mgmt_signal_queue);
+    MgmtMessageHdr *mh = (MgmtMessageHdr *)::dequeue(mgmt_signal_queue);
 
     Debug("pmgmt", "signaling local manager with message ID %d", mh->msg_id);
 
@@ -414,35 +414,35 @@ ProcessManager::handleMgmtMsgFromLM(MgmtMessageHdr *mh)
 {
   ink_assert(mh != nullptr);
 
-  char *data_raw = (char *)mh + sizeof(MgmtMessageHdr);
+  auto payload = mh->payload();
 
   Debug("pmgmt", "processing event id '%d' payload=%d", mh->msg_id, mh->data_len);
   switch (mh->msg_id) {
   case MGMT_EVENT_SHUTDOWN:
-    executeMgmtCallback(MGMT_EVENT_SHUTDOWN, nullptr, 0);
+    executeMgmtCallback(MGMT_EVENT_SHUTDOWN, {});
     Alert("exiting on shutdown message");
     break;
   case MGMT_EVENT_RESTART:
-    executeMgmtCallback(MGMT_EVENT_RESTART, nullptr, 0);
+    executeMgmtCallback(MGMT_EVENT_RESTART, {});
     break;
   case MGMT_EVENT_DRAIN:
-    executeMgmtCallback(MGMT_EVENT_DRAIN, data_raw, mh->data_len);
+    executeMgmtCallback(MGMT_EVENT_DRAIN, payload);
     break;
   case MGMT_EVENT_CLEAR_STATS:
-    executeMgmtCallback(MGMT_EVENT_CLEAR_STATS, nullptr, 0);
+    executeMgmtCallback(MGMT_EVENT_CLEAR_STATS, {});
     break;
   case MGMT_EVENT_HOST_STATUS_UP:
-    executeMgmtCallback(MGMT_EVENT_HOST_STATUS_UP, data_raw, mh->data_len);
+    executeMgmtCallback(MGMT_EVENT_HOST_STATUS_UP, payload);
     break;
   case MGMT_EVENT_HOST_STATUS_DOWN:
-    executeMgmtCallback(MGMT_EVENT_HOST_STATUS_DOWN, data_raw, mh->data_len);
+    executeMgmtCallback(MGMT_EVENT_HOST_STATUS_DOWN, payload);
     break;
   case MGMT_EVENT_ROLL_LOG_FILES:
-    executeMgmtCallback(MGMT_EVENT_ROLL_LOG_FILES, nullptr, 0);
+    executeMgmtCallback(MGMT_EVENT_ROLL_LOG_FILES, {});
     break;
   case MGMT_EVENT_PLUGIN_CONFIG_UPDATE:
-    if (data_raw != nullptr && data_raw[0] != '\0' && this->cbtable) {
-      this->cbtable->invoke(data_raw);
+    if (!payload.empty() && payload.at<char>(0) != '\0' && this->cbtable) {
+      this->cbtable->invoke(static_cast<char const *>(payload.data()));
     }
     break;
   case MGMT_EVENT_CONFIG_FILE_UPDATE:
@@ -463,13 +463,13 @@ ProcessManager::handleMgmtMsgFromLM(MgmtMessageHdr *mh)
     */
     break;
   case MGMT_EVENT_LIBRECORDS:
-    executeMgmtCallback(MGMT_EVENT_LIBRECORDS, data_raw, mh->data_len);
+    executeMgmtCallback(MGMT_EVENT_LIBRECORDS, payload);
     break;
   case MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE:
-    executeMgmtCallback(MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE, data_raw, mh->data_len);
+    executeMgmtCallback(MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE, payload);
     break;
   case MGMT_EVENT_LIFECYCLE_MESSAGE:
-    executeMgmtCallback(MGMT_EVENT_LIFECYCLE_MESSAGE, data_raw, mh->data_len);
+    executeMgmtCallback(MGMT_EVENT_LIFECYCLE_MESSAGE, payload);
     break;
   default:
     Warning("received unknown message ID %d\n", mh->msg_id);
diff --git a/proxy/logging/LogConfig.cc b/proxy/logging/LogConfig.cc
index 4b09754..4437266 100644
--- a/proxy/logging/LogConfig.cc
+++ b/proxy/logging/LogConfig.cc
@@ -109,13 +109,10 @@ LogConfig::setup_default_values()
   max_line_size     = 9216; // size of pipe buffer for SunOS 5.6
 }
 
-void *
-LogConfig::reconfigure_mgmt_variables(void * /* token ATS_UNUSED */, char * /* data_raw ATS_UNUSED */,
-                                      int /* data_len ATS_UNUSED */)
+void LogConfig::reconfigure_mgmt_variables(ts::MemSpan)
 {
   Note("received log reconfiguration event, rolling now");
   Log::config->roll_log_files_now = true;
-  return nullptr;
 }
 
 void
@@ -668,7 +665,7 @@ LogConfig::register_stat_callbacks()
 void
 LogConfig::register_mgmt_callbacks()
 {
-  RecRegisterManagerCb(REC_EVENT_ROLL_LOG_FILES, &LogConfig::reconfigure_mgmt_variables, nullptr);
+  RecRegisterManagerCb(REC_EVENT_ROLL_LOG_FILES, &LogConfig::reconfigure_mgmt_variables);
 }
 
 /*-------------------------------------------------------------------------
diff --git a/proxy/logging/LogConfig.h b/proxy/logging/LogConfig.h
index d8efcf3..fc41390 100644
--- a/proxy/logging/LogConfig.h
+++ b/proxy/logging/LogConfig.h
@@ -31,6 +31,7 @@
 #include "records/P_RecProcess.h"
 #include "ProxyConfig.h"
 #include "LogObject.h"
+#include "tscpp/util/MemSpan.h"
 
 /* Instead of enumerating the stats in DynamicStats.h, each module needs
    to enumerate its stats separately and register them with librecords
@@ -210,7 +211,7 @@ public:
   void read_configuration_variables();
 
   // CVR This is the mgmt callback function, hence all the strange arguments
-  static void *reconfigure_mgmt_variables(void *token, char *data_raw, int data_len);
+  static void reconfigure_mgmt_variables(ts::MemSpan);
 
   int
   get_max_space_mb() const
diff --git a/src/traffic_server/HostStatus.cc b/src/traffic_server/HostStatus.cc
index 365e2e0..ae6302d 100644
--- a/src/traffic_server/HostStatus.cc
+++ b/src/traffic_server/HostStatus.cc
@@ -37,16 +37,18 @@ getStatName(std::string &stat_name, const char *name, const char *reason)
   }
 }
 
-static void *
-mgmt_host_status_up_callback(void *x, char *data, int len)
+static void
+mgmt_host_status_up_callback(ts::MemSpan span)
 {
   MgmtInt op;
   MgmtMarshallString name;
   MgmtMarshallInt down_time;
   MgmtMarshallString reason;
   std::string reason_stat;
+  char *data                             = static_cast<char *>(span.data());
+  auto len                               = span.size();
   static const MgmtMarshallType fields[] = {MGMT_MARSHALL_INT, MGMT_MARSHALL_STRING, MGMT_MARSHALL_STRING, MGMT_MARSHALL_INT};
-  Debug("host_statuses", "%s:%s:%d - data: %s, len: %d\n", __FILE__, __func__, __LINE__, data, len);
+  Debug("host_statuses", "%s:%s:%d - data: %s, len: %ld\n", __FILE__, __func__, __LINE__, data, len);
 
   if (mgmt_message_parse(data, len, fields, countof(fields), &op, &name, &reason, &down_time) == -1) {
     Error("Plugin message - RPC parsing error - message discarded.");
@@ -61,19 +63,20 @@ mgmt_host_status_up_callback(void *x, char *data, int len)
     }
     hs.setHostStatus(name, HostStatus_t::HOST_STATUS_UP, down_time, reason);
   }
-  return nullptr;
 }
 
-static void *
-mgmt_host_status_down_callback(void *x, char *data, int len)
+static void
+mgmt_host_status_down_callback(ts::MemSpan span)
 {
   MgmtInt op;
   MgmtMarshallString name;
   MgmtMarshallInt down_time;
   MgmtMarshallString reason;
   std::string reason_stat;
+  char *data                             = static_cast<char *>(span.data());
+  auto len                               = span.size();
   static const MgmtMarshallType fields[] = {MGMT_MARSHALL_INT, MGMT_MARSHALL_STRING, MGMT_MARSHALL_STRING, MGMT_MARSHALL_INT};
-  Debug("host_statuses", "%s:%s:%d - data: %s, len: %d\n", __FILE__, __func__, __LINE__, data, len);
+  Debug("host_statuses", "%s:%s:%d - data: %s, len: %ld\n", __FILE__, __func__, __LINE__, data, len);
 
   if (mgmt_message_parse(data, len, fields, countof(fields), &op, &name, &reason, &down_time) == -1) {
     Error("Plugin message - RPC parsing error - message discarded.");
@@ -89,15 +92,14 @@ mgmt_host_status_down_callback(void *x, char *data, int len)
     }
     hs.setHostStatus(name, HostStatus_t::HOST_STATUS_DOWN, down_time, reason);
   }
-  return nullptr;
 }
 
 HostStatus::HostStatus()
 {
   ink_rwlock_init(&host_status_rwlock);
   ink_rwlock_init(&host_statids_rwlock);
-  pmgmt->registerMgmtCallback(MGMT_EVENT_HOST_STATUS_UP, mgmt_host_status_up_callback, nullptr);
-  pmgmt->registerMgmtCallback(MGMT_EVENT_HOST_STATUS_DOWN, mgmt_host_status_down_callback, nullptr);
+  pmgmt->registerMgmtCallback(MGMT_EVENT_HOST_STATUS_UP, &mgmt_host_status_up_callback);
+  pmgmt->registerMgmtCallback(MGMT_EVENT_HOST_STATUS_DOWN, &mgmt_host_status_down_callback);
   host_status_rsb = RecAllocateRawStatBlock((int)TS_MAX_API_STATS);
 }
 
diff --git a/src/traffic_server/traffic_server.cc b/src/traffic_server/traffic_server.cc
index 2b9eaaa..eea54b4 100644
--- a/src/traffic_server/traffic_server.cc
+++ b/src/traffic_server/traffic_server.cc
@@ -114,10 +114,10 @@ extern "C" int plock(int);
 
 static const long MAX_LOGIN = ink_login_name_max();
 
-static void *mgmt_restart_shutdown_callback(void *, char *, int data_len);
-static void *mgmt_drain_callback(void *, char *, int data_len);
-static void *mgmt_storage_device_cmd_callback(void *x, char *data, int len);
-static void *mgmt_lifecycle_msg_callback(void *x, char *data, int len);
+static void mgmt_restart_shutdown_callback(ts::MemSpan);
+static void mgmt_drain_callback(ts::MemSpan);
+static void mgmt_storage_device_cmd_callback(int cmd, std::string_view const &arg);
+static void mgmt_lifecycle_msg_callback(ts::MemSpan);
 static void init_ssl_ctx_callback(void *ctx, bool server);
 static void load_ssl_file_callback(const char *ssl_file, unsigned int options);
 static void load_remap_file_callback(const char *remap_file);
@@ -1982,16 +1982,17 @@ main(int /* argc ATS_UNUSED */, const char **argv)
       start_SocksProxy(netProcessor.socks_conf_stuff->accept_port);
     }
 
-    pmgmt->registerMgmtCallback(MGMT_EVENT_SHUTDOWN, mgmt_restart_shutdown_callback, nullptr);
-    pmgmt->registerMgmtCallback(MGMT_EVENT_RESTART, mgmt_restart_shutdown_callback, nullptr);
-    pmgmt->registerMgmtCallback(MGMT_EVENT_DRAIN, mgmt_drain_callback, nullptr);
+    pmgmt->registerMgmtCallback(MGMT_EVENT_SHUTDOWN, &mgmt_restart_shutdown_callback);
+    pmgmt->registerMgmtCallback(MGMT_EVENT_RESTART, &mgmt_restart_shutdown_callback);
+    pmgmt->registerMgmtCallback(MGMT_EVENT_DRAIN, &mgmt_drain_callback);
 
     // Callback for various storage commands. These all go to the same function so we
     // pass the event code along so it can do the right thing. We cast that to <int> first
     // just to be safe because the value is a #define, not a typed value.
-    pmgmt->registerMgmtCallback(MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE, mgmt_storage_device_cmd_callback,
-                                reinterpret_cast<void *>(static_cast<int>(MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE)));
-    pmgmt->registerMgmtCallback(MGMT_EVENT_LIFECYCLE_MESSAGE, mgmt_lifecycle_msg_callback, nullptr);
+    pmgmt->registerMgmtCallback(MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE, [](ts::MemSpan span) -> void {
+      mgmt_storage_device_cmd_callback(MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE, std::string_view{span});
+    });
+    pmgmt->registerMgmtCallback(MGMT_EVENT_LIFECYCLE_MESSAGE, &mgmt_lifecycle_msg_callback);
 
     ink_set_thread_name("[TS_MAIN]");
 
@@ -2036,42 +2037,37 @@ REGRESSION_TEST(Hdrs)(RegressionTest *t, int atype, int *pstatus)
 }
 #endif
 
-static void *
-mgmt_restart_shutdown_callback(void *, char *, int /* data_len ATS_UNUSED */)
+static void mgmt_restart_shutdown_callback(ts::MemSpan)
 {
   sync_cache_dir_on_shutdown();
-  return nullptr;
 }
 
-static void *
-mgmt_drain_callback(void *, char *arg, int len)
+static void
+mgmt_drain_callback(ts::MemSpan span)
 {
-  ts_is_draining = (len == 2 && arg[0] == '1');
+  char *arg      = static_cast<char *>(span.data());
+  ts_is_draining = (span.size() == 2 && arg[0] == '1');
   RecSetRecordInt("proxy.node.config.draining", ts_is_draining ? 1 : 0, REC_SOURCE_DEFAULT);
-  return nullptr;
 }
 
-static void *
-mgmt_storage_device_cmd_callback(void *data, char *arg, int len)
+static void
+mgmt_storage_device_cmd_callback(int cmd, std::string_view const &arg)
 {
   // data is the device name to control
-  CacheDisk *d = cacheProcessor.find_by_path(arg, len);
-  // Actual command is in @a data.
-  intptr_t cmd = reinterpret_cast<intptr_t>(data);
+  CacheDisk *d = cacheProcessor.find_by_path(arg.data(), int(arg.size()));
 
   if (d) {
     switch (cmd) {
     case MGMT_EVENT_STORAGE_DEVICE_CMD_OFFLINE:
-      Debug("server", "Marking %.*s offline", len, arg);
+      Debug("server", "Marking %.*s offline", int(arg.size()), arg.data());
       cacheProcessor.mark_storage_offline(d, /* admin */ true);
       break;
     }
   }
-  return nullptr;
 }
 
-static void *
-mgmt_lifecycle_msg_callback(void *, char *data, int len)
+static void
+mgmt_lifecycle_msg_callback(ts::MemSpan span)
 {
   APIHook *hook = lifecycle_hooks->get(TS_LIFECYCLE_MSG_HOOK);
   TSPluginMsg msg;
@@ -2080,7 +2076,7 @@ mgmt_lifecycle_msg_callback(void *, char *data, int len)
   MgmtMarshallData payload;
   static const MgmtMarshallType fields[] = {MGMT_MARSHALL_INT, MGMT_MARSHALL_STRING, MGMT_MARSHALL_DATA};
 
-  if (mgmt_message_parse(data, len, fields, countof(fields), &op, &tag, &payload) == -1) {
+  if (mgmt_message_parse(span.data(), span.size(), fields, countof(fields), &op, &tag, &payload) == -1) {
     Error("Plugin message - RPC parsing error - message discarded.");
   } else {
     msg.tag       = tag;
@@ -2092,7 +2088,6 @@ mgmt_lifecycle_msg_callback(void *, char *data, int len)
       hook = hook->next();
     }
   }
-  return nullptr;
 }
 
 static void