You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2014/08/26 00:57:16 UTC

[3/7] TS-3033: reimplement management API with generic marshalling

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkMessage.h
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkMessage.h b/mgmt/api/NetworkMessage.h
new file mode 100644
index 0000000..412c23c
--- /dev/null
+++ b/mgmt/api/NetworkMessage.h
@@ -0,0 +1,89 @@
+/** @file
+
+  Network message marshalling.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#ifndef _NETWORK_MESSAGE_H_
+#define _NETWORK_MESSAGE_H_
+
+#include "MgmtMarshall.h"
+
+#define REMOTE_DELIM ':'
+#define REMOTE_DELIM_STR ":"
+
+#define MAX_CONN_TRIES 10       // maximum number of attempts to reconnect to TM
+
+// the possible operations or msg types sent from remote client to TM
+typedef enum
+{
+  FILE_READ,
+  FILE_WRITE,
+  RECORD_SET,
+  RECORD_GET,
+  PROXY_STATE_GET,
+  PROXY_STATE_SET,
+  RECONFIGURE,
+  RESTART,
+  BOUNCE,
+  EVENT_RESOLVE,
+  EVENT_GET_MLT,
+  EVENT_ACTIVE,
+  EVENT_REG_CALLBACK,
+  EVENT_UNREG_CALLBACK,
+  EVENT_NOTIFY,                 /* only msg sent from TM to client */
+  SNAPSHOT_TAKE,
+  SNAPSHOT_RESTORE,
+  SNAPSHOT_REMOVE,
+  SNAPSHOT_GET_MLT,
+  DIAGS,
+  STATS_RESET_NODE,
+  STATS_RESET_CLUSTER,
+  STORAGE_DEVICE_CMD_OFFLINE,
+  RECORD_MATCH_GET,
+  API_PING,
+  UNDEFINED_OP /* This must be last */
+} OpType;
+
+struct mgmt_message_sender
+{
+    virtual TSMgmtError send(void * msg, size_t msglen) const = 0;
+};
+
+// Marshall and send a request, prefixing the message length as a MGMT_MARSHALL_INT.
+TSMgmtError send_mgmt_request(const mgmt_message_sender& snd, OpType optype, ...);
+TSMgmtError send_mgmt_request(int fd, OpType optype, ...);
+
+// Parse a request message from a buffer.
+TSMgmtError recv_mgmt_request(void * buf, size_t buflen, OpType optype, ...);
+
+// Marshall and send a response, prefixing the message length as a MGMT_MARSHALL_INT.
+TSMgmtError send_mgmt_response(int fd, OpType optype, ...);
+
+// Parse a response message from a buffer.
+TSMgmtError recv_mgmt_response(void * buf, size_t buflen, OpType optype, ...);
+
+// Pull a management message (either request or response) off the wire.
+TSMgmtError recv_mgmt_message(int fd, MgmtMarshallData& msg);
+
+// Extract the first MGMT_MARSHALL_INT from the buffered message. This is the OpType.
+OpType extract_mgmt_request_optype(void * msg, size_t msglen);
+
+#endif /* _NETWORK_MESSAGE_H_ */

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsDefs.h
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsDefs.h b/mgmt/api/NetworkUtilsDefs.h
deleted file mode 100644
index bd94fdc..0000000
--- a/mgmt/api/NetworkUtilsDefs.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/** @file
-
-  A brief file description
-
-  @section license License
-
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
- */
-
-/***************************************************************************
- * NetworkUtilsDefs.h
- *
- * contains general definitions used by both NetworkUtilsRemote and
- * NetworkUtilsLocal
- *
- *
- ***************************************************************************/
-
-#ifndef _NETWORK_UTILS_DEFS_H_
-#define _NETWORK_UTILS_DEFS_H_
-
-#define REMOTE_DELIM ':'
-#define REMOTE_DELIM_STR ":"
-
-#define MAX_CONN_TRIES 10       // maximum number of attemps to reconnect to TM
-#define MAX_TIME_WAIT  60       // num secs for a timeout on a select call (remote only)
-
-// measure in bytes used in construcing network messages
-#define SIZE_OP_T     2         // num bytes used to specify OpType
-#define SIZE_FILE_T   2         // num bytes used to specify INKFileNameT
-#define SIZE_LEN      4         // max num bytes used to specify length of anything
-#define SIZE_ERR_T    2         // num bytes used to specify INKError return value
-#define SIZE_VER      2         // num bytes used to specify file version
-#define SIZE_REC_T    2         // num bytes used to specify INKRecordT
-#define SIZE_PROXY_T  2         // num bytes used to specify INKProxyStateT
-#define SIZE_TS_ARG_T 2         // num bytes used to specify INKCacheClearT
-#define SIZE_DIAGS_T  2         // num bytes used to specify INKDiagsT
-#define SIZE_BOOL     2
-#define SIZE_ACTION_T 2         // num bytes used to specify INKActionNeedT
-#define SIZE_EVENT_ID 2         // num bytes used to specify event_id
-
-
-// the possible operations or msg types sent from remote client to TM
-typedef enum
-{
-  FILE_READ,
-  FILE_WRITE,
-  RECORD_SET,
-  RECORD_GET,
-  PROXY_STATE_GET,
-  PROXY_STATE_SET,
-  RECONFIGURE,
-  RESTART,
-  BOUNCE,
-  EVENT_RESOLVE,
-  EVENT_GET_MLT,
-  EVENT_ACTIVE,
-  EVENT_REG_CALLBACK,
-  EVENT_UNREG_CALLBACK,
-  EVENT_NOTIFY,                 /* only msg sent from TM to client */
-  SNAPSHOT_TAKE,
-  SNAPSHOT_RESTORE,
-  SNAPSHOT_REMOVE,
-  SNAPSHOT_GET_MLT,
-  DIAGS,
-  STATS_RESET_NODE,
-  STATS_RESET_CLUSTER,
-  STORAGE_DEVICE_CMD_OFFLINE,
-  RECORD_MATCH_GET,
-  UNDEFINED_OP /* This must be last */
-} OpType;
-
-#endif

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsLocal.cc
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsLocal.cc b/mgmt/api/NetworkUtilsLocal.cc
index 8da5d63..ae812ab 100644
--- a/mgmt/api/NetworkUtilsLocal.cc
+++ b/mgmt/api/NetworkUtilsLocal.cc
@@ -35,138 +35,10 @@
 #include "Diags.h"
 #include "MgmtUtils.h"
 #include "MgmtSocket.h"
+#include "MgmtMarshall.h"
 #include "CoreAPIShared.h"
 #include "NetworkUtilsLocal.h"
-
-#ifndef MAX_BUF_SIZE
-#define MAX_BUF_SIZE 4096
-#endif
-
-/**************************************************************************
- * socket_flush
- *
- * flushes the socket by reading the entire message out of the socket
- * and then gets rid of the msg
- **************************************************************************/
-TSMgmtError
-socket_flush(struct SocketInfo sock_info)
-{
-  int ret, byte_read = 0;
-  char buf[MAX_BUF_SIZE];
-
-  // check to see if anything to read; wait only for specified time
-  if (mgmt_read_timeout(sock_info.fd, MAX_TIME_WAIT, 0) <= 0) {
-    return TS_ERR_NET_TIMEOUT;
-  }
-  // read entire message
-  while (byte_read < MAX_BUF_SIZE) {
-
-    ret = socket_read(sock_info, buf + byte_read, MAX_BUF_SIZE - byte_read);
-
-    if (ret < 0) {
-      if (errno == EAGAIN)
-        continue;
-
-      Debug("ts_main", "[socket_read_n] socket read for version byte failed.\n");
-      mgmt_elog(0, "[socket_flush] (TS_ERR_NET_READ) %s\n", strerror(errno));
-      return TS_ERR_NET_READ;
-    }
-
-    if (ret == 0) {
-      Debug("ts_main", "[socket_read_n] returned 0 on reading: %s.\n", strerror(errno));
-      return TS_ERR_NET_EOF;
-    }
-    // we are all good here
-    byte_read += ret;
-  }
-
-  mgmt_elog(0, "[socket_flush] uh oh! didn't finish flushing socket!\n");
-  return TS_ERR_FAIL;
-}
-
-/**************************************************************************
- * socket_read_n
- *
- * purpose: guarantees reading of n bytes or return error.
- * input:   socket info struct, buffer to read into and number of bytes to read
- * output:  number of bytes read
- * note:    socket_read is implemented in WebUtils.cc
- *************************************************************************/
-TSMgmtError
-socket_read_n(struct SocketInfo sock_info, char *buf, int bytes)
-{
-  int ret, byte_read = 0;
-
-  // check to see if anything to read; wait for specified time
-  if (mgmt_read_timeout(sock_info.fd, MAX_TIME_WAIT, 0) <= 0) {
-    return TS_ERR_NET_TIMEOUT;
-  }
-  // read until we fulfill the number
-  while (byte_read < bytes) {
-    ret = socket_read(sock_info, buf + byte_read, bytes - byte_read);
-
-    // error!
-    if (ret < 0) {
-      if (errno == EAGAIN)
-        continue;
-
-      Debug("ts_main", "[socket_read_n] socket read for version byte failed.\n");
-      mgmt_elog(0, "[socket_read_n] (TS_ERR_NET_READ) %s\n", strerror(errno));
-      return TS_ERR_NET_READ;
-    }
-
-    if (ret == 0) {
-      Debug("ts_main", "[socket_read_n] returned 0 on reading: %s.\n", strerror(errno));
-      return TS_ERR_NET_EOF;
-    }
-    // we are all good here
-    byte_read += ret;
-  }
-
-  return TS_ERR_OKAY;
-}
-
-/**************************************************************************
- * socket_write_n
- *
- * purpose: guarantees writing of n bytes or return error
- * input:   socket info struct, buffer to write from & number of bytes to write
- * output:  TS_ERR_xx (depends on num bytes written)
- * note:    socket_read is implemented in WebUtils.cc
- *************************************************************************/
-TSMgmtError
-socket_write_n(struct SocketInfo sock_info, const char *buf, int bytes)
-{
-  int ret, byte_wrote = 0;
-
-  // makes sure the socket descriptor is writable
-  if (mgmt_write_timeout(sock_info.fd, MAX_TIME_WAIT, 0) <= 0) {
-    return TS_ERR_NET_TIMEOUT;
-  }
-  // read until we fulfill the number
-  while (byte_wrote < bytes) {
-    ret = socket_write(sock_info, buf + byte_wrote, bytes - byte_wrote);
-
-    if (ret < 0) {
-      Debug("ts_main", "[socket_write_n] return error %s \n", strerror(errno));
-      mgmt_elog(0, "[socket_write_n] %s\n", strerror(errno));
-      if (errno == EAGAIN)
-        continue;
-
-      return TS_ERR_NET_WRITE;
-    }
-
-    if (ret == 0) {
-      mgmt_elog(0, "[socket_write_n] %s\n", strerror(errno));
-      return TS_ERR_NET_EOF;
-    }
-    // we are all good here
-    byte_wrote += ret;
-  }
-
-  return TS_ERR_OKAY;
-}
-
+#include "NetworkMessage.h"
 
 /**********************************************************************
  * preprocess_msg
@@ -174,602 +46,32 @@ socket_write_n(struct SocketInfo sock_info, const char *buf, int bytes)
  * purpose: reads in all the message; parses the message into header info
  *          (OpType + msg_len) and the request portion (used by the handle_xx fns)
  * input: sock_info - socket msg is read from
- *        op_t      - the operation type specified in the msg
  *        msg       - the data from the network message (no OpType or msg_len)
  * output: TS_ERR_xx ( if TS_ERR_OKAY, then parameters set successfully)
  * notes: Since preprocess_msg already removes the OpType and msg_len, this part o
  *        the message is not dealt with by the other parsing functions
  **********************************************************************/
 TSMgmtError
-preprocess_msg(struct SocketInfo sock_info, OpType * op_t, char **req)
+preprocess_msg(int fd, void ** req, size_t * reqlen)
 {
   TSMgmtError ret;
-  int req_len;
-  int16_t op;
-
-  // read operation type
-  ret = socket_read_n(sock_info, (char *) &op, SIZE_OP_T);
-  if (ret != TS_ERR_OKAY) {
-    Debug("ts_main", "[preprocess_msg] ERROR %d reading op type\n", ret);
-    goto Lerror;
-  }
-
-  Debug("ts_main", "[preprocess_msg] operation = %d", op);
-  *op_t = (OpType) op;          // convert to proper format
-
-  // check if invalid op type
-  if ((int) op > UNDEFINED_OP) {
-    mgmt_elog(0, "[preprocess_msg] ERROR: %d is invalid op type\n", op);
+  MgmtMarshallData msg;
 
-    // need to flush the invalid message from the socket
-    if ((ret = socket_flush(sock_info)) != TS_ERR_NET_EOF)
-      mgmt_log("[preprocess_msg] unsuccessful socket flushing\n");
-    else
-      mgmt_log("[preprocess_msg] successfully flushed the socket\n");
+  *req = NULL;
+  *reqlen = 0;
 
-    goto Lerror;
-  }
-  // now read the request msg size
-  ret = socket_read_n(sock_info, (char *) &req_len, SIZE_LEN);
+  ret = recv_mgmt_message(fd, msg);
   if (ret != TS_ERR_OKAY) {
-    mgmt_elog(0, "[preprocess_msg] ERROR %d reading msg size\n", ret);
-    Debug("ts_main", "[preprocess_msg] ERROR %d reading msg size\n", ret);
-    goto Lerror;
+    return ret;
   }
 
-  Debug("ts_main", "[preprocess_msg] length = %d\n", req_len);
-
-  // use req msg length to fetch the rest of the message
-  // first check that there is a "rest of the msg", some msgs just
-  // have the op specified
-  if (req_len == 0) {
-    *req = NULL;
-    Debug("ts_main", "[preprocess_msg] request message = NULL\n");
-  } else {
-    *req = (char *)ats_malloc(sizeof(char) * (req_len + 1));
-    ret = socket_read_n(sock_info, *req, req_len);
-    if (ret != TS_ERR_OKAY) {
-      ats_free(*req);
-      goto Lerror;
-    }
-    // add end of string to end of msg
-    (*req)[req_len] = '\0';
-    Debug("ts_main", "[preprocess_msg] request message = %s\n", *req);
+  // We should never receive an empty payload.
+  if (msg.ptr == NULL) {
+    return TS_ERR_NET_READ;
   }
 
+  *req = msg.ptr;
+  *reqlen = msg.len;
+  Debug("ts_main", "[preprocess_msg] read message length = %zd\n", msg.len);
   return TS_ERR_OKAY;
-
-Lerror:
-  return ret;
-}
-
-
-/**********************************************************************
- * Unmarshal Requests
- **********************************************************************/
-
-/**********************************************************************
- * parse_file_read_request
- *
- * purpose: parses a file read request from a remote API client
- * input: req - data that needs to be parsed
- *        file - the file type sent in the request
- * output: TS_ERR_xx
- * notes: request format = <TSFileNameT>
- **********************************************************************/
-TSMgmtError
-parse_file_read_request(char *req, TSFileNameT * file)
-{
-  int16_t file_t;
-
-  if (!req || !file)
-    return TS_ERR_PARAMS;
-
-  // get file type - copy first 2 bytes of request
-  memcpy(&file_t, req, SIZE_FILE_T);
-  *file = (TSFileNameT) file_t;
-
-  return TS_ERR_OKAY;
-}
-
-/**********************************************************************
- * parse_file_write_request
- *
- * purpose: parses a file write request from a remote API client
- * input: socket info
- *        file - the file type to write that was sent in the request
- *        text - the text that needs to be written
- *        size - length of the text
- *        ver  - version of the file that is to be written
- * output: TS_ERR_xx
- * notes: request format = <TSFileNameT> <version> <size> <text>
- **********************************************************************/
-TSMgmtError
-parse_file_write_request(char *req, TSFileNameT * file, int *ver, int *size, char **text)
-{
-  int16_t file_t, f_ver;
-  int32_t f_size;
-
-  // check input is non-NULL
-  if (!req || !file || !ver || !size || !text)
-    return TS_ERR_PARAMS;
-
-  // get file type - copy first 2 bytes of request
-  memcpy(&file_t, req, SIZE_FILE_T);
-  *file = (TSFileNameT) file_t;
-
-  // get file version - copy next 2 bytes
-  memcpy(&f_ver, req + SIZE_FILE_T, SIZE_VER);
-  *ver = (int) f_ver;
-
-  // get file size - copy next 4 bytes
-  memcpy(&f_size, req + SIZE_FILE_T + SIZE_VER, SIZE_LEN);
-  *size = (int) f_size;
-
-  // get file text
-  *text = (char *)ats_malloc(sizeof(char) * (f_size + 1));
-  memcpy(*text, req + SIZE_FILE_T + SIZE_VER + SIZE_LEN, f_size);
-  (*text)[f_size] = '\0';       // end buffer
-
-  return TS_ERR_OKAY;
-}
-
-/**********************************************************************
- * parse_request_name_value
- *
- * purpose: parses a request w/ 2 args from a remote API client
- * input: req - request info from requestor
- *        name - first arg
- *        val  - second arg
- * output: TS_ERR_xx
- * notes: format= <name_len> <val_len> <name> <val>
- **********************************************************************/
-TSMgmtError
-parse_request_name_value(char *req, char **name_1, char **val_1)
-{
-  int32_t name_len, val_len;
-  char *name, *val;
-
-  if (!req || !name_1 || !val_1)
-    return TS_ERR_PARAMS;
-
-  // get record name length
-  memcpy(&name_len, req, SIZE_LEN);
-
-  // get record value length
-  memcpy(&val_len, req + SIZE_LEN, SIZE_LEN);
-
-  // get record name
-  name = (char *)ats_malloc(sizeof(char) * (name_len + 1));
-  memcpy(name, req + SIZE_LEN + SIZE_LEN, name_len);
-  name[name_len] = '\0';        // end string
-  *name_1 = name;
-
-  // get record value - can be a MgmtInt, MgmtCounter ...
-  val = (char *)ats_malloc(sizeof(char) * (val_len + 1));
-  memcpy(val, req + SIZE_LEN + SIZE_LEN + name_len, val_len);
-  val[val_len] = '\0';          // end string
-  *val_1 = val;
-
-  return TS_ERR_OKAY;
-}
-
-
-/**********************************************************************
- * parse_diags_request
- *
- * purpose: parses a diags request
- * input: diag_msg - the diag msg to be outputted
- *        mode     - indicates what type of diag message
- * output: TS_ERR_xx
- * notes: request format = <TSDiagsT> <diag_msg_len> <diag_msg>
- **********************************************************************/
-TSMgmtError
-parse_diags_request(char *req, TSDiagsT * mode, char **diag_msg)
-{
-  int16_t diag_t;
-  int32_t msg_len;
-
-  // check input is non-NULL
-  if (!req || !mode || !diag_msg)
-    return TS_ERR_PARAMS;
-
-  // get diags type - copy first 2 bytes of request
-  memcpy(&diag_t, req, SIZE_DIAGS_T);
-  *mode = (TSDiagsT) diag_t;
-
-  // get msg size - copy next 4 bytes
-  memcpy(&msg_len, req + SIZE_DIAGS_T, SIZE_LEN);
-
-  // get msg
-  *diag_msg = (char *)ats_malloc(sizeof(char) * (msg_len + 1));
-  memcpy(*diag_msg, req + SIZE_DIAGS_T + SIZE_LEN, msg_len);
-  (*diag_msg)[msg_len] = '\0';  // end buffer
-
-  return TS_ERR_OKAY;
-}
-
-/**********************************************************************
- * parse_proxy_state_request
- *
- * purpose: parses a request to set the proxy state
- * input: diag_msg - the diag msg to be outputted
- *        mode     - indicates what type of diag message
- * output: TS_ERR_xx
- * notes: request format = <TSProxyStateT> <TSCacheClearT>
- **********************************************************************/
-TSMgmtError
-parse_proxy_state_request(char *req, TSProxyStateT * state, TSCacheClearT * clear)
-{
-  int16_t state_t, cache_t;
-
-  // check input is non-NULL
-  if (!req || !state || !clear)
-    return TS_ERR_PARAMS;
-
-  // get proxy on/off
-  memcpy(&state_t, req, SIZE_PROXY_T);
-  *state = (TSProxyStateT) state_t;
-
-  // get cahce-clearing type
-  memcpy(&cache_t, req + SIZE_PROXY_T, SIZE_TS_ARG_T);
-  *clear = (TSCacheClearT) cache_t;
-
-  return TS_ERR_OKAY;
-}
-
-/**********************************************************************
- * Marshal Replies
- **********************************************************************/
-/* NOTE: if the send function "return"s before writing to the socket
-  then that means that an error occurred, and so the calling function
-  must send_reply with the error that occurred. */
-
-/**********************************************************************
- * send_reply
- *
- * purpose: sends a simple TS_ERR_* reply to the request made
- * input: return value - could be extended to support more complex
- *        error codes but for now use only TS_ERR_FAIL, TS_ERR_OKAY
- *        int fd - socket fd to use.
- * output: TS_ERR_*
- * notes: this function does not need to go through the internal structure
- *        so no cleaning up is done.
- **********************************************************************/
-TSMgmtError
-send_reply(struct SocketInfo sock_info, TSMgmtError retval)
-{
-  TSMgmtError ret;
-  char msg[SIZE_ERR_T];
-  int16_t ret_val;
-
-  // write the return value
-  ret_val = (int16_t) retval;
-  memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
-
-  // now push it to the socket
-  ret = socket_write_n(sock_info, msg, SIZE_ERR_T);
-
-  return ret;
-}
-
-/**********************************************************************
- * send_reply_list
- *
- * purpose: sends the reply in response to a request to get list of string
- *          tokens (delimited by REMOTE_DELIM_STR)
- * input: sock_info -
- *        retval - TSMgmtError return type for the CoreAPI call
- *        list - string delimited list of string tokens
- * output: TS_ERR_*
- * notes:
- * format: <TSMgmtError> <string_list_len> <delimited_string_list>
- **********************************************************************/
-TSMgmtError
-send_reply_list(struct SocketInfo sock_info, TSMgmtError retval, char *list)
-{
-  TSMgmtError ret;
-  int msg_pos = 0, total_len;
-  char *msg;
-  int16_t ret_val;
-  int32_t list_size;              // to be safe, typecast
-
-  if (!list) {
-    return TS_ERR_PARAMS;
-  }
-
-  total_len = SIZE_ERR_T + SIZE_LEN + strlen(list);
-  msg = (char *)ats_malloc(sizeof(char) * total_len);
-
-  // write the return value
-  ret_val = (int16_t) retval;
-  memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
-  msg_pos += SIZE_ERR_T;
-
-  // write the length of the string list
-  list_size = (int32_t) strlen(list);
-  memcpy(msg + msg_pos, (void *) &list_size, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // write the event string list
-  memcpy(msg + msg_pos, list, list_size);
-
-  // now push it to the socket
-  ret = socket_write_n(sock_info, msg, total_len);
-  ats_free(msg);
-
-  return ret;
-}
-
-
-/**********************************************************************
- * send_record_get_reply
- *
- * purpose: sends reply to the record_get request made
- * input: retval   - result of the record get request
- *        int fd   - socket fd to use.
- *        val      - the value of the record requested
- *        val_size - num bytes the value occupies
- *        rec_type - the type of the record value requested
- * output: TS_ERR_*
- * notes: this function does not need to go through the internal structure
- *        so no cleaning up is done.
- *        format = <TSMgmtError> <rec_val_len> <name_size> <rec_type> <rec_val> <rec_name>
- **********************************************************************/
-TSMgmtError
-send_record_get_reply(struct SocketInfo sock_info, TSMgmtError retval, void *val, int val_size,
-    TSRecordT rec_type, const char *rec_name)
-{
-  TSMgmtError ret;
-  int msg_pos = 0, total_len;
-  char *msg;
-  int16_t record_t, ret_val;
-  int32_t v_size = (int32_t) val_size; // to be safe, typecast
-  int32_t n_size = rec_name ? (int32_t)strlen(rec_name) : 0;
-
-  total_len = SIZE_ERR_T + SIZE_LEN + SIZE_LEN + SIZE_REC_T + v_size + n_size;
-  msg = (char *)ats_malloc(sizeof(char) * total_len);
-
-  // write the return value
-  ret_val = (int16_t) retval;
-  memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
-  msg_pos += SIZE_ERR_T;
-
-  // write the size of the record value
-  memcpy(msg + msg_pos, (void *) &v_size, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // write the size of the record name
-  memcpy(msg + msg_pos, (void *) &n_size, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // write the record type
-  record_t = (int16_t) rec_type;
-  memcpy(msg + msg_pos, (void *) &record_t, SIZE_REC_T);
-  msg_pos += SIZE_REC_T;
-
-  // write the record value
-  if (v_size) {
-    memcpy(msg + msg_pos, val, v_size);
-    msg_pos += v_size;
-  }
-
-  // write the record name
-  if (n_size) {
-    memcpy(msg + msg_pos, rec_name, n_size);
-  }
-
-  // now push it to the socket
-  ret = socket_write_n(sock_info, msg, total_len);
-  ats_free(msg);
-
-  return ret;
-}
-
-/**********************************************************************
- * send_record_set_reply
- *
- * purpose: sends reply to the record_set request made
- * input:
- * output: TS_ERR_*
- * notes: this function does not need to go through the internal structure
- *        so no cleaning up is done.
- *        format =
- **********************************************************************/
-TSMgmtError
-send_record_set_reply(struct SocketInfo sock_info, TSMgmtError retval, TSActionNeedT action_need)
-{
-  TSMgmtError ret;
-  int total_len;
-  char *msg;
-  int16_t action_t, ret_val;
-
-  total_len = SIZE_ERR_T + SIZE_ACTION_T;
-  msg = (char *)ats_malloc(sizeof(char) * total_len);
-
-  // write the return value
-  ret_val = (int16_t) retval;
-  memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
-
-  // write the action needed
-  action_t = (int16_t) action_need;
-  memcpy(msg + SIZE_ERR_T, (void *) &action_t, SIZE_ACTION_T);
-
-  // now push it to the socket
-  ret = socket_write_n(sock_info, msg, total_len);
-  ats_free(msg);
-
-  return ret;
-}
-
-
-/**********************************************************************
- * send_file_read_reply
- *
- * purpose: sends the reply in response to a file read request
- * input: return value - could be extended to support more complex
- *        error codes but for now use only TS_ERR_FAIL, TS_ERR_OKAY
- *        int fd - socket fd to use.
- * output: TS_ERR_*
- * notes: this function does not need to go through the internal structure
- *        so no cleaning up is done.
- *        reply format = <TSMgmtError> <file_ver> <file_size> <file_text>
- **********************************************************************/
-TSMgmtError
-send_file_read_reply(struct SocketInfo sock_info, TSMgmtError retval, int ver, int size, char *text)
-{
-  TSMgmtError ret;
-  int msg_pos = 0, msg_len;
-  char *msg;
-  int16_t ret_val, f_ver;
-  int32_t f_size;                 // to be safe
-
-  if (!text)
-    return TS_ERR_PARAMS;
-
-  // allocate space for buffer
-  msg_len = SIZE_ERR_T + SIZE_VER + SIZE_LEN + size;
-  msg = (char *)ats_malloc(sizeof(char) * msg_len);
-
-  // write the return value
-  ret_val = (int16_t) retval;
-  memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
-  msg_pos += SIZE_ERR_T;
-
-  // write file version
-  f_ver = (int16_t) ver;
-  memcpy(msg + msg_pos, (void *) &f_ver, SIZE_VER);
-  msg_pos += SIZE_VER;
-
-  // write file size
-  f_size = (int32_t) size;
-  memcpy(msg + msg_pos, (void *) &f_size, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // write the file text
-  memcpy(msg + msg_pos, text, size);
-
-  // now push it to the socket
-  ret = socket_write_n(sock_info, msg, msg_len);
-  ats_free(msg);
-
-  return ret;
-}
-
-
-/**********************************************************************
- * send_proxy_state_get_reply
- *
- * purpose: sends the reply in response to a request to get state of proxy
- * input:
- *        int fd - socket fd to use.
- * output: TS_ERR_*
- * notes: this function DOES NOT HAVE IT"S OWN TSMgmtError TO SEND!!!!
- *        reply format = <TSProxyStateT>
- **********************************************************************/
-TSMgmtError
-send_proxy_state_get_reply(struct SocketInfo sock_info, TSProxyStateT state)
-{
-  TSMgmtError ret;
-  char msg[SIZE_PROXY_T];
-  int16_t state_t;
-
-  // write the state
-  state_t = (int16_t) state;
-  memcpy(msg, (void *) &state_t, SIZE_PROXY_T);
-
-  // now push it to the socket
-  ret = socket_write_n(sock_info, msg, SIZE_PROXY_T);
-
-  return ret;
-}
-
-
-/**********************************************************************
- * send_event_active_reply
- *
- * purpose: sends the reply in response to a request check if event is active
- * input: sock_info -
- *        retval - TSMgmtError return type for the EventIsActive core call
- *        active - is the requested event active or not?
- * output: TS_ERR_*
- * notes:
- * format: <TSMgmtError> <bool>
- **********************************************************************/
-TSMgmtError
-send_event_active_reply(struct SocketInfo sock_info, TSMgmtError retval, bool active)
-{
-  TSMgmtError ret;
-  int total_len;
-  char *msg;
-  int16_t is_active, ret_val;
-
-  total_len = SIZE_ERR_T + SIZE_BOOL;
-  msg = (char *)ats_malloc(sizeof(char) * total_len);
-
-  // write the return value
-  ret_val = (int16_t) retval;
-  memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
-
-  // write the boolean active state
-  is_active = (int16_t) active;
-  memcpy(msg + SIZE_ERR_T, (void *) &is_active, SIZE_BOOL);
-
-  // now push it to the socket
-  ret = socket_write_n(sock_info, msg, total_len);
-  ats_free(msg);
-
-  return ret;
-}
-
-/**********************************************************************
- * send_event_notification
- *
- * purpose: sends to the client a msg indicating that a certain event
- *          has occurred (this msg will be received by the event_poll_thread)
- * input: fd - file descriptor to use for writing
- *        event - the event that was signalled on TM side
- * output: TS_ERR_xx
- * note: format: <OpType> <event_name_len> <event_name> <desc_len> <desc>
- **********************************************************************/
-TSMgmtError
-send_event_notification(struct SocketInfo sock_info, TSMgmtEvent * event)
-{
-  TSMgmtError ret;
-  int total_len, name_len, desc_len;
-  char *msg;
-  int16_t op_t;
-  int32_t len;
-
-  if (!event || !event->name || !event->description)
-    return TS_ERR_PARAMS;
-
-  name_len = strlen(event->name);
-  desc_len = strlen(event->description);
-  total_len = SIZE_OP_T + (SIZE_LEN * 2) + name_len + desc_len;
-  msg = (char *)ats_malloc(sizeof(char) * total_len);
-
-  // write the operation
-  op_t = (int16_t) EVENT_NOTIFY;
-  memcpy(msg, (void *) &op_t, SIZE_OP_T);
-
-  // write the size of the event name
-  len = (int32_t) name_len;
-  memcpy(msg + SIZE_OP_T, (void *) &len, SIZE_LEN);
-
-  // write the event name
-  memcpy(msg + SIZE_OP_T + SIZE_LEN, event->name, name_len);
-
-  // write size of description
-  len = (int32_t) desc_len;
-  memcpy(msg + SIZE_OP_T + SIZE_LEN + name_len, (void *) &len, SIZE_LEN);
-
-  // write the description
-  memcpy(msg + SIZE_OP_T + SIZE_LEN + name_len + SIZE_LEN, event->description, desc_len);
-
-  // now push it to the socket
-  ret = socket_write_n(sock_info, msg, total_len);
-  ats_free(msg);
-
-  return ret;
 }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsLocal.h
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsLocal.h b/mgmt/api/NetworkUtilsLocal.h
index b2d75f4..06d2c17 100644
--- a/mgmt/api/NetworkUtilsLocal.h
+++ b/mgmt/api/NetworkUtilsLocal.h
@@ -21,61 +21,15 @@
   limitations under the License.
  */
 
-/***************************************************************************
- * NetworkUtils.h
- *
- * Defines interface for marshalling requests and unmarshalling responses
- * between the remote API client and Traffic Manager
- *
- *
- ***************************************************************************/
-
-/*****************************************************************************
- * NetworkUtils.h
- *
- * Defines interface for marshalling requests and unmarshalling responses
- * between the remote API client and Traffic Manager
- *****************************************************************************/
-
-#ifndef _NETWORK_UTILS_H_
-#define _NETWORK_UTILS_H_
+#ifndef _NETWORK_UTILS_LOCAL_H_
+#define _NETWORK_UTILS_LOCAL_H_
 
 #include "ink_defs.h"
-#include "WebUtils.h"           // for SocketInfo, socket_read, socket_write
-
 #include "mgmtapi.h"
-#include "NetworkUtilsDefs.h"
-
-/*****************************************************************************
- * general socket functions
- *****************************************************************************/
-TSMgmtError socket_flush(struct SocketInfo sock_info);
-TSMgmtError socket_read_n(struct SocketInfo sock_info, char *buf, int bytes);
-TSMgmtError socket_write_n(struct SocketInfo sock_info, const char *buf, int bytes);
 
 /*****************************************************************************
  * Unmarshalling/marshalling
  *****************************************************************************/
-TSMgmtError preprocess_msg(struct SocketInfo sock_info, OpType * op_t, char **msg);
-
-TSMgmtError parse_request_name_value(char *req, char **name, char **val);
-TSMgmtError parse_record_get_request(char *req, char **rec_name);
-TSMgmtError parse_file_read_request(char *req, TSFileNameT * file);
-TSMgmtError parse_file_write_request(char *req, TSFileNameT * file, int *ver, int *size, char **text);
-TSMgmtError parse_diags_request(char *req, TSDiagsT * mode, char **diag_msg);
-TSMgmtError parse_proxy_state_request(char *req, TSProxyStateT * state, TSCacheClearT * clear);
-
-TSMgmtError send_reply(struct SocketInfo sock_info, TSMgmtError retval);
-TSMgmtError send_reply_list(struct SocketInfo sock_info, TSMgmtError retval, char *list);
-
-TSMgmtError send_record_get_reply(struct SocketInfo sock_info, TSMgmtError retval, void *val, int val_size,
-                               TSRecordT rec_type, const char *rec_name);
-TSMgmtError send_record_set_reply(struct SocketInfo sock_info, TSMgmtError retval, TSActionNeedT action_need);
-TSMgmtError send_file_read_reply(struct SocketInfo sock_info, TSMgmtError retval, int ver, int size, char *text);
-TSMgmtError send_proxy_state_get_reply(struct SocketInfo sock_info, TSProxyStateT state);
-
-TSMgmtError send_event_active_reply(struct SocketInfo sock_info, TSMgmtError retval, bool active);
-
-TSMgmtError send_event_notification(struct SocketInfo sock_info, TSMgmtEvent * event);
+TSMgmtError preprocess_msg(int fd, void ** req, size_t * reqlen);
 
-#endif
+#endif /* _NETWORK_UTILS_LOCAL_H_ */

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsRemote.cc
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsRemote.cc b/mgmt/api/NetworkUtilsRemote.cc
index f7877d0..8c0f544 100644
--- a/mgmt/api/NetworkUtilsRemote.cc
+++ b/mgmt/api/NetworkUtilsRemote.cc
@@ -21,17 +21,6 @@
   limitations under the License.
  */
 
-/*****************************************************************************
- * Filename: NetworkUtilsRemote.cc
- * Purpose: This file contains functions used by remote api client to
- *          marshal requests to TM and unmarshal replies from TM.
- *          Also stores the information about the client's current
- *          socket connection to Traffic Manager
- * Created: 8/9/00
- *
- *
- ***************************************************************************/
-
 #include "ink_config.h"
 #include "ink_defs.h"
 #include "ink_sock.h"
@@ -41,10 +30,10 @@
 #include "NetworkUtilsRemote.h"
 #include "CoreAPI.h"
 #include "CoreAPIShared.h"
-#include "EventRegistration.h"
 #include "MgmtSocket.h"
+#include "MgmtMarshall.h"
 
-extern CallbackTable *remote_event_callbacks;
+CallbackTable *remote_event_callbacks;
 
 int main_socket_fd = -1;
 int event_socket_fd = -1;
@@ -53,11 +42,7 @@ int event_socket_fd = -1;
 char *main_socket_path = NULL;  // "<path>/mgmtapisocket"
 char *event_socket_path = NULL; // "<path>/eventapisocket"
 
-// From CoreAPIRemote.cc
-extern ink_thread ts_test_thread;
-extern ink_thread ts_event_thread;
-extern TSInitOptionT ts_init_options;
-
+static void * event_callback_thread(void * arg);
 
 /**********************************************************************
  * Socket Helper Functions
@@ -87,45 +72,23 @@ set_socket_paths(const char *path)
  *
  * purpose: performs socket write to check status of other end of connection
  * input: None
- * output: return   0 if other end of connection closed;
- *         return < 0 if socket write failed due to some other error
- *         return > 0 if socket write successful
- * notes: send the test msg: UNDEFINED_OP 0(=msg_len)
+ * output: return false if socket write failed for any reason
+ *         return true if socket write successful
+ * notes: send the API_PING test msg
  **********************************************************************/
-int
+static bool
 socket_test(int fd)
 {
-  char msg[6];                  /* 6 = SIZE_OP + SIZE_LEN */
-  int16_t op;
-  int32_t msg_len = 0;
-  int ret, amount_read = 0;
-
-  // write the op
-  op = (int16_t) UNDEFINED_OP;
-  memcpy(msg, (void *) &op, SIZE_OP_T);
-
-  // write msg-len = 0
-  memcpy(msg + SIZE_OP_T, &msg_len, SIZE_LEN);
-
-  while (amount_read < 6) {
-    ret = write(fd, msg + amount_read, 6 - amount_read);
-    if (ret < 0) {
-      if (errno == EAGAIN)
-        continue;
-
-      if (errno == EPIPE || errno == ENOTCONN) {        // other socket end is closed
-        return 0;
-      }
+  MgmtMarshallInt optype = API_PING;
+  MgmtMarshallInt now = time(NULL);
 
-      return -1;
-    }
-    amount_read += ret;
+  if (MGMTAPI_SEND_MESSAGE(fd, API_PING, &optype, &now) == TS_ERR_OKAY) {
+    return true; // write was successful; connection still open
   }
 
-  return 1;                     // write was successful; connection still open
+  return false;
 }
 
-
 /***************************************************************************
  * connect
  *
@@ -267,7 +230,7 @@ reconnect()
 
   // relaunch a new event thread since socket_fd changed
   if (0 == (ts_init_options & TS_MGMT_OPT_NO_EVENTS)) {
-    ts_event_thread = ink_thread_create(event_poll_thread_main, &event_socket_fd, 0, DEFAULT_STACK_SIZE);
+    ts_event_thread = ink_thread_create(event_poll_thread_main, &event_socket_fd);
     // reregister the callbacks on the TM side for this new client connection
     if (remote_event_callbacks) {
       err = send_register_all_callbacks(event_socket_fd, remote_event_callbacks);
@@ -335,11 +298,10 @@ reconnect_loop(int num_attempts)
  * which is not open; which will by default terminate the process;
  * client needs to "ignore" the SIGPIPE signal
  **************************************************************************/
-TSMgmtError
-connect_and_send(const char *msg, int msg_len)
+static TSMgmtError
+main_socket_reconnect()
 {
   TSMgmtError err;
-  int total_wrote = 0, ret;
 
   // connects to TM and does all necessary event updates required
   err = reconnect();
@@ -350,110 +312,36 @@ connect_and_send(const char *msg, int msg_len)
   if (mgmt_write_timeout(main_socket_fd, MAX_TIME_WAIT, 0) <= 0) {
     return TS_ERR_NET_TIMEOUT;
   }
-  // connection successfully established; resend msg
-  // socket_fd should be new fd
-  while (total_wrote < msg_len) {
-    ret = write(main_socket_fd, msg + total_wrote, msg_len - total_wrote);
-
-    if (ret == 0) {
-      return TS_ERR_NET_EOF;
-    }
-
-    if (ret < 0) {
-      if (errno == EAGAIN)
-        continue;
-      else if (errno == EPIPE || errno == ENOTCONN) {
-        // clean-up sockets
-        close(main_socket_fd);
-        close(event_socket_fd);
-        main_socket_fd = -1;
-        event_socket_fd = -1;
-
-        return TS_ERR_NET_ESTABLISH;   // can't establish connection
-
-      } else
-        return TS_ERR_NET_WRITE;       // general socket writing error
-
-    }
-
-    total_wrote += ret;
-  }
-
-  return TS_ERR_OKAY;
-}
-
-static TSMgmtError
-socket_read_conn(int fd, uint8_t * buf, size_t needed)
-{
-  size_t consumed = 0;
-  ssize_t ret;
-
-  while (needed > consumed) {
-    ret = read(fd, buf, needed - consumed);
-
-    if (ret < 0) {
-      if (errno == EAGAIN) {
-        continue;
-      } else {
-        return TS_ERR_NET_READ;
-      }
-    }
-
-    if (ret == 0) {
-      return TS_ERR_NET_EOF;
-    }
-
-    buf += ret;
-    consumed += ret;
-  }
 
   return TS_ERR_OKAY;
 }
 
-/**************************************************************************
- * socket_write_conn
- *
- * purpose: guarantees writing of n bytes; if connection error, tries
- *          reconnecting to TM again (in case TM was restarted)
- * input:   fd to write to, buffer to write from & number of bytes to write
- * output:  TS_ERR_xx
- * note:   EPIPE - this happens if client makes a call after stopping then
- *         starting TM again.
- *         ENOTCONN - this happens if the client tries to make a call after
- *         stopping TM, but before starting it; then restarts TM and makes a
- *          new call
- * In the send_xx_request function, use a special socket writing function
- * which calls connect_and_send() instead of just the basic connect():
- * 1) if the write returns EPIPE error, then call connect_and_send()
- * 2) return the value returned from EPIPE
- *************************************************************************/
 static TSMgmtError
-socket_write_conn(int fd, const char *msg_buf, int bytes)
+socket_write_conn(int fd, const void * msg_buf, size_t bytes)
 {
-  int ret, byte_wrote = 0;
+  size_t byte_wrote = 0;
 
   // makes sure the descriptor is writable
   if (mgmt_write_timeout(fd, MAX_TIME_WAIT, 0) <= 0) {
     return TS_ERR_NET_TIMEOUT;
   }
+
   // write until we fulfill the number
   while (byte_wrote < bytes) {
-    ret = write(fd, msg_buf + byte_wrote, bytes - byte_wrote);
+    ssize_t ret = write(fd, (const char *)msg_buf + byte_wrote, bytes - byte_wrote);
 
     if (ret == 0) {
       return TS_ERR_NET_EOF;
     }
 
     if (ret < 0) {
-      if (errno == EAGAIN)
+      if (mgmt_transient_error()) {
         continue;
-
-      else if (errno == EPIPE || errno == ENOTCONN) {   // other socket end is closed
-        // clean-up of sockets is done in reconnect()
-        return connect_and_send(msg_buf, bytes);
-      } else
+      } else {
         return TS_ERR_NET_WRITE;
+      }
     }
+
     // we are all good here
     byte_wrote += ret;
   }
@@ -461,6 +349,33 @@ socket_write_conn(int fd, const char *msg_buf, int bytes)
   return TS_ERR_OKAY;
 }
 
+TSMgmtError
+mgmtapi_sender::send(void * msg, size_t msglen) const
+{
+  const unsigned tries = 5;
+  TSMgmtError err;
+
+  for (unsigned i = 0; i < tries; ++i) {
+    err = socket_write_conn(this->fd, msg, msglen);
+    if (err == TS_ERR_OKAY) {
+      return err;
+    }
+
+    // clean-up sockets
+    close(main_socket_fd);
+    close(event_socket_fd);
+    main_socket_fd = -1;
+    event_socket_fd = -1;
+
+    err = main_socket_reconnect();
+    if (err != TS_ERR_OKAY) {
+      return err;
+    }
+  }
+
+  return TS_ERR_NET_ESTABLISH;   // can't establish connection
+}
+
 /**********************************************************************
  * socket_test_thread
  *
@@ -488,7 +403,7 @@ socket_test_thread(void *)
 {
   // loop until client process dies
   while (1) {
-    if (main_socket_fd == -1 || socket_test(main_socket_fd) <= 0) {
+    if (main_socket_fd == -1 || !socket_test(main_socket_fd)) {
       // ASSUMES that in between the time the socket_test is made
       // and this reconnect call is made, the main_socket_fd remains
       // the same (eg. no one else called reconnect to TM successfully!!
@@ -512,387 +427,6 @@ socket_test_thread(void *)
 /**********************************************************************
  * MARSHALL REQUESTS
  **********************************************************************/
-/**********************************************************************
- * send_request
- *
- * purpose: sends file read request to Traffic Manager
- * input:   fd - file descriptor to use to send to
- *          op - the type of OpType request sending
- * output:  TS_ERR_xx
- * notes:  used by operations which don't need to send any additional
- *         parameters
- * format: <OpType> <msg_len=0>
- **********************************************************************/
-TSMgmtError
-send_request(int fd, OpType op)
-{
-  int16_t op_t;
-  int32_t msg_len;
-  char msg_buf[SIZE_OP_T + SIZE_LEN];
-  TSMgmtError err;
-
-  // fill in op type
-  op_t = (int16_t) op;
-  memcpy(msg_buf, &op_t, SIZE_OP_T);
-
-  // fill in msg_len == 0
-  msg_len = 0;
-  memcpy(msg_buf + SIZE_OP_T, &msg_len, SIZE_LEN);
-
-  // send message
-  err = socket_write_conn(fd, msg_buf, SIZE_OP_T + SIZE_LEN);
-  return err;
-}
-
-/**********************************************************************
- * send_request_name (helper fn)
- *
- * purpose: sends generic  request with one string argument name
- * input: fd - file descriptor to use
- *        op - .
- * output: TS_ERR_xx
- * note: format: <OpType> <str_len> <string>
- **********************************************************************/
-TSMgmtError
-send_request_name(int fd, OpType op, const char *name)
-{
-  char *msg_buf;
-  int16_t op_t;
-  int32_t msg_len;
-  int total_len;
-  TSMgmtError err;
-
-  if (name == NULL) {           //reg callback for all events when op==EVENT_REG_CALLBACK
-    msg_len = 0;
-  } else {
-    msg_len = (int32_t) strlen(name);
-  }
-
-  total_len = SIZE_OP_T + SIZE_LEN + msg_len;
-  msg_buf = (char *)ats_malloc(sizeof(char) * total_len);
-
-  // fill in op type
-  op_t = (int16_t) op;
-  memcpy(msg_buf, (void *) &op_t, SIZE_OP_T);
-
-  // fill in msg_len
-  memcpy(msg_buf + SIZE_OP_T, (void *) &msg_len, SIZE_LEN);
-
-  // fill in name (if NOT NULL)
-  if (name)
-    memcpy(msg_buf + SIZE_OP_T + SIZE_LEN, name, msg_len);
-
-
-  // send message
-  err = socket_write_conn(fd, msg_buf, total_len);
-  ats_free(msg_buf);
-  return err;
-}
-
-/**********************************************************************
- * send_request_name_value (helper fn)
- *
- * purpose: sends generic request with 2 str arguments; a name-value pair
- * input: fd - file descriptor to use
- *        op - Op type
- * output: TS_ERR_xx
- * note: format: <OpType> <name-len> <val-len> <name> <val>
- **********************************************************************/
-TSMgmtError
-send_request_name_value(int fd, OpType op, const char *name, const char *value)
-{
-  char *msg_buf;
-  int msg_pos = 0, total_len;
-  int32_t msg_len, name_len, val_size;    // these are written to msg
-  int16_t op_t;
-  TSMgmtError err;
-
-  if (!name || !value)
-    return TS_ERR_PARAMS;
-
-  // set the sizes
-  name_len = strlen(name);
-  val_size = strlen(value);
-  msg_len = (SIZE_LEN * 2) + name_len + val_size;
-  total_len = SIZE_OP_T + SIZE_LEN + msg_len;
-  msg_buf = (char *)ats_malloc(sizeof(char) * (total_len));
-
-  // fill in op type
-  op_t = (int16_t) op;
-  memcpy(msg_buf + msg_pos, (void *) &op_t, SIZE_OP_T);
-  msg_pos += SIZE_OP_T;
-
-  // fill in msg length
-  memcpy(msg_buf + msg_pos, (void *) &msg_len, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // fill in record name length
-  memcpy(msg_buf + msg_pos, (void *) &name_len, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // fill in record value length
-  memcpy(msg_buf + msg_pos, (void *) &val_size, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // fill in record name
-  memcpy(msg_buf + msg_pos, name, name_len);
-  msg_pos += name_len;
-
-  // fill in record value
-  memcpy(msg_buf + msg_pos, value, val_size);
-
-  // send message
-  err = socket_write_conn(fd, msg_buf, total_len);
-  ats_free(msg_buf);
-  return err;
-}
-
-/**********************************************************************
- * send_request_bool (helper)
- *
- * purpose: sends a simple op with a boolean flag argument
- * input: fd      - file descriptor to use
- *        flag    - boolean flag
- * output: TS_ERR_xx
- **********************************************************************/
-TSMgmtError
-send_request_bool(int fd, OpType op, bool flag)
-{
-  char msg_buf[SIZE_OP_T + SIZE_LEN + SIZE_BOOL];
-  int16_t flag_t;
-  int32_t msg_len;
-
-  // Fill in the operator
-  memcpy(msg_buf, (void *) &op, SIZE_OP_T);
-
-  // fill in msg_len = SIZE_BOOL
-  msg_len = (int32_t) SIZE_BOOL;
-  memcpy(msg_buf + SIZE_OP_T, (void *)&msg_len, SIZE_LEN);
-
-  // Fill in the argument (the boolean flag)
-  flag_t = flag ? 1 : 0;
-  memcpy(msg_buf + SIZE_OP_T + SIZE_LEN, (void *) &flag_t, SIZE_BOOL);
-
-  // send message
-  return socket_write_conn(fd, msg_buf, SIZE_OP_T + SIZE_LEN + SIZE_BOOL);
-}
-
-/**********************************************************************
- * send_file_read_request
- *
- * purpose: sends file read request to Traffic Manager
- * input:   fd - file descriptor to use to send to
- *          file - file to read
- * output:  TS_ERR_xx
- * notes:   first must create the message and then send it across network
- *          msg format = <OpType> <msg_len> <TSFileNameT>
- **********************************************************************/
-TSMgmtError
-send_file_read_request(int fd, TSFileNameT file)
-{
-  char msg_buf[SIZE_OP_T + SIZE_LEN + SIZE_FILE_T];
-  int msg_pos = 0;
-  int32_t msg_len = (int32_t) SIZE_FILE_T;  //marshalled values
-  int16_t op, file_t;
-
-  // fill in op type
-  op = (int16_t) FILE_READ;
-  memcpy(msg_buf + msg_pos, &op, SIZE_OP_T);
-  msg_pos += SIZE_OP_T;
-
-  // fill in msg length
-  memcpy(msg_buf + msg_pos, &msg_len, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // fill in file type
-  file_t = (int16_t) file;
-  memcpy(msg_buf + msg_pos, &file_t, SIZE_FILE_T);
-
-  // send message
-  return socket_write_conn(fd, msg_buf, SIZE_OP_T + SIZE_LEN + SIZE_FILE_T);
-}
-
-/**********************************************************************
- * send_file_write_request
- *
- * purpose: sends file write request to Traffic Manager
- * input: fd - file descriptor to use
- *        file - file to read
- *        text - new text to write to specified file
- *        size - length of the text
- *        ver  - version of the file to be written
- * output: TS_ERR_xx
- * notes: format - FILE_WRITE <msg_len> <file_type> <file_ver> <file_size> <text>
- **********************************************************************/
-TSMgmtError
-send_file_write_request(int fd, TSFileNameT file, int ver, int size, char *text)
-{
-  char *msg_buf;
-  int msg_pos = 0, total_len;
-  int32_t msg_len, f_size;        //marshalled values
-  int16_t op, file_t, f_ver;
-  TSMgmtError err;
-
-  if (!text)
-    return TS_ERR_PARAMS;
-
-  msg_len = SIZE_FILE_T + SIZE_VER + SIZE_LEN + size;
-  total_len = SIZE_OP_T + SIZE_LEN + msg_len;
-  msg_buf = (char *)ats_malloc(sizeof(char) * total_len);
-
-  // fill in op type
-  op = (int16_t) FILE_WRITE;
-  memcpy(msg_buf + msg_pos, &op, SIZE_OP_T);
-  msg_pos += SIZE_OP_T;
-
-  // fill in msg length
-  memcpy(msg_buf + msg_pos, &msg_len, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // fill in file type
-  file_t = (int16_t) file;
-  memcpy(msg_buf + msg_pos, &file_t, SIZE_FILE_T);
-  msg_pos += SIZE_FILE_T;
-
-  // fill in file version
-  f_ver = (int16_t) ver;
-  memcpy(msg_buf + msg_pos, &f_ver, SIZE_VER);
-  msg_pos += SIZE_VER;
-
-  // fill in file size
-  f_size = (int32_t) size;        //typecase to be safe
-  memcpy(msg_buf + msg_pos, &f_size, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // fill in text of file
-  memcpy(msg_buf + msg_pos, text, size);
-
-  // send message
-  err = socket_write_conn(fd, msg_buf, total_len);
-  ats_free(msg_buf);
-  return err;
-}
-
-static TSMgmtError
-send_record_get_x_request(OpType optype, int fd, const char *rec_name)
-{
-  char *msg_buf;
-  int msg_pos = 0, total_len;
-  int16_t op = (int16_t)optype;
-  int32_t msg_len;
-  TSMgmtError err;
-
-  ink_assert(op == RECORD_GET || op == RECORD_MATCH_GET);
-
-  if (!rec_name) {
-    return TS_ERR_PARAMS;
-  }
-
-  total_len = SIZE_OP_T + SIZE_LEN + strlen(rec_name);
-  msg_buf = (char *)ats_malloc(sizeof(char) * total_len);
-
-  // fill in op type
-  memcpy(msg_buf + msg_pos, (void *) &op, SIZE_OP_T);
-  msg_pos += SIZE_OP_T;
-
-  // fill in msg length
-  msg_len = (int32_t) strlen(rec_name);
-  memcpy(msg_buf + msg_pos, (void *) &msg_len, SIZE_LEN);
-  msg_pos += SIZE_LEN;
-
-  // fill in record name
-  memcpy(msg_buf + msg_pos, rec_name, strlen(rec_name));
-
-  // send message
-  err = socket_write_conn(fd, msg_buf, total_len);
-  ats_free(msg_buf);
-  return err;
-}
-
-/**********************************************************************
- * send_record_get_request
- *
- * purpose: sends request to get record value from Traffic Manager
- * input: fd       - file descriptor to use
- *        rec_name - name of record to retrieve value for
- * output: TS_ERR_xx
- * format: RECORD_GET <msg_len> <rec_name>
- **********************************************************************/
-TSMgmtError
-send_record_get_request(int fd, const char *rec_name)
-{
-  return send_record_get_x_request(RECORD_GET, fd, rec_name);
-}
-
-/**********************************************************************
- * send_record_match_request
- *
- * purpose: sends request to get a list of matching record values from Traffic Manager
- * input: fd       - file descriptor to use
- *        rec_name - regex to match against record names
- * output: TS_ERR_xx
- * format: sequence of RECORD_GET <msg_len> <rec_name>
- **********************************************************************/
-TSMgmtError
-send_record_match_request(int fd, const char *rec_regex)
-{
-  return send_record_get_x_request(RECORD_MATCH_GET, fd, rec_regex);
-}
-
-/*------ control functions -------------------------------------------*/
-/**********************************************************************
- * send_proxy_state_get_request
- *
- * purpose: sends request to get the proxy state (on/off)
- * input: fd       - file descriptor to use
- * output: TS_ERR_xx
- * note: format: PROXY_STATE_GET 0(=msg_len)
- **********************************************************************/
-TSMgmtError
-send_proxy_state_get_request(int fd)
-{
-  TSMgmtError err;
-
-  err = send_request(fd, PROXY_STATE_GET);
-  return err;
-}
-
-/**********************************************************************
- * send_proxy_state_set_request
- *
- * purpose: sends request to set the proxy state (on/off)
- * input: fd    - file descriptor to use
- *        state - TS_PROXY_ON, TS_PROXY_OFF
- * output: TS_ERR_xx
- * note: format: PROXY_STATE_SET  <msg_len> <TSProxyStateT> <TSCacheClearT>
- **********************************************************************/
-TSMgmtError
-send_proxy_state_set_request(int fd, TSProxyStateT state, TSCacheClearT clear)
-{
-  char msg_buf[SIZE_OP_T + SIZE_LEN + SIZE_PROXY_T + SIZE_TS_ARG_T];
-  int16_t op, state_t, cache_t;
-  int32_t msg_len;
-
-  // fill in op type
-  op = (int16_t) PROXY_STATE_SET;
-  memcpy(msg_buf, (void *) &op, SIZE_OP_T);
-
-  // fill in msg_len
-  msg_len = (int32_t) (SIZE_PROXY_T + SIZE_TS_ARG_T);
-  memcpy(msg_buf + SIZE_OP_T, (void *) &msg_len, SIZE_LEN);
-
-  // fill in proxy state
-  state_t = (int16_t) state;
-  memcpy(msg_buf + SIZE_OP_T + SIZE_LEN, (void *) &state_t, SIZE_PROXY_T);
-
-  // fill in cache clearing option
-  cache_t = (int16_t) clear;
-  memcpy(msg_buf + SIZE_OP_T + SIZE_LEN + SIZE_PROXY_T, (void *) &cache_t, SIZE_TS_ARG_T);
-
-  // send message
-  return socket_write_conn(fd, msg_buf, SIZE_OP_T + SIZE_LEN + SIZE_PROXY_T + SIZE_TS_ARG_T);
-}
-
 
 /*------ events -------------------------------------------------------*/
 
@@ -904,11 +438,8 @@ send_proxy_state_set_request(int fd, TSProxyStateT state, TSCacheClearT clear)
  *          registered for each event
  * input: None
  * output: return TS_ERR_OKAY only if ALL events sent okay
- * notes: could create a function which just sends a list of all the events to
- * reregister; but actually just reuse the function
- * send_request_name(EVENT_REG_CALLBACK) and call it for each event
  * 1) get list of all events with callbacks
- * 2) for each event, call send_request_name
+ * 2) for each event, send an EVENT_REG_CALLBACK message
  **********************************************************************/
 TSMgmtError
 send_register_all_callbacks(int fd, CallbackTable * cb_table)
@@ -920,19 +451,22 @@ send_register_all_callbacks(int fd, CallbackTable * cb_table)
   events_with_cb = get_events_with_callbacks(cb_table);
   // need to check that the list has all the events registered
   if (!events_with_cb) {        // all events have registered callback
-    err = send_request_name(fd, EVENT_REG_CALLBACK, NULL);
+    MgmtMarshallInt optype = EVENT_REG_CALLBACK;
+    MgmtMarshallString event_name = NULL;
+
+    err = MGMTAPI_SEND_MESSAGE(fd, EVENT_REG_CALLBACK, &optype, &event_name);
     if (err != TS_ERR_OKAY)
       return err;
   } else {
-    char *event_name;
-    int event_id;
     int num_events = queue_len(events_with_cb);
     // iterate through the LLQ and send request for each event
     for (int i = 0; i < num_events; i++) {
-      event_id = *(int *) dequeue(events_with_cb);
-      event_name = (char *) get_event_name(event_id);
+      MgmtMarshallInt optype = EVENT_REG_CALLBACK;
+      MgmtMarshallInt event_id = *(int *) dequeue(events_with_cb);
+      MgmtMarshallString event_name = (char *) get_event_name(event_id);
+
       if (event_name) {
-        err = send_request_name(fd, EVENT_REG_CALLBACK, event_name);
+        err = MGMTAPI_SEND_MESSAGE(fd, EVENT_REG_CALLBACK, &optype, &event_name);
         ats_free(event_name);      // free memory
         if (err != TS_ERR_OKAY) {
           send_err = err;       // save the type of send error
@@ -960,14 +494,10 @@ send_register_all_callbacks(int fd, CallbackTable * cb_table)
  *          callbacks registered for that event
  * input: None
  * output: TS_ERR_OKAY only if all send requests are okay
- * notes: could create a function which just sends a list of all the events to
- * unregister; but actually just reuse the function
- * send_request_name(EVENT_UNREG_CALLBACK) and call it for each event
  **********************************************************************/
 TSMgmtError
 send_unregister_all_callbacks(int fd, CallbackTable * cb_table)
 {
-  char *event_name;
   int event_id;
   LLQ *events_with_cb;          // list of events with at least one callback
   int reg_callback[NUM_EVENTS];
@@ -995,8 +525,10 @@ send_unregister_all_callbacks(int fd, CallbackTable * cb_table)
   // send message to TM to mark unregister
   for (int k = 0; k < NUM_EVENTS; k++) {
     if (reg_callback[k] == 0) { // event has no registered callbacks
-      event_name = get_event_name(k);
-      err = send_request_name(fd, EVENT_UNREG_CALLBACK, event_name);
+      MgmtMarshallInt optype = EVENT_UNREG_CALLBACK;
+      MgmtMarshallString event_name = get_event_name(k);
+
+      err = MGMTAPI_SEND_MESSAGE(fd, EVENT_UNREG_CALLBACK, &optype, &event_name);
       ats_free(event_name);
       if (err != TS_ERR_OKAY) {
         send_err = err;         //save the type of the sending error
@@ -1015,545 +547,165 @@ send_unregister_all_callbacks(int fd, CallbackTable * cb_table)
 }
 
 /**********************************************************************
- * send_diags_msg
- *
- * purpose: sends the diag msg across along with they diag msg type
- * input: mode - type of diags msg
- *        msg  - the diags msg
- * output: TS_ERR_xx
- * note: format: <OpType> <msg_len> <TSDiagsT> <diag_msg_len> <diag_msg>
- **********************************************************************/
-TSMgmtError
-send_diags_msg(int fd, TSDiagsT mode, const char *diag_msg)
-{
-  char *msg_buf;
-  int16_t op_t, diag_t;
-  int32_t msg_len, diag_msg_len;
-  int total_len;
-  TSMgmtError err;
-
-  if (!diag_msg)
-    return TS_ERR_PARAMS;
-
-  diag_msg_len = (int32_t) strlen(diag_msg);
-  msg_len = SIZE_DIAGS_T + SIZE_LEN + diag_msg_len;
-  total_len = SIZE_OP_T + SIZE_LEN + msg_len;
-  msg_buf = (char *)ats_malloc(sizeof(char) * total_len);
-
-  // fill in op type
-  op_t = (int16_t) DIAGS;
-  memcpy(msg_buf, (void *) &op_t, SIZE_OP_T);
-
-  // fill in entire msg len
-  memcpy(msg_buf + SIZE_OP_T, (void *) &msg_len, SIZE_LEN);
-
-  // fill in TSDiagsT
-  diag_t = (int16_t) mode;
-  memcpy(msg_buf + SIZE_OP_T + SIZE_LEN, (void *) &diag_t, SIZE_DIAGS_T);
-
-  // fill in diags msg_len
-  memcpy(msg_buf + SIZE_OP_T + SIZE_LEN + SIZE_DIAGS_T, (void *) &diag_msg_len, SIZE_LEN);
-
-  // fill in diags msg
-  memcpy(msg_buf + SIZE_OP_T + SIZE_LEN + SIZE_DIAGS_T + SIZE_LEN, diag_msg, diag_msg_len);
-
-  // send message
-  err = socket_write_conn(fd, msg_buf, total_len);
-  ats_free(msg_buf);
-  return err;
-}
-
-
-/**********************************************************************
  * UNMARSHAL REPLIES
  **********************************************************************/
 
-/* Error handling implementation:
- * All the parsing functions which parse the reply returned from local side
- * also must read the TSERror return value sent from local side; this return
- * value is the same value that will be returned by the parsing function.
- * ALL PARSING FUNCTIONS MUST FIRST CHECK that the retval is TS_ERR_OKAY;
- * if it is not, then DON"T PARSE THE REST OF THE REPLY!!
- */
-
-/* Reading replies:
- * The reading is done in while loop in the parse_xx_reply functions;
- * need to add a timeout so that the function is not left looping and
- * waiting if a msg isn't sent to the socket from local side (eg. TM died)
- */
-
-/**********************************************************************
- * parse_reply
- *
- * purpose: parses a reply from traffic manager. return that error
- * input: fd
- * output: errors on error or fill up class with response &
- *         return TS_ERR_xx
- * notes: only returns an TSMgmtError
- **********************************************************************/
-TSMgmtError
-parse_reply(int fd)
-{
-  TSMgmtError ret;
-  int16_t ret_val;
-
-  // check to see if anything to read; wait for specified time = 1 sec
-  if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { // time expires before ready to read
-    return TS_ERR_NET_TIMEOUT;
-  }
-
-  ret = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
-  if (ret != TS_ERR_OKAY) {
-    return ret;
-  }
-
-  return (TSMgmtError) ret_val;
-}
-
-/**********************************************************************
- * parse_reply_list
- *
- * purpose: parses a TM reply to a request to get a list of string tokens
- * input: fd - socket to read
- *        list - will contain delimited string list of tokens
- * output: TS_ERR_xx
- * notes:
- * format: <TSMgmtError> <string_list_len> <delimited_string_list>
- **********************************************************************/
-TSMgmtError
-parse_reply_list(int fd, char **list)
-{
-  int16_t ret_val;
-  int32_t list_size;
-  TSMgmtError err_t;
-
-  if (!list) {
-    return TS_ERR_PARAMS;
-  }
-
-  // check to see if anything to read; wait for specified time
-  if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) {
-    return TS_ERR_NET_TIMEOUT;
-  }
-
-  // get the return value (TSMgmtError type)
-  err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  // if !TS_ERR_OKAY, stop reading rest of msg
-  err_t = (TSMgmtError) ret_val;
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  // now get size of string event list
-  err_t = socket_read_conn(fd, (uint8_t *)&list_size, SIZE_LEN);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  // get the delimited event list string
-  *list = (char *)ats_malloc(sizeof(char) * (list_size + 1));
-  err_t = socket_read_conn(fd, (uint8_t *)(*list), list_size);
-  if (err_t != TS_ERR_OKAY) {
-    ats_free(*list);
-    *list = NULL;
-    return err_t;
-  }
-
-  // add end of string to end of the record value
-  ((char *) (*list))[list_size] = '\0';
-  return err_t;
-}
-
-
-/**********************************************************************
- * parse_file_read_reply
- *
- * purpose: parses a file read reply from traffic manager.
- * input: fd
- *        ver -
- *        size - size of text
- *        text -
- * output: errors on error or fill up class with response &
- *         return TS_ERR_xx
- * notes: reply format = <TSMgmtError> <file_version> <file_size> <text>
- **********************************************************************/
 TSMgmtError
-parse_file_read_reply(int fd, int *ver, int *size, char **text)
+parse_generic_response(OpType optype, int fd)
 {
-  int32_t f_size;
-  int16_t ret_val, f_ver;
-  TSMgmtError err_t;
-
-  if (!ver || !size || !text)
-    return TS_ERR_PARAMS;
-
-  // check to see if anything to read; wait for specified time
-  if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { // time expires before ready to read
-    return TS_ERR_NET_TIMEOUT;
-  }
-
-  // get the error return value
-  err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  // if !TS_ERR_OKAY, stop reading rest of msg
-  err_t = (TSMgmtError) ret_val;
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  // now get file version
-  err_t = socket_read_conn(fd, (uint8_t *)&f_ver, SIZE_VER);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  *ver = (int) f_ver;
+  TSMgmtError err;
+  MgmtMarshallInt ival;         // filled in by recv_mgmt_response(); returned to the caller as a TSMgmtError
+  MgmtMarshallData data = { NULL, 0 };  // reply buffer; data.ptr is released with ats_free() below
 
-  // now get file size
-  err_t = socket_read_conn(fd, (uint8_t *)&f_size, SIZE_LEN);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
+  err = recv_mgmt_message(fd, data);    // NOTE(review): no mgmt_read_timeout() check precedes this read - confirm that is intended
+  if (err != TS_ERR_OKAY) {
+    return err;
   }
 
-  *size = (int) f_size;
-
-  // check size before reading text
-  if ((*size) <= 0) {
-    *text = ats_strndup("", 1);                 // set to empty string
-    return TS_ERR_OKAY;
-  }
+  err = recv_mgmt_response(data.ptr, data.len, optype, &ival);
+  ats_free(data.ptr);           // the buffer is freed whether or not unmarshalling succeeded
 
-  // now we got the size, we can read everything into our msg * then parse it
-  *text = (char *)ats_malloc(sizeof(char) * (f_size + 1));
-  err_t = socket_read_conn(fd, (uint8_t *)(*text), f_size);
-  if (err_t != TS_ERR_OKAY) {
-    ats_free(*text);
-    *text = NULL;
-    return err_t;
+  if (err != TS_ERR_OKAY) {
+    return err;
   }
 
-  (*text)[f_size] = '\0';     // end the string
-  return TS_ERR_OKAY;
+  return (TSMgmtError)ival;     // hand back the integer carried in the reply as the result code
 }
 
 /**********************************************************************
- * parse_record_get_reply
+ * event_poll_thread_main
  *
- * purpose: parses a record_get reply from traffic manager.
- * input: fd
- *        retval   -
- *        rec_type - the type of the record
- *        rec_value - the value of the record in string format
- * output: errors on error or fill up class with response &
- *         return SUCC
- * notes: reply format = <TSMgmtError> <val_size> <name_size> <rec_type> <record_value> <record_name>
- * Zero-length values and names are supported. If the size field is 0, the corresponding
- * value field is not transmitted.
- * It's the responsibility of the calling function to conver the rec_value
- * based on the rec_type!!
+ * purpose: thread listens on the client's event socket connection;
+ *          only reads from the event_socket connection and
+ *          processes EVENT_NOTIFY messages; each time client
+ *          makes new event-socket connection to TM, must launch
+ *          a new event_poll_thread_main thread
+ * input:   arg - contains the socket_fd to listen on
+ * output:  NULL - if error
+ * notes:   each time the client's socket connection to TM is reset
+ *          a new thread will be launched as old one dies; there are
+ *          only two places where a new thread is created:
+ *          1) when client first connects (TSInit call)
+ *          2) client reconnects() due to a TM restart
+ * Waits in mgmt_read_timeout() for the socket to become readable, retrying
+ * whenever the wait times out, then reads one EVENT_NOTIFY message and
+ * hands the resulting event to a new event_callback_thread.
  **********************************************************************/
-TSMgmtError
-parse_record_get_reply(int fd, TSRecordT * rec_type, void **rec_val, char **rec_name)
+void *
+event_poll_thread_main(void *arg)
 {
-  int16_t ret_val, rec_t;
-  int32_t val_size, name_size;
-  TSMgmtError err_t;
-
-  if (!rec_type || !rec_val) {
-    return TS_ERR_PARAMS;
-  }
-
-  *rec_name = NULL;
-  *rec_val = NULL;
-
-  // check to see if anything to read; wait for specified time
-  if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { //time expired before ready to read
-    return TS_ERR_NET_TIMEOUT;
-  }
-
-  // get the return value (TSMgmtError type)
-  err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
-  if (err_t != TS_ERR_OKAY) {
-    goto fail;
-  }
-
-  // if !TS_ERR_OKAY, stop reading rest of msg
-  err_t = (TSMgmtError) ret_val;
-  if (err_t != TS_ERR_OKAY) {
-    goto fail;
-  }
-
-  // now get size of record_value
-  err_t = socket_read_conn(fd, (uint8_t *)&val_size, SIZE_LEN);
-  if (err_t != TS_ERR_OKAY) {
-    goto fail;
-  }
-
-  // now get size of record name
-  err_t = socket_read_conn(fd, (uint8_t *)&name_size, SIZE_LEN);
-  if (err_t != TS_ERR_OKAY) {
-    goto fail;
-  }
+  int sock_fd;
 
-  // get the record type
-  err_t = socket_read_conn(fd, (uint8_t *)&rec_t, SIZE_REC_T);
-  if (err_t != TS_ERR_OKAY) {
-    goto fail;
-  }
+  sock_fd = *((int *) arg);     // should be same as event_socket_fd
 
-  *rec_type = (TSRecordT) rec_t;
+  // the sock_fd is going to be the one we listen for events on
+  while (1) {
+    TSMgmtError ret;
+    TSMgmtEvent * event = NULL;
 
-  // get record value (if there is one)
-  if (val_size) {
-    if (*rec_type == TS_REC_STRING) {
-      *rec_val = ats_malloc(sizeof(char) * (val_size + 1));
-    } else {
-      *rec_val = ats_malloc(sizeof(char) * (val_size));
-    }
+    MgmtMarshallData reply = { NULL, 0 };
+    MgmtMarshallInt optype;
+    MgmtMarshallString name = NULL;
+    MgmtMarshallString desc = NULL;
 
-    err_t = socket_read_conn(fd, (uint8_t *)(*rec_val), val_size);
-    if (err_t != TS_ERR_OKAY) {
-      goto fail;
+    // possible sock_fd is invalid if TM restarts and client reconnects
+    if (sock_fd < 0) {
+      break;
     }
 
-    // add end of string to end of the record value
-    if (*rec_type == TS_REC_STRING) {
-      ((char *) (*rec_val))[val_size] = '\0';
+    // Wait on sock_fd, the socket handed to this thread, until we get an event
+    // or error. The 0 return from select(2) means we timed out ...
+    if (mgmt_read_timeout(sock_fd, MAX_TIME_WAIT, 0) == 0) {
+      continue;
     }
-  }
 
-  // get the record name (if there is one)
-  if (name_size) {
-    *rec_name = (char *)ats_malloc(sizeof(char) * (name_size + 1));
-    err_t = socket_read_conn(fd, (uint8_t *)(*rec_name), name_size);
-    if (err_t != TS_ERR_OKAY) {
-      goto fail;
+    ret = recv_mgmt_message(sock_fd, reply);
+    if (ret != TS_ERR_OKAY) {
+      break;
     }
 
-    (*rec_name)[name_size] = '\0';
-  }
-
-  return TS_ERR_OKAY;
-
-fail:
-  ats_free(*rec_val);
-  ats_free(*rec_name);
-  *rec_val = NULL;
-  *rec_name = NULL;
-  return err_t;
-}
-
-/**********************************************************************
- * parse_record_set_reply
- *
- * purpose: parses a record_set reply from traffic manager.
- * input: fd
- *        action_need - will contain the type of action needed from the set
- * output: TS_ERR_xx
- * notes: reply format = <TSMgmtError> <val_size> <rec_type> <record_value>
- * It's the responsibility of the calling function to conver the rec_value
- * based on the rec_type!!
- **********************************************************************/
-TSMgmtError
-parse_record_set_reply(int fd, TSActionNeedT * action_need)
-{
-  int16_t ret_val, action_t;
-  TSMgmtError err_t;
-
-  if (!action_need)
-    return TS_ERR_PARAMS;
-
-  // check to see if anything to read; wait for specified time
-  if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) {
-    return TS_ERR_NET_TIMEOUT;
-  }
-
-  // get the return value (TSMgmtError type)
-  err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  // if !TS_ERR_OKAY, stop reading rest of msg
-  err_t = (TSMgmtError) ret_val;
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  // now get the action needed
-  err_t = socket_read_conn(fd, (uint8_t *)&action_t, SIZE_ACTION_T);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  *action_need = (TSActionNeedT) action_t;
-  return err_t;
-}
+    ret = recv_mgmt_request(reply.ptr, reply.len, EVENT_NOTIFY, &optype, &name, &desc);
+    ats_free(reply.ptr);
 
+    if (ret != TS_ERR_OKAY) {
+      ats_free(name);
+      ats_free(desc);
+      break;
+    }
 
-/**********************************************************************
- * parse_proxy_state_get_reply
- *
- * purpose: parses a TM reply to a PROXY_STATE_GET request
- * input: fd
- *        state - will contain the state of the proxy
- * output: TS_ERR_xx
- * notes: function is DIFFERENT becuase it has NO TSMgmtError at head of msg
- * format: <TSProxyStateT>
- **********************************************************************/
-TSMgmtError
-parse_proxy_state_get_reply(int fd, TSProxyStateT * state)
-{
-  TSMgmtError err_t;
-  int16_t state_t;
+    ink_assert(optype == EVENT_NOTIFY);
 
-  if (!state)
-    return TS_ERR_PARAMS;
+    // The new event takes ownership of the message strings.
+    event = TSEventCreate();
+    event->name = name;
+    event->id = get_event_id(name);
+    event->description = desc;
 
-  // check to see if anything to read; wait for specified time
-  if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { // time expires before ready to read
-    return TS_ERR_NET_TIMEOUT;
+    // got event notice; spawn new thread to handle the event's callback functions
+    ink_thread_create(event_callback_thread, (void *)event);
   }
 
-  // now get proxy state
-  err_t = socket_read_conn(fd, (uint8_t *)&state_t, SIZE_PROXY_T);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  *state = (TSProxyStateT) state_t;
-  return TS_ERR_OKAY;
+  ink_thread_exit(NULL);
+  return NULL;
 }
 
-/*------------- events ---------------------------------------------*/
-
 /**********************************************************************
- * parse_event_active_reply
+ * event_callback_thread
  *
- * purpose: parses a TM reply to a request to get status of an event
- * input: fd - socket to read
- *        is_active - set to true if event is active; false otherwise
- * output: TS_ERR_xx
- * notes:
- * format: reply format = <TSMgmtError> <bool>
+ * purpose: Given an event, determines and calls the registered cb functions
+ *          in the CallbackTable for remote events
+ * input: arg - should be a TSMgmtEvent with the event info sent from TM msg
+ * output: returns when done calling all the callbacks
+ * notes: None
  **********************************************************************/
-TSMgmtError
-parse_event_active_reply(int fd, bool * is_active)
+static void *
+event_callback_thread(void *arg)
 {
-  int16_t ret_val, active;
-  TSMgmtError err_t;
-
-  if (!is_active)
-    return TS_ERR_PARAMS;
-
-  // check to see if anything to read; wait for specified time
-  if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) {
-    return TS_ERR_NET_TIMEOUT;
-  }
+  TSMgmtEvent *event_notice;
+  EventCallbackT *event_cb;
+  int index;
 
-  // get the return value (TSMgmtError type)
-  err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  // if !TS_ERR_OKAY, stop reading rest of msg
-  err_t = (TSMgmtError) ret_val;
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
+  event_notice = (TSMgmtEvent *) arg;
+  index = (int) event_notice->id;       // NOTE(review): dereferenced before the NULL check below; index is not range-checked before it indexes event_callback_l
+  LLQ *func_q;                  // list of callback functions that need to be called
 
-  // now get the boolean
-  err_t = socket_read_conn(fd, (uint8_t *)&active, SIZE_BOOL);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
+  func_q = create_queue();
+  if (!func_q) {
+    if (event_notice)
+      TSEventDestroy(event_notice);
+    return NULL;
   }
 
-  *is_active = (bool) active;
-  return err_t;
-}
+  // hold the callback-table lock while collecting this event's callbacks
+  ink_mutex_acquire(&remote_event_callbacks->event_callback_lock);
 
-/**********************************************************************
- * parse_event_notification
- *
- * purpose: parses the event notification message from TM when an event
- *          is signalled; stores the event info in the TSMgmtEvent
- * input: fd - socket to read
- *        event - where the event info from msg is stored
- * output:TS_ERR_OKAY, TS_ERR_NET_READ, TS_ERR_NET_EOF, TS_ERR_PARAMS
- * notes:
- * format: <OpType> <event_name_len> <event_name> <desc_len> <desc>
- **********************************************************************/
-TSMgmtError
-parse_event_notification(int fd, TSMgmtEvent * event)
-{
-  OpType msg_type;
-  int16_t type_op;
-  int32_t msg_len;
-  char *event_name = NULL, *desc = NULL;
-  TSMgmtError err_t;
-
-  if (!event)
-    return TS_ERR_PARAMS;
-
-  // read the operation type; should be EVENT_NOTIFY
-  err_t = socket_read_conn(fd, (uint8_t *)&type_op, SIZE_OP_T);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
+  TSEventSignalFunc cb;
 
-  // got the message type; the msg_type should be EVENT_NOTIFY
-  msg_type = (OpType) type_op;
-  if (msg_type != EVENT_NOTIFY) {
-    return TS_ERR_FAIL;
-  }
+  // check if we have functions to call
+  if (remote_event_callbacks->event_callback_l[index] && (!queue_is_empty(remote_event_callbacks->event_callback_l[index]))) {
+    int queue_depth = queue_len(remote_event_callbacks->event_callback_l[index]);
 
-  // read in event name length
-  err_t = socket_read_conn(fd, (uint8_t *)&msg_len, SIZE_LEN);
-  if (err_t != TS_ERR_OKAY) {
-    return err_t;
-  }
-
-  // read the event name
-  event_name = (char *)ats_malloc(sizeof(char) * (msg_len + 1));
-  err_t = socket_read_conn(fd, (uint8_t *)event_name, msg_len);
-  if (err_t != TS_ERR_OKAY) {
-    goto fail;
-  }
-
-  event_name[msg_len] = '\0';   // end the string
-
-  // read in event description length
-  err_t = socket_read_conn(fd, (uint8_t *)&msg_len, SIZE_LEN);
-  if (err_t != TS_ERR_OKAY) {
-    goto fail;
+    for (int i = 0; i < queue_depth; i++) {
+      event_cb = (EventCallbackT *) dequeue(remote_event_callbacks->event_callback_l[index]);
+      cb = event_cb->func;
+      enqueue(remote_event_callbacks->event_callback_l[index], event_cb);       // put the entry back on the list it was taken from
+      enqueue(func_q, (void *) cb);     // queue only the function pointer for the call phase
+    }
   }
+  // release lock
+  ink_mutex_release(&remote_event_callbacks->event_callback_lock);
 
-  // read the event description
-  desc = (char *)ats_malloc(sizeof(char) * (msg_len + 1));
-  err_t = socket_read_conn(fd, (uint8_t *)desc, msg_len);
-  if (err_t != TS_ERR_OKAY) {
-    goto fail;
+  // invoke each collected callback, now that the lock has been released
+  while (!queue_is_empty(func_q)) {
+    cb = (TSEventSignalFunc) dequeue(func_q);
+    (*cb) (event_notice->name, event_notice->description, event_notice->priority, NULL);
   }
 
-  desc[msg_len] = '\0';         // end the string
+  // clean up event notice
+  TSEventDestroy(event_notice);
+  delete_queue(func_q);
 
-  // fill in event info
-  event->name = event_name;
-  event->id = (int) get_event_id(event_name);
-  event->description = desc;
-
-  return TS_ERR_OKAY;
-
-fail:
-  ats_free(event_name);
-  ats_free(desc);
-  return err_t;
+  // all done!
+  ink_thread_exit(NULL);
+  return NULL;
 }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsRemote.h
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsRemote.h b/mgmt/api/NetworkUtilsRemote.h
index 9a2ac7d..dfa3e74 100644
--- a/mgmt/api/NetworkUtilsRemote.h
+++ b/mgmt/api/NetworkUtilsRemote.h
@@ -32,28 +32,25 @@
  *
  ***************************************************************************/
 
-#ifndef _NETWORK_UTILS_H_
-#define _NETWORK_UTILS_H_
-
-#include "ink_defs.h"
-#include "ink_mutex.h"
+#ifndef _NETWORK_UTILS_REMOTE_H_
+#define _NETWORK_UTILS_REMOTE_H_
 
 #include "mgmtapi.h"
-#include "NetworkUtilsDefs.h"
+#include "NetworkMessage.h"
 #include "EventCallback.h"
 
-#ifdef __cplusplus
-extern "C"
-{
-#endif /* __cplusplus */
+extern int main_socket_fd;
+extern int event_socket_fd;
+extern CallbackTable *remote_event_callbacks;
 
-const int DEFAULT_STACK_SIZE = 1048576; // 1MB stack
+// From CoreAPIRemote.cc
+extern ink_thread ts_event_thread;
+extern TSInitOptionT ts_init_options;
 
 /**********************************************************************
  * Socket Helper Functions
  **********************************************************************/
 void set_socket_paths(const char *path);
-int socket_test(int fd);
 
 /* The following functions are specific for a client connection; uses
  * the client connection information stored in the variables in
@@ -63,47 +60,30 @@ TSMgmtError ts_connect(); /* TODO: update documenation, Renamed due to conflict
 TSMgmtError disconnect();
 TSMgmtError reconnect();
 TSMgmtError reconnect_loop(int num_attempts);
-TSMgmtError connect_and_send(const char *msg, int msg_len);
+
 void *socket_test_thread(void *arg);
+void *event_poll_thread_main(void *arg);
+
+struct mgmtapi_sender : public mgmt_message_sender      // sender object handed to send_mgmt_request() by MGMTAPI_SEND_MESSAGE
+{
+  explicit mgmtapi_sender(int _fd) : fd(_fd) {}         // stores _fd in the fd member
+  virtual TSMgmtError send(void * msg, size_t msglen) const;    // declared only; NOTE(review): presumably writes msglen bytes of msg to fd - confirm against the definition
+
+  int fd;       // descriptor captured at construction
+};
+
+#define MGMTAPI_SEND_MESSAGE(fd, optype, ...) send_mgmt_request(mgmtapi_sender(fd), (optype), __VA_ARGS__)
 
 /*****************************************************************************
  * Marshalling (create requests)
  *****************************************************************************/
-TSMgmtError send_request(int fd, OpType op);
-TSMgmtError send_request_name(int fd, OpType op, const char *name);
-TSMgmtError send_request_name_value(int fd, OpType op, const char *name, const char *value);
-TSMgmtError send_request_bool(int fd, OpType op, bool flag);
-
-TSMgmtError send_file_read_request(int fd, TSFileNameT file);
-TSMgmtError send_file_write_request(int fd, TSFileNameT file, int ver, int size, char *text);
-TSMgmtError send_record_get_request(int fd, const char *rec_name);
-TSMgmtError send_record_match_request(int fd, const char *rec_regex);
-
-TSMgmtError send_proxy_state_set_request(int fd, TSProxyStateT state, TSCacheClearT clear);
 
 TSMgmtError send_register_all_callbacks(int fd, CallbackTable * cb_table);
 TSMgmtError send_unregister_all_callbacks(int fd, CallbackTable * cb_table);
 
-TSMgmtError send_diags_msg(int fd, TSDiagsT mode, const char *diag_msg);
-
 /*****************************************************************************
  * Un-marshalling (parse responses)
  *****************************************************************************/
-TSMgmtError parse_reply(int fd);
-TSMgmtError parse_reply_list(int fd, char **list);
-
-TSMgmtError parse_file_read_reply(int fd, int *version, int *size, char **text);
-
-TSMgmtError parse_record_get_reply(int fd, TSRecordT * rec_type, void **rec_val, char **rec_name);
-TSMgmtError parse_record_set_reply(int fd, TSActionNeedT * action_need);
-
-TSMgmtError parse_proxy_state_get_reply(int fd, TSProxyStateT * state);
-
-TSMgmtError parse_event_active_reply(int fd, bool * is_active);
-TSMgmtError parse_event_notification(int fd, TSMgmtEvent * event);
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
+TSMgmtError parse_generic_response(OpType optype, int fd);
 
-#endif
+#endif /* _NETWORK_UTILS_REMOTE_H_ */