You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2014/08/26 00:57:16 UTC
[3/7] TS-3033: reimplement management API with generic marshalling
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkMessage.h
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkMessage.h b/mgmt/api/NetworkMessage.h
new file mode 100644
index 0000000..412c23c
--- /dev/null
+++ b/mgmt/api/NetworkMessage.h
@@ -0,0 +1,89 @@
+/** @file
+
+ Network message marshalling.
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#ifndef _NETWORK_MESSAGE_H_
+#define _NETWORK_MESSAGE_H_
+
+#include "MgmtMarshall.h"
+
+#define REMOTE_DELIM ':'
+#define REMOTE_DELIM_STR ":"
+
+#define MAX_CONN_TRIES 10 // maximum number of attemps to reconnect to TM
+
+// the possible operations or msg types sent from remote client to TM
+typedef enum
+{
+ FILE_READ,
+ FILE_WRITE,
+ RECORD_SET,
+ RECORD_GET,
+ PROXY_STATE_GET,
+ PROXY_STATE_SET,
+ RECONFIGURE,
+ RESTART,
+ BOUNCE,
+ EVENT_RESOLVE,
+ EVENT_GET_MLT,
+ EVENT_ACTIVE,
+ EVENT_REG_CALLBACK,
+ EVENT_UNREG_CALLBACK,
+ EVENT_NOTIFY, /* only msg sent from TM to client */
+ SNAPSHOT_TAKE,
+ SNAPSHOT_RESTORE,
+ SNAPSHOT_REMOVE,
+ SNAPSHOT_GET_MLT,
+ DIAGS,
+ STATS_RESET_NODE,
+ STATS_RESET_CLUSTER,
+ STORAGE_DEVICE_CMD_OFFLINE,
+ RECORD_MATCH_GET,
+ API_PING,
+ UNDEFINED_OP /* This must be last */
+} OpType;
+
+struct mgmt_message_sender
+{
+ virtual TSMgmtError send(void * msg, size_t msglen) const = 0;
+};
+
+// Marshall and send a request, prefixing the message length as a MGMT_MARSHALL_INT.
+TSMgmtError send_mgmt_request(const mgmt_message_sender& snd, OpType optype, ...);
+TSMgmtError send_mgmt_request(int fd, OpType optype, ...);
+
+// Parse a request message from a buffer.
+TSMgmtError recv_mgmt_request(void * buf, size_t buflen, OpType optype, ...);
+
+// Marshall and send a response, prefixing the message length as a MGMT_MARSHALL_INT.
+TSMgmtError send_mgmt_response(int fd, OpType optype, ...);
+
+// Parse a response message from a buffer.
+TSMgmtError recv_mgmt_response(void * buf, size_t buflen, OpType optype, ...);
+
+// Pull a management message (either request or response) off the wire.
+TSMgmtError recv_mgmt_message(int fd, MgmtMarshallData& msg);
+
+// Extract the first MGMT_MARSHALL_INT from the buffered message. This is the OpType.
+OpType extract_mgmt_request_optype(void * msg, size_t msglen);
+
+#endif /* _NETWORK_MESSAGE_H_ */
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsDefs.h
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsDefs.h b/mgmt/api/NetworkUtilsDefs.h
deleted file mode 100644
index bd94fdc..0000000
--- a/mgmt/api/NetworkUtilsDefs.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/** @file
-
- A brief file description
-
- @section license License
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-/***************************************************************************
- * NetworkUtilsDefs.h
- *
- * contains general definitions used by both NetworkUtilsRemote and
- * NetworkUtilsLocal
- *
- *
- ***************************************************************************/
-
-#ifndef _NETWORK_UTILS_DEFS_H_
-#define _NETWORK_UTILS_DEFS_H_
-
-#define REMOTE_DELIM ':'
-#define REMOTE_DELIM_STR ":"
-
-#define MAX_CONN_TRIES 10 // maximum number of attemps to reconnect to TM
-#define MAX_TIME_WAIT 60 // num secs for a timeout on a select call (remote only)
-
-// measure in bytes used in construcing network messages
-#define SIZE_OP_T 2 // num bytes used to specify OpType
-#define SIZE_FILE_T 2 // num bytes used to specify INKFileNameT
-#define SIZE_LEN 4 // max num bytes used to specify length of anything
-#define SIZE_ERR_T 2 // num bytes used to specify INKError return value
-#define SIZE_VER 2 // num bytes used to specify file version
-#define SIZE_REC_T 2 // num bytes used to specify INKRecordT
-#define SIZE_PROXY_T 2 // num bytes used to specify INKProxyStateT
-#define SIZE_TS_ARG_T 2 // num bytes used to specify INKCacheClearT
-#define SIZE_DIAGS_T 2 // num bytes used to specify INKDiagsT
-#define SIZE_BOOL 2
-#define SIZE_ACTION_T 2 // num bytes used to specify INKActionNeedT
-#define SIZE_EVENT_ID 2 // num bytes used to specify event_id
-
-
-// the possible operations or msg types sent from remote client to TM
-typedef enum
-{
- FILE_READ,
- FILE_WRITE,
- RECORD_SET,
- RECORD_GET,
- PROXY_STATE_GET,
- PROXY_STATE_SET,
- RECONFIGURE,
- RESTART,
- BOUNCE,
- EVENT_RESOLVE,
- EVENT_GET_MLT,
- EVENT_ACTIVE,
- EVENT_REG_CALLBACK,
- EVENT_UNREG_CALLBACK,
- EVENT_NOTIFY, /* only msg sent from TM to client */
- SNAPSHOT_TAKE,
- SNAPSHOT_RESTORE,
- SNAPSHOT_REMOVE,
- SNAPSHOT_GET_MLT,
- DIAGS,
- STATS_RESET_NODE,
- STATS_RESET_CLUSTER,
- STORAGE_DEVICE_CMD_OFFLINE,
- RECORD_MATCH_GET,
- UNDEFINED_OP /* This must be last */
-} OpType;
-
-#endif
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsLocal.cc
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsLocal.cc b/mgmt/api/NetworkUtilsLocal.cc
index 8da5d63..ae812ab 100644
--- a/mgmt/api/NetworkUtilsLocal.cc
+++ b/mgmt/api/NetworkUtilsLocal.cc
@@ -35,138 +35,10 @@
#include "Diags.h"
#include "MgmtUtils.h"
#include "MgmtSocket.h"
+#include "MgmtMarshall.h"
#include "CoreAPIShared.h"
#include "NetworkUtilsLocal.h"
-
-#ifndef MAX_BUF_SIZE
-#define MAX_BUF_SIZE 4096
-#endif
-
-/**************************************************************************
- * socket_flush
- *
- * flushes the socket by reading the entire message out of the socket
- * and then gets rid of the msg
- **************************************************************************/
-TSMgmtError
-socket_flush(struct SocketInfo sock_info)
-{
- int ret, byte_read = 0;
- char buf[MAX_BUF_SIZE];
-
- // check to see if anything to read; wait only for specified time
- if (mgmt_read_timeout(sock_info.fd, MAX_TIME_WAIT, 0) <= 0) {
- return TS_ERR_NET_TIMEOUT;
- }
- // read entire message
- while (byte_read < MAX_BUF_SIZE) {
-
- ret = socket_read(sock_info, buf + byte_read, MAX_BUF_SIZE - byte_read);
-
- if (ret < 0) {
- if (errno == EAGAIN)
- continue;
-
- Debug("ts_main", "[socket_read_n] socket read for version byte failed.\n");
- mgmt_elog(0, "[socket_flush] (TS_ERR_NET_READ) %s\n", strerror(errno));
- return TS_ERR_NET_READ;
- }
-
- if (ret == 0) {
- Debug("ts_main", "[socket_read_n] returned 0 on reading: %s.\n", strerror(errno));
- return TS_ERR_NET_EOF;
- }
- // we are all good here
- byte_read += ret;
- }
-
- mgmt_elog(0, "[socket_flush] uh oh! didn't finish flushing socket!\n");
- return TS_ERR_FAIL;
-}
-
-/**************************************************************************
- * socket_read_n
- *
- * purpose: guarantees reading of n bytes or return error.
- * input: socket info struct, buffer to read into and number of bytes to read
- * output: number of bytes read
- * note: socket_read is implemented in WebUtils.cc
- *************************************************************************/
-TSMgmtError
-socket_read_n(struct SocketInfo sock_info, char *buf, int bytes)
-{
- int ret, byte_read = 0;
-
- // check to see if anything to read; wait for specified time
- if (mgmt_read_timeout(sock_info.fd, MAX_TIME_WAIT, 0) <= 0) {
- return TS_ERR_NET_TIMEOUT;
- }
- // read until we fulfill the number
- while (byte_read < bytes) {
- ret = socket_read(sock_info, buf + byte_read, bytes - byte_read);
-
- // error!
- if (ret < 0) {
- if (errno == EAGAIN)
- continue;
-
- Debug("ts_main", "[socket_read_n] socket read for version byte failed.\n");
- mgmt_elog(0, "[socket_read_n] (TS_ERR_NET_READ) %s\n", strerror(errno));
- return TS_ERR_NET_READ;
- }
-
- if (ret == 0) {
- Debug("ts_main", "[socket_read_n] returned 0 on reading: %s.\n", strerror(errno));
- return TS_ERR_NET_EOF;
- }
- // we are all good here
- byte_read += ret;
- }
-
- return TS_ERR_OKAY;
-}
-
-/**************************************************************************
- * socket_write_n
- *
- * purpose: guarantees writing of n bytes or return error
- * input: socket info struct, buffer to write from & number of bytes to write
- * output: TS_ERR_xx (depends on num bytes written)
- * note: socket_read is implemented in WebUtils.cc
- *************************************************************************/
-TSMgmtError
-socket_write_n(struct SocketInfo sock_info, const char *buf, int bytes)
-{
- int ret, byte_wrote = 0;
-
- // makes sure the socket descriptor is writable
- if (mgmt_write_timeout(sock_info.fd, MAX_TIME_WAIT, 0) <= 0) {
- return TS_ERR_NET_TIMEOUT;
- }
- // read until we fulfill the number
- while (byte_wrote < bytes) {
- ret = socket_write(sock_info, buf + byte_wrote, bytes - byte_wrote);
-
- if (ret < 0) {
- Debug("ts_main", "[socket_write_n] return error %s \n", strerror(errno));
- mgmt_elog(0, "[socket_write_n] %s\n", strerror(errno));
- if (errno == EAGAIN)
- continue;
-
- return TS_ERR_NET_WRITE;
- }
-
- if (ret == 0) {
- mgmt_elog(0, "[socket_write_n] %s\n", strerror(errno));
- return TS_ERR_NET_EOF;
- }
- // we are all good here
- byte_wrote += ret;
- }
-
- return TS_ERR_OKAY;
-}
-
+#include "NetworkMessage.h"
/**********************************************************************
* preprocess_msg
@@ -174,602 +46,32 @@ socket_write_n(struct SocketInfo sock_info, const char *buf, int bytes)
* purpose: reads in all the message; parses the message into header info
* (OpType + msg_len) and the request portion (used by the handle_xx fns)
* input: sock_info - socket msg is read from
- * op_t - the operation type specified in the msg
* msg - the data from the network message (no OpType or msg_len)
* output: TS_ERR_xx ( if TS_ERR_OKAY, then parameters set successfully)
* notes: Since preprocess_msg already removes the OpType and msg_len, this part o
* the message is not dealt with by the other parsing functions
**********************************************************************/
TSMgmtError
-preprocess_msg(struct SocketInfo sock_info, OpType * op_t, char **req)
+preprocess_msg(int fd, void ** req, size_t * reqlen)
{
TSMgmtError ret;
- int req_len;
- int16_t op;
-
- // read operation type
- ret = socket_read_n(sock_info, (char *) &op, SIZE_OP_T);
- if (ret != TS_ERR_OKAY) {
- Debug("ts_main", "[preprocess_msg] ERROR %d reading op type\n", ret);
- goto Lerror;
- }
-
- Debug("ts_main", "[preprocess_msg] operation = %d", op);
- *op_t = (OpType) op; // convert to proper format
-
- // check if invalid op type
- if ((int) op > UNDEFINED_OP) {
- mgmt_elog(0, "[preprocess_msg] ERROR: %d is invalid op type\n", op);
+ MgmtMarshallData msg;
- // need to flush the invalid message from the socket
- if ((ret = socket_flush(sock_info)) != TS_ERR_NET_EOF)
- mgmt_log("[preprocess_msg] unsuccessful socket flushing\n");
- else
- mgmt_log("[preprocess_msg] successfully flushed the socket\n");
+ *req = NULL;
+ *reqlen = 0;
- goto Lerror;
- }
- // now read the request msg size
- ret = socket_read_n(sock_info, (char *) &req_len, SIZE_LEN);
+ ret = recv_mgmt_message(fd, msg);
if (ret != TS_ERR_OKAY) {
- mgmt_elog(0, "[preprocess_msg] ERROR %d reading msg size\n", ret);
- Debug("ts_main", "[preprocess_msg] ERROR %d reading msg size\n", ret);
- goto Lerror;
+ return ret;
}
- Debug("ts_main", "[preprocess_msg] length = %d\n", req_len);
-
- // use req msg length to fetch the rest of the message
- // first check that there is a "rest of the msg", some msgs just
- // have the op specified
- if (req_len == 0) {
- *req = NULL;
- Debug("ts_main", "[preprocess_msg] request message = NULL\n");
- } else {
- *req = (char *)ats_malloc(sizeof(char) * (req_len + 1));
- ret = socket_read_n(sock_info, *req, req_len);
- if (ret != TS_ERR_OKAY) {
- ats_free(*req);
- goto Lerror;
- }
- // add end of string to end of msg
- (*req)[req_len] = '\0';
- Debug("ts_main", "[preprocess_msg] request message = %s\n", *req);
+ // We should never receive an empty payload.
+ if (msg.ptr == NULL) {
+ return TS_ERR_NET_READ;
}
+ *req = msg.ptr;
+ *reqlen = msg.len;
+ Debug("ts_main", "[preprocess_msg] read message length = %zd\n", msg.len);
return TS_ERR_OKAY;
-
-Lerror:
- return ret;
-}
-
-
-/**********************************************************************
- * Unmarshal Requests
- **********************************************************************/
-
-/**********************************************************************
- * parse_file_read_request
- *
- * purpose: parses a file read request from a remote API client
- * input: req - data that needs to be parsed
- * file - the file type sent in the request
- * output: TS_ERR_xx
- * notes: request format = <TSFileNameT>
- **********************************************************************/
-TSMgmtError
-parse_file_read_request(char *req, TSFileNameT * file)
-{
- int16_t file_t;
-
- if (!req || !file)
- return TS_ERR_PARAMS;
-
- // get file type - copy first 2 bytes of request
- memcpy(&file_t, req, SIZE_FILE_T);
- *file = (TSFileNameT) file_t;
-
- return TS_ERR_OKAY;
-}
-
-/**********************************************************************
- * parse_file_write_request
- *
- * purpose: parses a file write request from a remote API client
- * input: socket info
- * file - the file type to write that was sent in the request
- * text - the text that needs to be written
- * size - length of the text
- * ver - version of the file that is to be written
- * output: TS_ERR_xx
- * notes: request format = <TSFileNameT> <version> <size> <text>
- **********************************************************************/
-TSMgmtError
-parse_file_write_request(char *req, TSFileNameT * file, int *ver, int *size, char **text)
-{
- int16_t file_t, f_ver;
- int32_t f_size;
-
- // check input is non-NULL
- if (!req || !file || !ver || !size || !text)
- return TS_ERR_PARAMS;
-
- // get file type - copy first 2 bytes of request
- memcpy(&file_t, req, SIZE_FILE_T);
- *file = (TSFileNameT) file_t;
-
- // get file version - copy next 2 bytes
- memcpy(&f_ver, req + SIZE_FILE_T, SIZE_VER);
- *ver = (int) f_ver;
-
- // get file size - copy next 4 bytes
- memcpy(&f_size, req + SIZE_FILE_T + SIZE_VER, SIZE_LEN);
- *size = (int) f_size;
-
- // get file text
- *text = (char *)ats_malloc(sizeof(char) * (f_size + 1));
- memcpy(*text, req + SIZE_FILE_T + SIZE_VER + SIZE_LEN, f_size);
- (*text)[f_size] = '\0'; // end buffer
-
- return TS_ERR_OKAY;
-}
-
-/**********************************************************************
- * parse_request_name_value
- *
- * purpose: parses a request w/ 2 args from a remote API client
- * input: req - request info from requestor
- * name - first arg
- * val - second arg
- * output: TS_ERR_xx
- * notes: format= <name_len> <val_len> <name> <val>
- **********************************************************************/
-TSMgmtError
-parse_request_name_value(char *req, char **name_1, char **val_1)
-{
- int32_t name_len, val_len;
- char *name, *val;
-
- if (!req || !name_1 || !val_1)
- return TS_ERR_PARAMS;
-
- // get record name length
- memcpy(&name_len, req, SIZE_LEN);
-
- // get record value length
- memcpy(&val_len, req + SIZE_LEN, SIZE_LEN);
-
- // get record name
- name = (char *)ats_malloc(sizeof(char) * (name_len + 1));
- memcpy(name, req + SIZE_LEN + SIZE_LEN, name_len);
- name[name_len] = '\0'; // end string
- *name_1 = name;
-
- // get record value - can be a MgmtInt, MgmtCounter ...
- val = (char *)ats_malloc(sizeof(char) * (val_len + 1));
- memcpy(val, req + SIZE_LEN + SIZE_LEN + name_len, val_len);
- val[val_len] = '\0'; // end string
- *val_1 = val;
-
- return TS_ERR_OKAY;
-}
-
-
-/**********************************************************************
- * parse_diags_request
- *
- * purpose: parses a diags request
- * input: diag_msg - the diag msg to be outputted
- * mode - indicates what type of diag message
- * output: TS_ERR_xx
- * notes: request format = <TSDiagsT> <diag_msg_len> <diag_msg>
- **********************************************************************/
-TSMgmtError
-parse_diags_request(char *req, TSDiagsT * mode, char **diag_msg)
-{
- int16_t diag_t;
- int32_t msg_len;
-
- // check input is non-NULL
- if (!req || !mode || !diag_msg)
- return TS_ERR_PARAMS;
-
- // get diags type - copy first 2 bytes of request
- memcpy(&diag_t, req, SIZE_DIAGS_T);
- *mode = (TSDiagsT) diag_t;
-
- // get msg size - copy next 4 bytes
- memcpy(&msg_len, req + SIZE_DIAGS_T, SIZE_LEN);
-
- // get msg
- *diag_msg = (char *)ats_malloc(sizeof(char) * (msg_len + 1));
- memcpy(*diag_msg, req + SIZE_DIAGS_T + SIZE_LEN, msg_len);
- (*diag_msg)[msg_len] = '\0'; // end buffer
-
- return TS_ERR_OKAY;
-}
-
-/**********************************************************************
- * parse_proxy_state_request
- *
- * purpose: parses a request to set the proxy state
- * input: diag_msg - the diag msg to be outputted
- * mode - indicates what type of diag message
- * output: TS_ERR_xx
- * notes: request format = <TSProxyStateT> <TSCacheClearT>
- **********************************************************************/
-TSMgmtError
-parse_proxy_state_request(char *req, TSProxyStateT * state, TSCacheClearT * clear)
-{
- int16_t state_t, cache_t;
-
- // check input is non-NULL
- if (!req || !state || !clear)
- return TS_ERR_PARAMS;
-
- // get proxy on/off
- memcpy(&state_t, req, SIZE_PROXY_T);
- *state = (TSProxyStateT) state_t;
-
- // get cahce-clearing type
- memcpy(&cache_t, req + SIZE_PROXY_T, SIZE_TS_ARG_T);
- *clear = (TSCacheClearT) cache_t;
-
- return TS_ERR_OKAY;
-}
-
-/**********************************************************************
- * Marshal Replies
- **********************************************************************/
-/* NOTE: if the send function "return"s before writing to the socket
- then that means that an error occurred, and so the calling function
- must send_reply with the error that occurred. */
-
-/**********************************************************************
- * send_reply
- *
- * purpose: sends a simple TS_ERR_* reply to the request made
- * input: return value - could be extended to support more complex
- * error codes but for now use only TS_ERR_FAIL, TS_ERR_OKAY
- * int fd - socket fd to use.
- * output: TS_ERR_*
- * notes: this function does not need to go through the internal structure
- * so no cleaning up is done.
- **********************************************************************/
-TSMgmtError
-send_reply(struct SocketInfo sock_info, TSMgmtError retval)
-{
- TSMgmtError ret;
- char msg[SIZE_ERR_T];
- int16_t ret_val;
-
- // write the return value
- ret_val = (int16_t) retval;
- memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
-
- // now push it to the socket
- ret = socket_write_n(sock_info, msg, SIZE_ERR_T);
-
- return ret;
-}
-
-/**********************************************************************
- * send_reply_list
- *
- * purpose: sends the reply in response to a request to get list of string
- * tokens (delimited by REMOTE_DELIM_STR)
- * input: sock_info -
- * retval - TSMgmtError return type for the CoreAPI call
- * list - string delimited list of string tokens
- * output: TS_ERR_*
- * notes:
- * format: <TSMgmtError> <string_list_len> <delimited_string_list>
- **********************************************************************/
-TSMgmtError
-send_reply_list(struct SocketInfo sock_info, TSMgmtError retval, char *list)
-{
- TSMgmtError ret;
- int msg_pos = 0, total_len;
- char *msg;
- int16_t ret_val;
- int32_t list_size; // to be safe, typecast
-
- if (!list) {
- return TS_ERR_PARAMS;
- }
-
- total_len = SIZE_ERR_T + SIZE_LEN + strlen(list);
- msg = (char *)ats_malloc(sizeof(char) * total_len);
-
- // write the return value
- ret_val = (int16_t) retval;
- memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
- msg_pos += SIZE_ERR_T;
-
- // write the length of the string list
- list_size = (int32_t) strlen(list);
- memcpy(msg + msg_pos, (void *) &list_size, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // write the event string list
- memcpy(msg + msg_pos, list, list_size);
-
- // now push it to the socket
- ret = socket_write_n(sock_info, msg, total_len);
- ats_free(msg);
-
- return ret;
-}
-
-
-/**********************************************************************
- * send_record_get_reply
- *
- * purpose: sends reply to the record_get request made
- * input: retval - result of the record get request
- * int fd - socket fd to use.
- * val - the value of the record requested
- * val_size - num bytes the value occupies
- * rec_type - the type of the record value requested
- * output: TS_ERR_*
- * notes: this function does not need to go through the internal structure
- * so no cleaning up is done.
- * format = <TSMgmtError> <rec_val_len> <name_size> <rec_type> <rec_val> <rec_name>
- **********************************************************************/
-TSMgmtError
-send_record_get_reply(struct SocketInfo sock_info, TSMgmtError retval, void *val, int val_size,
- TSRecordT rec_type, const char *rec_name)
-{
- TSMgmtError ret;
- int msg_pos = 0, total_len;
- char *msg;
- int16_t record_t, ret_val;
- int32_t v_size = (int32_t) val_size; // to be safe, typecast
- int32_t n_size = rec_name ? (int32_t)strlen(rec_name) : 0;
-
- total_len = SIZE_ERR_T + SIZE_LEN + SIZE_LEN + SIZE_REC_T + v_size + n_size;
- msg = (char *)ats_malloc(sizeof(char) * total_len);
-
- // write the return value
- ret_val = (int16_t) retval;
- memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
- msg_pos += SIZE_ERR_T;
-
- // write the size of the record value
- memcpy(msg + msg_pos, (void *) &v_size, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // write the size of the record name
- memcpy(msg + msg_pos, (void *) &n_size, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // write the record type
- record_t = (int16_t) rec_type;
- memcpy(msg + msg_pos, (void *) &record_t, SIZE_REC_T);
- msg_pos += SIZE_REC_T;
-
- // write the record value
- if (v_size) {
- memcpy(msg + msg_pos, val, v_size);
- msg_pos += v_size;
- }
-
- // write the record name
- if (n_size) {
- memcpy(msg + msg_pos, rec_name, n_size);
- }
-
- // now push it to the socket
- ret = socket_write_n(sock_info, msg, total_len);
- ats_free(msg);
-
- return ret;
-}
-
-/**********************************************************************
- * send_record_set_reply
- *
- * purpose: sends reply to the record_set request made
- * input:
- * output: TS_ERR_*
- * notes: this function does not need to go through the internal structure
- * so no cleaning up is done.
- * format =
- **********************************************************************/
-TSMgmtError
-send_record_set_reply(struct SocketInfo sock_info, TSMgmtError retval, TSActionNeedT action_need)
-{
- TSMgmtError ret;
- int total_len;
- char *msg;
- int16_t action_t, ret_val;
-
- total_len = SIZE_ERR_T + SIZE_ACTION_T;
- msg = (char *)ats_malloc(sizeof(char) * total_len);
-
- // write the return value
- ret_val = (int16_t) retval;
- memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
-
- // write the action needed
- action_t = (int16_t) action_need;
- memcpy(msg + SIZE_ERR_T, (void *) &action_t, SIZE_ACTION_T);
-
- // now push it to the socket
- ret = socket_write_n(sock_info, msg, total_len);
- ats_free(msg);
-
- return ret;
-}
-
-
-/**********************************************************************
- * send_file_read_reply
- *
- * purpose: sends the reply in response to a file read request
- * input: return value - could be extended to support more complex
- * error codes but for now use only TS_ERR_FAIL, TS_ERR_OKAY
- * int fd - socket fd to use.
- * output: TS_ERR_*
- * notes: this function does not need to go through the internal structure
- * so no cleaning up is done.
- * reply format = <TSMgmtError> <file_ver> <file_size> <file_text>
- **********************************************************************/
-TSMgmtError
-send_file_read_reply(struct SocketInfo sock_info, TSMgmtError retval, int ver, int size, char *text)
-{
- TSMgmtError ret;
- int msg_pos = 0, msg_len;
- char *msg;
- int16_t ret_val, f_ver;
- int32_t f_size; // to be safe
-
- if (!text)
- return TS_ERR_PARAMS;
-
- // allocate space for buffer
- msg_len = SIZE_ERR_T + SIZE_VER + SIZE_LEN + size;
- msg = (char *)ats_malloc(sizeof(char) * msg_len);
-
- // write the return value
- ret_val = (int16_t) retval;
- memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
- msg_pos += SIZE_ERR_T;
-
- // write file version
- f_ver = (int16_t) ver;
- memcpy(msg + msg_pos, (void *) &f_ver, SIZE_VER);
- msg_pos += SIZE_VER;
-
- // write file size
- f_size = (int32_t) size;
- memcpy(msg + msg_pos, (void *) &f_size, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // write the file text
- memcpy(msg + msg_pos, text, size);
-
- // now push it to the socket
- ret = socket_write_n(sock_info, msg, msg_len);
- ats_free(msg);
-
- return ret;
-}
-
-
-/**********************************************************************
- * send_proxy_state_get_reply
- *
- * purpose: sends the reply in response to a request to get state of proxy
- * input:
- * int fd - socket fd to use.
- * output: TS_ERR_*
- * notes: this function DOES NOT HAVE IT"S OWN TSMgmtError TO SEND!!!!
- * reply format = <TSProxyStateT>
- **********************************************************************/
-TSMgmtError
-send_proxy_state_get_reply(struct SocketInfo sock_info, TSProxyStateT state)
-{
- TSMgmtError ret;
- char msg[SIZE_PROXY_T];
- int16_t state_t;
-
- // write the state
- state_t = (int16_t) state;
- memcpy(msg, (void *) &state_t, SIZE_PROXY_T);
-
- // now push it to the socket
- ret = socket_write_n(sock_info, msg, SIZE_PROXY_T);
-
- return ret;
-}
-
-
-/**********************************************************************
- * send_event_active_reply
- *
- * purpose: sends the reply in response to a request check if event is active
- * input: sock_info -
- * retval - TSMgmtError return type for the EventIsActive core call
- * active - is the requested event active or not?
- * output: TS_ERR_*
- * notes:
- * format: <TSMgmtError> <bool>
- **********************************************************************/
-TSMgmtError
-send_event_active_reply(struct SocketInfo sock_info, TSMgmtError retval, bool active)
-{
- TSMgmtError ret;
- int total_len;
- char *msg;
- int16_t is_active, ret_val;
-
- total_len = SIZE_ERR_T + SIZE_BOOL;
- msg = (char *)ats_malloc(sizeof(char) * total_len);
-
- // write the return value
- ret_val = (int16_t) retval;
- memcpy(msg, (void *) &ret_val, SIZE_ERR_T);
-
- // write the boolean active state
- is_active = (int16_t) active;
- memcpy(msg + SIZE_ERR_T, (void *) &is_active, SIZE_BOOL);
-
- // now push it to the socket
- ret = socket_write_n(sock_info, msg, total_len);
- ats_free(msg);
-
- return ret;
-}
-
-/**********************************************************************
- * send_event_notification
- *
- * purpose: sends to the client a msg indicating that a certain event
- * has occurred (this msg will be received by the event_poll_thread)
- * input: fd - file descriptor to use for writing
- * event - the event that was signalled on TM side
- * output: TS_ERR_xx
- * note: format: <OpType> <event_name_len> <event_name> <desc_len> <desc>
- **********************************************************************/
-TSMgmtError
-send_event_notification(struct SocketInfo sock_info, TSMgmtEvent * event)
-{
- TSMgmtError ret;
- int total_len, name_len, desc_len;
- char *msg;
- int16_t op_t;
- int32_t len;
-
- if (!event || !event->name || !event->description)
- return TS_ERR_PARAMS;
-
- name_len = strlen(event->name);
- desc_len = strlen(event->description);
- total_len = SIZE_OP_T + (SIZE_LEN * 2) + name_len + desc_len;
- msg = (char *)ats_malloc(sizeof(char) * total_len);
-
- // write the operation
- op_t = (int16_t) EVENT_NOTIFY;
- memcpy(msg, (void *) &op_t, SIZE_OP_T);
-
- // write the size of the event name
- len = (int32_t) name_len;
- memcpy(msg + SIZE_OP_T, (void *) &len, SIZE_LEN);
-
- // write the event name
- memcpy(msg + SIZE_OP_T + SIZE_LEN, event->name, name_len);
-
- // write size of description
- len = (int32_t) desc_len;
- memcpy(msg + SIZE_OP_T + SIZE_LEN + name_len, (void *) &len, SIZE_LEN);
-
- // write the description
- memcpy(msg + SIZE_OP_T + SIZE_LEN + name_len + SIZE_LEN, event->description, desc_len);
-
- // now push it to the socket
- ret = socket_write_n(sock_info, msg, total_len);
- ats_free(msg);
-
- return ret;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsLocal.h
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsLocal.h b/mgmt/api/NetworkUtilsLocal.h
index b2d75f4..06d2c17 100644
--- a/mgmt/api/NetworkUtilsLocal.h
+++ b/mgmt/api/NetworkUtilsLocal.h
@@ -21,61 +21,15 @@
limitations under the License.
*/
-/***************************************************************************
- * NetworkUtils.h
- *
- * Defines interface for marshalling requests and unmarshalling responses
- * between the remote API client and Traffic Manager
- *
- *
- ***************************************************************************/
-
-/*****************************************************************************
- * NetworkUtils.h
- *
- * Defines interface for marshalling requests and unmarshalling responses
- * between the remote API client and Traffic Manager
- *****************************************************************************/
-
-#ifndef _NETWORK_UTILS_H_
-#define _NETWORK_UTILS_H_
+#ifndef _NETWORK_UTILS_LOCAL_H_
+#define _NETWORK_UTILS_LOCAL_H_
#include "ink_defs.h"
-#include "WebUtils.h" // for SocketInfo, socket_read, socket_write
-
#include "mgmtapi.h"
-#include "NetworkUtilsDefs.h"
-
-/*****************************************************************************
- * general socket functions
- *****************************************************************************/
-TSMgmtError socket_flush(struct SocketInfo sock_info);
-TSMgmtError socket_read_n(struct SocketInfo sock_info, char *buf, int bytes);
-TSMgmtError socket_write_n(struct SocketInfo sock_info, const char *buf, int bytes);
/*****************************************************************************
* Unmarshalling/marshalling
*****************************************************************************/
-TSMgmtError preprocess_msg(struct SocketInfo sock_info, OpType * op_t, char **msg);
-
-TSMgmtError parse_request_name_value(char *req, char **name, char **val);
-TSMgmtError parse_record_get_request(char *req, char **rec_name);
-TSMgmtError parse_file_read_request(char *req, TSFileNameT * file);
-TSMgmtError parse_file_write_request(char *req, TSFileNameT * file, int *ver, int *size, char **text);
-TSMgmtError parse_diags_request(char *req, TSDiagsT * mode, char **diag_msg);
-TSMgmtError parse_proxy_state_request(char *req, TSProxyStateT * state, TSCacheClearT * clear);
-
-TSMgmtError send_reply(struct SocketInfo sock_info, TSMgmtError retval);
-TSMgmtError send_reply_list(struct SocketInfo sock_info, TSMgmtError retval, char *list);
-
-TSMgmtError send_record_get_reply(struct SocketInfo sock_info, TSMgmtError retval, void *val, int val_size,
- TSRecordT rec_type, const char *rec_name);
-TSMgmtError send_record_set_reply(struct SocketInfo sock_info, TSMgmtError retval, TSActionNeedT action_need);
-TSMgmtError send_file_read_reply(struct SocketInfo sock_info, TSMgmtError retval, int ver, int size, char *text);
-TSMgmtError send_proxy_state_get_reply(struct SocketInfo sock_info, TSProxyStateT state);
-
-TSMgmtError send_event_active_reply(struct SocketInfo sock_info, TSMgmtError retval, bool active);
-
-TSMgmtError send_event_notification(struct SocketInfo sock_info, TSMgmtEvent * event);
+TSMgmtError preprocess_msg(int fd, void ** req, size_t * reqlen);
-#endif
+#endif /* _NETWORK_UTILS_LOCAL_H_ */
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsRemote.cc
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsRemote.cc b/mgmt/api/NetworkUtilsRemote.cc
index f7877d0..8c0f544 100644
--- a/mgmt/api/NetworkUtilsRemote.cc
+++ b/mgmt/api/NetworkUtilsRemote.cc
@@ -21,17 +21,6 @@
limitations under the License.
*/
-/*****************************************************************************
- * Filename: NetworkUtilsRemote.cc
- * Purpose: This file contains functions used by remote api client to
- * marshal requests to TM and unmarshal replies from TM.
- * Also stores the information about the client's current
- * socket connection to Traffic Manager
- * Created: 8/9/00
- *
- *
- ***************************************************************************/
-
#include "ink_config.h"
#include "ink_defs.h"
#include "ink_sock.h"
@@ -41,10 +30,10 @@
#include "NetworkUtilsRemote.h"
#include "CoreAPI.h"
#include "CoreAPIShared.h"
-#include "EventRegistration.h"
#include "MgmtSocket.h"
+#include "MgmtMarshall.h"
-extern CallbackTable *remote_event_callbacks;
+CallbackTable *remote_event_callbacks;
int main_socket_fd = -1;
int event_socket_fd = -1;
@@ -53,11 +42,7 @@ int event_socket_fd = -1;
char *main_socket_path = NULL; // "<path>/mgmtapisocket"
char *event_socket_path = NULL; // "<path>/eventapisocket"
-// From CoreAPIRemote.cc
-extern ink_thread ts_test_thread;
-extern ink_thread ts_event_thread;
-extern TSInitOptionT ts_init_options;
-
+static void * event_callback_thread(void * arg);
/**********************************************************************
* Socket Helper Functions
@@ -87,45 +72,23 @@ set_socket_paths(const char *path)
*
* purpose: performs socket write to check status of other end of connection
* input: None
- * output: return 0 if other end of connection closed;
- * return < 0 if socket write failed due to some other error
- * return > 0 if socket write successful
- * notes: send the test msg: UNDEFINED_OP 0(=msg_len)
+ * output: return false if socket write failed due to some other error
+ * return true if socket write successful
+ * notes: send the API_PING test msg
**********************************************************************/
-int
+static bool
socket_test(int fd)
{
- char msg[6]; /* 6 = SIZE_OP + SIZE_LEN */
- int16_t op;
- int32_t msg_len = 0;
- int ret, amount_read = 0;
-
- // write the op
- op = (int16_t) UNDEFINED_OP;
- memcpy(msg, (void *) &op, SIZE_OP_T);
-
- // write msg-len = 0
- memcpy(msg + SIZE_OP_T, &msg_len, SIZE_LEN);
-
- while (amount_read < 6) {
- ret = write(fd, msg + amount_read, 6 - amount_read);
- if (ret < 0) {
- if (errno == EAGAIN)
- continue;
-
- if (errno == EPIPE || errno == ENOTCONN) { // other socket end is closed
- return 0;
- }
+ MgmtMarshallInt optype = API_PING;
+ MgmtMarshallInt now = time(NULL);
- return -1;
- }
- amount_read += ret;
+ if (MGMTAPI_SEND_MESSAGE(fd, API_PING, &optype, &now) == TS_ERR_OKAY) {
+ return true; // write was successful; connection still open
}
- return 1; // write was successful; connection still open
+ return false;
}
-
/***************************************************************************
* connect
*
@@ -267,7 +230,7 @@ reconnect()
// relaunch a new event thread since socket_fd changed
if (0 == (ts_init_options & TS_MGMT_OPT_NO_EVENTS)) {
- ts_event_thread = ink_thread_create(event_poll_thread_main, &event_socket_fd, 0, DEFAULT_STACK_SIZE);
+ ts_event_thread = ink_thread_create(event_poll_thread_main, &event_socket_fd);
// reregister the callbacks on the TM side for this new client connection
if (remote_event_callbacks) {
err = send_register_all_callbacks(event_socket_fd, remote_event_callbacks);
@@ -335,11 +298,10 @@ reconnect_loop(int num_attempts)
* which is not open; which will by default terminate the process;
* client needs to "ignore" the SIGPIPE signal
**************************************************************************/
-TSMgmtError
-connect_and_send(const char *msg, int msg_len)
+static TSMgmtError
+main_socket_reconnect()
{
TSMgmtError err;
- int total_wrote = 0, ret;
// connects to TM and does all necessary event updates required
err = reconnect();
@@ -350,110 +312,36 @@ connect_and_send(const char *msg, int msg_len)
if (mgmt_write_timeout(main_socket_fd, MAX_TIME_WAIT, 0) <= 0) {
return TS_ERR_NET_TIMEOUT;
}
- // connection successfully established; resend msg
- // socket_fd should be new fd
- while (total_wrote < msg_len) {
- ret = write(main_socket_fd, msg + total_wrote, msg_len - total_wrote);
-
- if (ret == 0) {
- return TS_ERR_NET_EOF;
- }
-
- if (ret < 0) {
- if (errno == EAGAIN)
- continue;
- else if (errno == EPIPE || errno == ENOTCONN) {
- // clean-up sockets
- close(main_socket_fd);
- close(event_socket_fd);
- main_socket_fd = -1;
- event_socket_fd = -1;
-
- return TS_ERR_NET_ESTABLISH; // can't establish connection
-
- } else
- return TS_ERR_NET_WRITE; // general socket writing error
-
- }
-
- total_wrote += ret;
- }
-
- return TS_ERR_OKAY;
-}
-
-static TSMgmtError
-socket_read_conn(int fd, uint8_t * buf, size_t needed)
-{
- size_t consumed = 0;
- ssize_t ret;
-
- while (needed > consumed) {
- ret = read(fd, buf, needed - consumed);
-
- if (ret < 0) {
- if (errno == EAGAIN) {
- continue;
- } else {
- return TS_ERR_NET_READ;
- }
- }
-
- if (ret == 0) {
- return TS_ERR_NET_EOF;
- }
-
- buf += ret;
- consumed += ret;
- }
return TS_ERR_OKAY;
}
-/**************************************************************************
- * socket_write_conn
- *
- * purpose: guarantees writing of n bytes; if connection error, tries
- * reconnecting to TM again (in case TM was restarted)
- * input: fd to write to, buffer to write from & number of bytes to write
- * output: TS_ERR_xx
- * note: EPIPE - this happens if client makes a call after stopping then
- * starting TM again.
- * ENOTCONN - this happens if the client tries to make a call after
- * stopping TM, but before starting it; then restarts TM and makes a
- * new call
- * In the send_xx_request function, use a special socket writing function
- * which calls connect_and_send() instead of just the basic connect():
- * 1) if the write returns EPIPE error, then call connect_and_send()
- * 2) return the value returned from EPIPE
- *************************************************************************/
static TSMgmtError
-socket_write_conn(int fd, const char *msg_buf, int bytes)
+socket_write_conn(int fd, const void * msg_buf, size_t bytes)
{
- int ret, byte_wrote = 0;
+ size_t byte_wrote = 0;
// makes sure the descriptor is writable
if (mgmt_write_timeout(fd, MAX_TIME_WAIT, 0) <= 0) {
return TS_ERR_NET_TIMEOUT;
}
+
// write until we fulfill the number
while (byte_wrote < bytes) {
- ret = write(fd, msg_buf + byte_wrote, bytes - byte_wrote);
+ ssize_t ret = write(fd, (const char *)msg_buf + byte_wrote, bytes - byte_wrote);
if (ret == 0) {
return TS_ERR_NET_EOF;
}
if (ret < 0) {
- if (errno == EAGAIN)
+ if (mgmt_transient_error()) {
continue;
-
- else if (errno == EPIPE || errno == ENOTCONN) { // other socket end is closed
- // clean-up of sockets is done in reconnect()
- return connect_and_send(msg_buf, bytes);
- } else
+ } else {
return TS_ERR_NET_WRITE;
+ }
}
+
// we are all good here
byte_wrote += ret;
}
@@ -461,6 +349,33 @@ socket_write_conn(int fd, const char *msg_buf, int bytes)
return TS_ERR_OKAY;
}
+TSMgmtError
+mgmtapi_sender::send(void * msg, size_t msglen) const
+{
+ const unsigned tries = 5;
+ TSMgmtError err;
+
+ for (unsigned i = 0; i < tries; ++i) {
+ err = socket_write_conn(this->fd, msg, msglen);
+ if (err == TS_ERR_OKAY) {
+ return err;
+ }
+
+ // clean-up sockets
+ close(main_socket_fd);
+ close(event_socket_fd);
+ main_socket_fd = -1;
+ event_socket_fd = -1;
+
+ err = main_socket_reconnect();
+ if (err != TS_ERR_OKAY) {
+ return err;
+ }
+ }
+
+ return TS_ERR_NET_ESTABLISH; // can't establish connection
+}
+
/**********************************************************************
* socket_test_thread
*
@@ -488,7 +403,7 @@ socket_test_thread(void *)
{
// loop until client process dies
while (1) {
- if (main_socket_fd == -1 || socket_test(main_socket_fd) <= 0) {
+ if (main_socket_fd == -1 || !socket_test(main_socket_fd)) {
// ASSUMES that in between the time the socket_test is made
// and this reconnect call is made, the main_socket_fd remains
// the same (eg. no one else called reconnect to TM successfully!!
@@ -512,387 +427,6 @@ socket_test_thread(void *)
/**********************************************************************
* MARSHALL REQUESTS
**********************************************************************/
-/**********************************************************************
- * send_request
- *
- * purpose: sends file read request to Traffic Manager
- * input: fd - file descriptor to use to send to
- * op - the type of OpType request sending
- * output: TS_ERR_xx
- * notes: used by operations which don't need to send any additional
- * parameters
- * format: <OpType> <msg_len=0>
- **********************************************************************/
-TSMgmtError
-send_request(int fd, OpType op)
-{
- int16_t op_t;
- int32_t msg_len;
- char msg_buf[SIZE_OP_T + SIZE_LEN];
- TSMgmtError err;
-
- // fill in op type
- op_t = (int16_t) op;
- memcpy(msg_buf, &op_t, SIZE_OP_T);
-
- // fill in msg_len == 0
- msg_len = 0;
- memcpy(msg_buf + SIZE_OP_T, &msg_len, SIZE_LEN);
-
- // send message
- err = socket_write_conn(fd, msg_buf, SIZE_OP_T + SIZE_LEN);
- return err;
-}
-
-/**********************************************************************
- * send_request_name (helper fn)
- *
- * purpose: sends generic request with one string argument name
- * input: fd - file descriptor to use
- * op - .
- * output: TS_ERR_xx
- * note: format: <OpType> <str_len> <string>
- **********************************************************************/
-TSMgmtError
-send_request_name(int fd, OpType op, const char *name)
-{
- char *msg_buf;
- int16_t op_t;
- int32_t msg_len;
- int total_len;
- TSMgmtError err;
-
- if (name == NULL) { //reg callback for all events when op==EVENT_REG_CALLBACK
- msg_len = 0;
- } else {
- msg_len = (int32_t) strlen(name);
- }
-
- total_len = SIZE_OP_T + SIZE_LEN + msg_len;
- msg_buf = (char *)ats_malloc(sizeof(char) * total_len);
-
- // fill in op type
- op_t = (int16_t) op;
- memcpy(msg_buf, (void *) &op_t, SIZE_OP_T);
-
- // fill in msg_len
- memcpy(msg_buf + SIZE_OP_T, (void *) &msg_len, SIZE_LEN);
-
- // fill in name (if NOT NULL)
- if (name)
- memcpy(msg_buf + SIZE_OP_T + SIZE_LEN, name, msg_len);
-
-
- // send message
- err = socket_write_conn(fd, msg_buf, total_len);
- ats_free(msg_buf);
- return err;
-}
-
-/**********************************************************************
- * send_request_name_value (helper fn)
- *
- * purpose: sends generic request with 2 str arguments; a name-value pair
- * input: fd - file descriptor to use
- * op - Op type
- * output: TS_ERR_xx
- * note: format: <OpType> <name-len> <val-len> <name> <val>
- **********************************************************************/
-TSMgmtError
-send_request_name_value(int fd, OpType op, const char *name, const char *value)
-{
- char *msg_buf;
- int msg_pos = 0, total_len;
- int32_t msg_len, name_len, val_size; // these are written to msg
- int16_t op_t;
- TSMgmtError err;
-
- if (!name || !value)
- return TS_ERR_PARAMS;
-
- // set the sizes
- name_len = strlen(name);
- val_size = strlen(value);
- msg_len = (SIZE_LEN * 2) + name_len + val_size;
- total_len = SIZE_OP_T + SIZE_LEN + msg_len;
- msg_buf = (char *)ats_malloc(sizeof(char) * (total_len));
-
- // fill in op type
- op_t = (int16_t) op;
- memcpy(msg_buf + msg_pos, (void *) &op_t, SIZE_OP_T);
- msg_pos += SIZE_OP_T;
-
- // fill in msg length
- memcpy(msg_buf + msg_pos, (void *) &msg_len, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // fill in record name length
- memcpy(msg_buf + msg_pos, (void *) &name_len, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // fill in record value length
- memcpy(msg_buf + msg_pos, (void *) &val_size, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // fill in record name
- memcpy(msg_buf + msg_pos, name, name_len);
- msg_pos += name_len;
-
- // fill in record value
- memcpy(msg_buf + msg_pos, value, val_size);
-
- // send message
- err = socket_write_conn(fd, msg_buf, total_len);
- ats_free(msg_buf);
- return err;
-}
-
-/**********************************************************************
- * send_request_bool (helper)
- *
- * purpose: sends a simple op with a boolean flag argument
- * input: fd - file descriptor to use
- * flag - boolean flag
- * output: TS_ERR_xx
- **********************************************************************/
-TSMgmtError
-send_request_bool(int fd, OpType op, bool flag)
-{
- char msg_buf[SIZE_OP_T + SIZE_LEN + SIZE_BOOL];
- int16_t flag_t;
- int32_t msg_len;
-
- // Fill in the operator
- memcpy(msg_buf, (void *) &op, SIZE_OP_T);
-
- // fill in msg_len = SIZE_BOOL
- msg_len = (int32_t) SIZE_BOOL;
- memcpy(msg_buf + SIZE_OP_T, (void *)&msg_len, SIZE_LEN);
-
- // Fill in the argument (the boolean flag)
- flag_t = flag ? 1 : 0;
- memcpy(msg_buf + SIZE_OP_T + SIZE_LEN, (void *) &flag_t, SIZE_BOOL);
-
- // send message
- return socket_write_conn(fd, msg_buf, SIZE_OP_T + SIZE_LEN + SIZE_BOOL);
-}
-
-/**********************************************************************
- * send_file_read_request
- *
- * purpose: sends file read request to Traffic Manager
- * input: fd - file descriptor to use to send to
- * file - file to read
- * output: TS_ERR_xx
- * notes: first must create the message and then send it across network
- * msg format = <OpType> <msg_len> <TSFileNameT>
- **********************************************************************/
-TSMgmtError
-send_file_read_request(int fd, TSFileNameT file)
-{
- char msg_buf[SIZE_OP_T + SIZE_LEN + SIZE_FILE_T];
- int msg_pos = 0;
- int32_t msg_len = (int32_t) SIZE_FILE_T; //marshalled values
- int16_t op, file_t;
-
- // fill in op type
- op = (int16_t) FILE_READ;
- memcpy(msg_buf + msg_pos, &op, SIZE_OP_T);
- msg_pos += SIZE_OP_T;
-
- // fill in msg length
- memcpy(msg_buf + msg_pos, &msg_len, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // fill in file type
- file_t = (int16_t) file;
- memcpy(msg_buf + msg_pos, &file_t, SIZE_FILE_T);
-
- // send message
- return socket_write_conn(fd, msg_buf, SIZE_OP_T + SIZE_LEN + SIZE_FILE_T);
-}
-
-/**********************************************************************
- * send_file_write_request
- *
- * purpose: sends file write request to Traffic Manager
- * input: fd - file descriptor to use
- * file - file to read
- * text - new text to write to specified file
- * size - length of the text
- * ver - version of the file to be written
- * output: TS_ERR_xx
- * notes: format - FILE_WRITE <msg_len> <file_type> <file_ver> <file_size> <text>
- **********************************************************************/
-TSMgmtError
-send_file_write_request(int fd, TSFileNameT file, int ver, int size, char *text)
-{
- char *msg_buf;
- int msg_pos = 0, total_len;
- int32_t msg_len, f_size; //marshalled values
- int16_t op, file_t, f_ver;
- TSMgmtError err;
-
- if (!text)
- return TS_ERR_PARAMS;
-
- msg_len = SIZE_FILE_T + SIZE_VER + SIZE_LEN + size;
- total_len = SIZE_OP_T + SIZE_LEN + msg_len;
- msg_buf = (char *)ats_malloc(sizeof(char) * total_len);
-
- // fill in op type
- op = (int16_t) FILE_WRITE;
- memcpy(msg_buf + msg_pos, &op, SIZE_OP_T);
- msg_pos += SIZE_OP_T;
-
- // fill in msg length
- memcpy(msg_buf + msg_pos, &msg_len, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // fill in file type
- file_t = (int16_t) file;
- memcpy(msg_buf + msg_pos, &file_t, SIZE_FILE_T);
- msg_pos += SIZE_FILE_T;
-
- // fill in file version
- f_ver = (int16_t) ver;
- memcpy(msg_buf + msg_pos, &f_ver, SIZE_VER);
- msg_pos += SIZE_VER;
-
- // fill in file size
- f_size = (int32_t) size; //typecase to be safe
- memcpy(msg_buf + msg_pos, &f_size, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // fill in text of file
- memcpy(msg_buf + msg_pos, text, size);
-
- // send message
- err = socket_write_conn(fd, msg_buf, total_len);
- ats_free(msg_buf);
- return err;
-}
-
-static TSMgmtError
-send_record_get_x_request(OpType optype, int fd, const char *rec_name)
-{
- char *msg_buf;
- int msg_pos = 0, total_len;
- int16_t op = (int16_t)optype;
- int32_t msg_len;
- TSMgmtError err;
-
- ink_assert(op == RECORD_GET || op == RECORD_MATCH_GET);
-
- if (!rec_name) {
- return TS_ERR_PARAMS;
- }
-
- total_len = SIZE_OP_T + SIZE_LEN + strlen(rec_name);
- msg_buf = (char *)ats_malloc(sizeof(char) * total_len);
-
- // fill in op type
- memcpy(msg_buf + msg_pos, (void *) &op, SIZE_OP_T);
- msg_pos += SIZE_OP_T;
-
- // fill in msg length
- msg_len = (int32_t) strlen(rec_name);
- memcpy(msg_buf + msg_pos, (void *) &msg_len, SIZE_LEN);
- msg_pos += SIZE_LEN;
-
- // fill in record name
- memcpy(msg_buf + msg_pos, rec_name, strlen(rec_name));
-
- // send message
- err = socket_write_conn(fd, msg_buf, total_len);
- ats_free(msg_buf);
- return err;
-}
-
-/**********************************************************************
- * send_record_get_request
- *
- * purpose: sends request to get record value from Traffic Manager
- * input: fd - file descriptor to use
- * rec_name - name of record to retrieve value for
- * output: TS_ERR_xx
- * format: RECORD_GET <msg_len> <rec_name>
- **********************************************************************/
-TSMgmtError
-send_record_get_request(int fd, const char *rec_name)
-{
- return send_record_get_x_request(RECORD_GET, fd, rec_name);
-}
-
-/**********************************************************************
- * send_record_match_request
- *
- * purpose: sends request to get a list of matching record values from Traffic Manager
- * input: fd - file descriptor to use
- * rec_name - regex to match against record names
- * output: TS_ERR_xx
- * format: sequence of RECORD_GET <msg_len> <rec_name>
- **********************************************************************/
-TSMgmtError
-send_record_match_request(int fd, const char *rec_regex)
-{
- return send_record_get_x_request(RECORD_MATCH_GET, fd, rec_regex);
-}
-
-/*------ control functions -------------------------------------------*/
-/**********************************************************************
- * send_proxy_state_get_request
- *
- * purpose: sends request to get the proxy state (on/off)
- * input: fd - file descriptor to use
- * output: TS_ERR_xx
- * note: format: PROXY_STATE_GET 0(=msg_len)
- **********************************************************************/
-TSMgmtError
-send_proxy_state_get_request(int fd)
-{
- TSMgmtError err;
-
- err = send_request(fd, PROXY_STATE_GET);
- return err;
-}
-
-/**********************************************************************
- * send_proxy_state_set_request
- *
- * purpose: sends request to set the proxy state (on/off)
- * input: fd - file descriptor to use
- * state - TS_PROXY_ON, TS_PROXY_OFF
- * output: TS_ERR_xx
- * note: format: PROXY_STATE_SET <msg_len> <TSProxyStateT> <TSCacheClearT>
- **********************************************************************/
-TSMgmtError
-send_proxy_state_set_request(int fd, TSProxyStateT state, TSCacheClearT clear)
-{
- char msg_buf[SIZE_OP_T + SIZE_LEN + SIZE_PROXY_T + SIZE_TS_ARG_T];
- int16_t op, state_t, cache_t;
- int32_t msg_len;
-
- // fill in op type
- op = (int16_t) PROXY_STATE_SET;
- memcpy(msg_buf, (void *) &op, SIZE_OP_T);
-
- // fill in msg_len
- msg_len = (int32_t) (SIZE_PROXY_T + SIZE_TS_ARG_T);
- memcpy(msg_buf + SIZE_OP_T, (void *) &msg_len, SIZE_LEN);
-
- // fill in proxy state
- state_t = (int16_t) state;
- memcpy(msg_buf + SIZE_OP_T + SIZE_LEN, (void *) &state_t, SIZE_PROXY_T);
-
- // fill in cache clearing option
- cache_t = (int16_t) clear;
- memcpy(msg_buf + SIZE_OP_T + SIZE_LEN + SIZE_PROXY_T, (void *) &cache_t, SIZE_TS_ARG_T);
-
- // send message
- return socket_write_conn(fd, msg_buf, SIZE_OP_T + SIZE_LEN + SIZE_PROXY_T + SIZE_TS_ARG_T);
-}
-
/*------ events -------------------------------------------------------*/
@@ -904,11 +438,8 @@ send_proxy_state_set_request(int fd, TSProxyStateT state, TSCacheClearT clear)
* registered for each event
* input: None
* output: return TS_ERR_OKAY only if ALL events sent okay
- * notes: could create a function which just sends a list of all the events to
- * reregister; but actually just reuse the function
- * send_request_name(EVENT_REG_CALLBACK) and call it for each event
* 1) get list of all events with callbacks
- * 2) for each event, call send_request_name
+ * 2) for each event, send a EVENT_REG_CALLBACK message
**********************************************************************/
TSMgmtError
send_register_all_callbacks(int fd, CallbackTable * cb_table)
@@ -920,19 +451,22 @@ send_register_all_callbacks(int fd, CallbackTable * cb_table)
events_with_cb = get_events_with_callbacks(cb_table);
// need to check that the list has all the events registered
if (!events_with_cb) { // all events have registered callback
- err = send_request_name(fd, EVENT_REG_CALLBACK, NULL);
+ MgmtMarshallInt optype = EVENT_REG_CALLBACK;
+ MgmtMarshallString event_name = NULL;
+
+ err = MGMTAPI_SEND_MESSAGE(fd, EVENT_REG_CALLBACK, &optype, &event_name);
if (err != TS_ERR_OKAY)
return err;
} else {
- char *event_name;
- int event_id;
int num_events = queue_len(events_with_cb);
// iterate through the LLQ and send request for each event
for (int i = 0; i < num_events; i++) {
- event_id = *(int *) dequeue(events_with_cb);
- event_name = (char *) get_event_name(event_id);
+ MgmtMarshallInt optype = EVENT_REG_CALLBACK;
+ MgmtMarshallInt event_id = *(int *) dequeue(events_with_cb);
+ MgmtMarshallString event_name = (char *) get_event_name(event_id);
+
if (event_name) {
- err = send_request_name(fd, EVENT_REG_CALLBACK, event_name);
+ err = MGMTAPI_SEND_MESSAGE(fd, EVENT_REG_CALLBACK, &optype, &event_name);
ats_free(event_name); // free memory
if (err != TS_ERR_OKAY) {
send_err = err; // save the type of send error
@@ -960,14 +494,10 @@ send_register_all_callbacks(int fd, CallbackTable * cb_table)
* callbacks registered for that event
* input: None
* output: TS_ERR_OKAY only if all send requests are okay
- * notes: could create a function which just sends a list of all the events to
- * unregister; but actually just reuse the function
- * send_request_name(EVENT_UNREG_CALLBACK) and call it for each event
**********************************************************************/
TSMgmtError
send_unregister_all_callbacks(int fd, CallbackTable * cb_table)
{
- char *event_name;
int event_id;
LLQ *events_with_cb; // list of events with at least one callback
int reg_callback[NUM_EVENTS];
@@ -995,8 +525,10 @@ send_unregister_all_callbacks(int fd, CallbackTable * cb_table)
// send message to TM to mark unregister
for (int k = 0; k < NUM_EVENTS; k++) {
if (reg_callback[k] == 0) { // event has no registered callbacks
- event_name = get_event_name(k);
- err = send_request_name(fd, EVENT_UNREG_CALLBACK, event_name);
+ MgmtMarshallInt optype = EVENT_UNREG_CALLBACK;
+ MgmtMarshallString event_name = get_event_name(k);
+
+ err = MGMTAPI_SEND_MESSAGE(fd, EVENT_UNREG_CALLBACK, &optype, &event_name);
ats_free(event_name);
if (err != TS_ERR_OKAY) {
send_err = err; //save the type of the sending error
@@ -1015,545 +547,165 @@ send_unregister_all_callbacks(int fd, CallbackTable * cb_table)
}
/**********************************************************************
- * send_diags_msg
- *
- * purpose: sends the diag msg across along with they diag msg type
- * input: mode - type of diags msg
- * msg - the diags msg
- * output: TS_ERR_xx
- * note: format: <OpType> <msg_len> <TSDiagsT> <diag_msg_len> <diag_msg>
- **********************************************************************/
-TSMgmtError
-send_diags_msg(int fd, TSDiagsT mode, const char *diag_msg)
-{
- char *msg_buf;
- int16_t op_t, diag_t;
- int32_t msg_len, diag_msg_len;
- int total_len;
- TSMgmtError err;
-
- if (!diag_msg)
- return TS_ERR_PARAMS;
-
- diag_msg_len = (int32_t) strlen(diag_msg);
- msg_len = SIZE_DIAGS_T + SIZE_LEN + diag_msg_len;
- total_len = SIZE_OP_T + SIZE_LEN + msg_len;
- msg_buf = (char *)ats_malloc(sizeof(char) * total_len);
-
- // fill in op type
- op_t = (int16_t) DIAGS;
- memcpy(msg_buf, (void *) &op_t, SIZE_OP_T);
-
- // fill in entire msg len
- memcpy(msg_buf + SIZE_OP_T, (void *) &msg_len, SIZE_LEN);
-
- // fill in TSDiagsT
- diag_t = (int16_t) mode;
- memcpy(msg_buf + SIZE_OP_T + SIZE_LEN, (void *) &diag_t, SIZE_DIAGS_T);
-
- // fill in diags msg_len
- memcpy(msg_buf + SIZE_OP_T + SIZE_LEN + SIZE_DIAGS_T, (void *) &diag_msg_len, SIZE_LEN);
-
- // fill in diags msg
- memcpy(msg_buf + SIZE_OP_T + SIZE_LEN + SIZE_DIAGS_T + SIZE_LEN, diag_msg, diag_msg_len);
-
- // send message
- err = socket_write_conn(fd, msg_buf, total_len);
- ats_free(msg_buf);
- return err;
-}
-
-
-/**********************************************************************
* UNMARSHAL REPLIES
**********************************************************************/
-/* Error handling implementation:
- * All the parsing functions which parse the reply returned from local side
- * also must read the TSERror return value sent from local side; this return
- * value is the same value that will be returned by the parsing function.
- * ALL PARSING FUNCTIONS MUST FIRST CHECK that the retval is TS_ERR_OKAY;
- * if it is not, then DON"T PARSE THE REST OF THE REPLY!!
- */
-
-/* Reading replies:
- * The reading is done in while loop in the parse_xx_reply functions;
- * need to add a timeout so that the function is not left looping and
- * waiting if a msg isn't sent to the socket from local side (eg. TM died)
- */
-
-/**********************************************************************
- * parse_reply
- *
- * purpose: parses a reply from traffic manager. return that error
- * input: fd
- * output: errors on error or fill up class with response &
- * return TS_ERR_xx
- * notes: only returns an TSMgmtError
- **********************************************************************/
-TSMgmtError
-parse_reply(int fd)
-{
- TSMgmtError ret;
- int16_t ret_val;
-
- // check to see if anything to read; wait for specified time = 1 sec
- if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { // time expires before ready to read
- return TS_ERR_NET_TIMEOUT;
- }
-
- ret = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
- if (ret != TS_ERR_OKAY) {
- return ret;
- }
-
- return (TSMgmtError) ret_val;
-}
-
-/**********************************************************************
- * parse_reply_list
- *
- * purpose: parses a TM reply to a request to get a list of string tokens
- * input: fd - socket to read
- * list - will contain delimited string list of tokens
- * output: TS_ERR_xx
- * notes:
- * format: <TSMgmtError> <string_list_len> <delimited_string_list>
- **********************************************************************/
-TSMgmtError
-parse_reply_list(int fd, char **list)
-{
- int16_t ret_val;
- int32_t list_size;
- TSMgmtError err_t;
-
- if (!list) {
- return TS_ERR_PARAMS;
- }
-
- // check to see if anything to read; wait for specified time
- if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) {
- return TS_ERR_NET_TIMEOUT;
- }
-
- // get the return value (TSMgmtError type)
- err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- // if !TS_ERR_OKAY, stop reading rest of msg
- err_t = (TSMgmtError) ret_val;
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- // now get size of string event list
- err_t = socket_read_conn(fd, (uint8_t *)&list_size, SIZE_LEN);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- // get the delimited event list string
- *list = (char *)ats_malloc(sizeof(char) * (list_size + 1));
- err_t = socket_read_conn(fd, (uint8_t *)(*list), list_size);
- if (err_t != TS_ERR_OKAY) {
- ats_free(*list);
- *list = NULL;
- return err_t;
- }
-
- // add end of string to end of the record value
- ((char *) (*list))[list_size] = '\0';
- return err_t;
-}
-
-
-/**********************************************************************
- * parse_file_read_reply
- *
- * purpose: parses a file read reply from traffic manager.
- * input: fd
- * ver -
- * size - size of text
- * text -
- * output: errors on error or fill up class with response &
- * return TS_ERR_xx
- * notes: reply format = <TSMgmtError> <file_version> <file_size> <text>
- **********************************************************************/
TSMgmtError
-parse_file_read_reply(int fd, int *ver, int *size, char **text)
+parse_generic_response(OpType optype, int fd)
{
- int32_t f_size;
- int16_t ret_val, f_ver;
- TSMgmtError err_t;
-
- if (!ver || !size || !text)
- return TS_ERR_PARAMS;
-
- // check to see if anything to read; wait for specified time
- if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { // time expires before ready to read
- return TS_ERR_NET_TIMEOUT;
- }
-
- // get the error return value
- err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- // if !TS_ERR_OKAY, stop reading rest of msg
- err_t = (TSMgmtError) ret_val;
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- // now get file version
- err_t = socket_read_conn(fd, (uint8_t *)&f_ver, SIZE_VER);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- *ver = (int) f_ver;
+ TSMgmtError err;
+ MgmtMarshallInt ival;
+ MgmtMarshallData data = { NULL, 0 };
- // now get file size
- err_t = socket_read_conn(fd, (uint8_t *)&f_size, SIZE_LEN);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
+ err = recv_mgmt_message(fd, data);
+ if (err != TS_ERR_OKAY) {
+ return err;
}
- *size = (int) f_size;
-
- // check size before reading text
- if ((*size) <= 0) {
- *text = ats_strndup("", 1); // set to empty string
- return TS_ERR_OKAY;
- }
+ err = recv_mgmt_response(data.ptr, data.len, optype, &ival);
+ ats_free(data.ptr);
- // now we got the size, we can read everything into our msg * then parse it
- *text = (char *)ats_malloc(sizeof(char) * (f_size + 1));
- err_t = socket_read_conn(fd, (uint8_t *)(*text), f_size);
- if (err_t != TS_ERR_OKAY) {
- ats_free(*text);
- *text = NULL;
- return err_t;
+ if (err != TS_ERR_OKAY) {
+ return err;
}
- (*text)[f_size] = '\0'; // end the string
- return TS_ERR_OKAY;
+ return (TSMgmtError)ival;
}
/**********************************************************************
- * parse_record_get_reply
+ * event_poll_thread_main
*
- * purpose: parses a record_get reply from traffic manager.
- * input: fd
- * retval -
- * rec_type - the type of the record
- * rec_value - the value of the record in string format
- * output: errors on error or fill up class with response &
- * return SUCC
- * notes: reply format = <TSMgmtError> <val_size> <name_size> <rec_type> <record_value> <record_name>
- * Zero-length values and names are supported. If the size field is 0, the corresponding
- * value field is not transmitted.
- * It's the responsibility of the calling function to conver the rec_value
- * based on the rec_type!!
+ * purpose: thread listens on the client's event socket connection;
+ * only reads from the event_socket connection and
+ * processes EVENT_NOTIFY messages; each time client
+ * makes new event-socket connection to TM, must launch
+ * a new event_poll_thread_main thread
+ * input: arg - contains the socket_fd to listen on
+ * output: NULL - if error
+ * notes: each time the client's socket connection to TM is reset
+ * a new thread will be launched as old one dies; there are
+ * only two places where a new thread is created:
+ * 1) when client first connects (TSInit call)
+ * 2) client reconnects() due to a TM restart
+ * Uses blocking socket; so blocks until receives an event notification.
+ * Shouldn't need to use select since only waiting for a notification
+ * message from event_callback_main thread!
**********************************************************************/
-TSMgmtError
-parse_record_get_reply(int fd, TSRecordT * rec_type, void **rec_val, char **rec_name)
+void *
+event_poll_thread_main(void *arg)
{
- int16_t ret_val, rec_t;
- int32_t val_size, name_size;
- TSMgmtError err_t;
-
- if (!rec_type || !rec_val) {
- return TS_ERR_PARAMS;
- }
-
- *rec_name = NULL;
- *rec_val = NULL;
-
- // check to see if anything to read; wait for specified time
- if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { //time expired before ready to read
- return TS_ERR_NET_TIMEOUT;
- }
-
- // get the return value (TSMgmtError type)
- err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
- if (err_t != TS_ERR_OKAY) {
- goto fail;
- }
-
- // if !TS_ERR_OKAY, stop reading rest of msg
- err_t = (TSMgmtError) ret_val;
- if (err_t != TS_ERR_OKAY) {
- goto fail;
- }
-
- // now get size of record_value
- err_t = socket_read_conn(fd, (uint8_t *)&val_size, SIZE_LEN);
- if (err_t != TS_ERR_OKAY) {
- goto fail;
- }
-
- // now get size of record name
- err_t = socket_read_conn(fd, (uint8_t *)&name_size, SIZE_LEN);
- if (err_t != TS_ERR_OKAY) {
- goto fail;
- }
+ int sock_fd;
- // get the record type
- err_t = socket_read_conn(fd, (uint8_t *)&rec_t, SIZE_REC_T);
- if (err_t != TS_ERR_OKAY) {
- goto fail;
- }
+ sock_fd = *((int *) arg); // should be same as event_socket_fd
- *rec_type = (TSRecordT) rec_t;
+ // the sock_fd is going to be the one we listen for events on
+ while (1) {
+ TSMgmtError ret;
+ TSMgmtEvent * event = NULL;
- // get record value (if there is one)
- if (val_size) {
- if (*rec_type == TS_REC_STRING) {
- *rec_val = ats_malloc(sizeof(char) * (val_size + 1));
- } else {
- *rec_val = ats_malloc(sizeof(char) * (val_size));
- }
+ MgmtMarshallData reply = { NULL, 0 };
+ MgmtMarshallInt optype;
+ MgmtMarshallString name = NULL;
+ MgmtMarshallString desc = NULL;
- err_t = socket_read_conn(fd, (uint8_t *)(*rec_val), val_size);
- if (err_t != TS_ERR_OKAY) {
- goto fail;
+ // possible sock_fd is invalid if TM restarts and client reconnects
+ if (sock_fd < 0) {
+ break;
}
- // add end of string to end of the record value
- if (*rec_type == TS_REC_STRING) {
- ((char *) (*rec_val))[val_size] = '\0';
+ // Just wait until we get an event or error. The 0 return from select(2)
+ // means we timed out ...
+ if (mgmt_read_timeout(main_socket_fd, MAX_TIME_WAIT, 0) == 0) {
+ continue;
}
- }
- // get the record name (if there is one)
- if (name_size) {
- *rec_name = (char *)ats_malloc(sizeof(char) * (name_size + 1));
- err_t = socket_read_conn(fd, (uint8_t *)(*rec_name), name_size);
- if (err_t != TS_ERR_OKAY) {
- goto fail;
+ ret = recv_mgmt_message(main_socket_fd, reply);
+ if (ret != TS_ERR_OKAY) {
+ break;
}
- (*rec_name)[name_size] = '\0';
- }
-
- return TS_ERR_OKAY;
-
-fail:
- ats_free(*rec_val);
- ats_free(*rec_name);
- *rec_val = NULL;
- *rec_name = NULL;
- return err_t;
-}
-
-/**********************************************************************
- * parse_record_set_reply
- *
- * purpose: parses a record_set reply from traffic manager.
- * input: fd
- * action_need - will contain the type of action needed from the set
- * output: TS_ERR_xx
- * notes: reply format = <TSMgmtError> <val_size> <rec_type> <record_value>
- * It's the responsibility of the calling function to conver the rec_value
- * based on the rec_type!!
- **********************************************************************/
-TSMgmtError
-parse_record_set_reply(int fd, TSActionNeedT * action_need)
-{
- int16_t ret_val, action_t;
- TSMgmtError err_t;
-
- if (!action_need)
- return TS_ERR_PARAMS;
-
- // check to see if anything to read; wait for specified time
- if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) {
- return TS_ERR_NET_TIMEOUT;
- }
-
- // get the return value (TSMgmtError type)
- err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- // if !TS_ERR_OKAY, stop reading rest of msg
- err_t = (TSMgmtError) ret_val;
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- // now get the action needed
- err_t = socket_read_conn(fd, (uint8_t *)&action_t, SIZE_ACTION_T);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- *action_need = (TSActionNeedT) action_t;
- return err_t;
-}
+ ret = recv_mgmt_request(reply.ptr, reply.len, EVENT_NOTIFY, &optype, &name, &desc);
+ ats_free(reply.ptr);
+ if (ret != TS_ERR_OKAY) {
+ ats_free(name);
+ ats_free(desc);
+ break;
+ }
-/**********************************************************************
- * parse_proxy_state_get_reply
- *
- * purpose: parses a TM reply to a PROXY_STATE_GET request
- * input: fd
- * state - will contain the state of the proxy
- * output: TS_ERR_xx
- * notes: function is DIFFERENT becuase it has NO TSMgmtError at head of msg
- * format: <TSProxyStateT>
- **********************************************************************/
-TSMgmtError
-parse_proxy_state_get_reply(int fd, TSProxyStateT * state)
-{
- TSMgmtError err_t;
- int16_t state_t;
+ ink_assert(optype == EVENT_NOTIFY);
- if (!state)
- return TS_ERR_PARAMS;
+ // The new event takes ownership of the message strings.
+ event = TSEventCreate();
+ event->name = name;
+ event->id = get_event_id(name);
+ event->description = desc;
- // check to see if anything to read; wait for specified time
- if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) { // time expires before ready to read
- return TS_ERR_NET_TIMEOUT;
+ // got event notice; spawn new thread to handle the event's callback functions
+ ink_thread_create(event_callback_thread, (void *)event);
}
- // now get proxy state
- err_t = socket_read_conn(fd, (uint8_t *)&state_t, SIZE_PROXY_T);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- *state = (TSProxyStateT) state_t;
- return TS_ERR_OKAY;
+ ink_thread_exit(NULL);
+ return NULL;
}
-/*------------- events ---------------------------------------------*/
-
/**********************************************************************
- * parse_event_active_reply
+ * event_callback_thread
*
- * purpose: parses a TM reply to a request to get status of an event
- * input: fd - socket to read
- * is_active - set to true if event is active; false otherwise
- * output: TS_ERR_xx
- * notes:
- * format: reply format = <TSMgmtError> <bool>
+ * purpose: Given an event, determines and calls the registered cb functions
+ * in the CallbackTable for remote events
+ * input: arg - should be an TSMgmtEvent with the event info sent from TM msg
+ * output: returns when done calling all the callbacks
+ * notes: None
**********************************************************************/
-TSMgmtError
-parse_event_active_reply(int fd, bool * is_active)
+static void *
+event_callback_thread(void *arg)
{
- int16_t ret_val, active;
- TSMgmtError err_t;
-
- if (!is_active)
- return TS_ERR_PARAMS;
-
- // check to see if anything to read; wait for specified time
- if (mgmt_read_timeout(fd, MAX_TIME_WAIT, 0) <= 0) {
- return TS_ERR_NET_TIMEOUT;
- }
+ TSMgmtEvent *event_notice;
+ EventCallbackT *event_cb;
+ int index;
- // get the return value (TSMgmtError type)
- err_t = socket_read_conn(fd, (uint8_t *)&ret_val, SIZE_ERR_T);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- // if !TS_ERR_OKAY, stop reading rest of msg
- err_t = (TSMgmtError) ret_val;
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
+ event_notice = (TSMgmtEvent *) arg;
+ index = (int) event_notice->id;
+ LLQ *func_q; // list of callback functions need to call
- // now get the boolean
- err_t = socket_read_conn(fd, (uint8_t *)&active, SIZE_BOOL);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
+ func_q = create_queue();
+ if (!func_q) {
+ if (event_notice)
+ TSEventDestroy(event_notice);
+ return NULL;
}
- *is_active = (bool) active;
- return err_t;
-}
+ // obtain lock
+ ink_mutex_acquire(&remote_event_callbacks->event_callback_lock);
-/**********************************************************************
- * parse_event_notification
- *
- * purpose: parses the event notification message from TM when an event
- * is signalled; stores the event info in the TSMgmtEvent
- * input: fd - socket to read
- * event - where the event info from msg is stored
- * output:TS_ERR_OKAY, TS_ERR_NET_READ, TS_ERR_NET_EOF, TS_ERR_PARAMS
- * notes:
- * format: <OpType> <event_name_len> <event_name> <desc_len> <desc>
- **********************************************************************/
-TSMgmtError
-parse_event_notification(int fd, TSMgmtEvent * event)
-{
- OpType msg_type;
- int16_t type_op;
- int32_t msg_len;
- char *event_name = NULL, *desc = NULL;
- TSMgmtError err_t;
-
- if (!event)
- return TS_ERR_PARAMS;
-
- // read the operation type; should be EVENT_NOTIFY
- err_t = socket_read_conn(fd, (uint8_t *)&type_op, SIZE_OP_T);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
+ TSEventSignalFunc cb;
- // got the message type; the msg_type should be EVENT_NOTIFY
- msg_type = (OpType) type_op;
- if (msg_type != EVENT_NOTIFY) {
- return TS_ERR_FAIL;
- }
+ // check if we have functions to call
+ if (remote_event_callbacks->event_callback_l[index] && (!queue_is_empty(remote_event_callbacks->event_callback_l[index]))) {
+ int queue_depth = queue_len(remote_event_callbacks->event_callback_l[index]);
- // read in event name length
- err_t = socket_read_conn(fd, (uint8_t *)&msg_len, SIZE_LEN);
- if (err_t != TS_ERR_OKAY) {
- return err_t;
- }
-
- // read the event name
- event_name = (char *)ats_malloc(sizeof(char) * (msg_len + 1));
- err_t = socket_read_conn(fd, (uint8_t *)event_name, msg_len);
- if (err_t != TS_ERR_OKAY) {
- goto fail;
- }
-
- event_name[msg_len] = '\0'; // end the string
-
- // read in event description length
- err_t = socket_read_conn(fd, (uint8_t *)&msg_len, SIZE_LEN);
- if (err_t != TS_ERR_OKAY) {
- goto fail;
+ for (int i = 0; i < queue_depth; i++) {
+ event_cb = (EventCallbackT *) dequeue(remote_event_callbacks->event_callback_l[index]);
+ cb = event_cb->func;
+ enqueue(remote_event_callbacks->event_callback_l[index], event_cb);
+ enqueue(func_q, (void *) cb); // add callback function only to list
+ }
}
+ // release lock
+ ink_mutex_release(&remote_event_callbacks->event_callback_lock);
- // read the event description
- desc = (char *)ats_malloc(sizeof(char) * (msg_len + 1));
- err_t = socket_read_conn(fd, (uint8_t *)desc, msg_len);
- if (err_t != TS_ERR_OKAY) {
- goto fail;
+ // execute the callback function
+ while (!queue_is_empty(func_q)) {
+ cb = (TSEventSignalFunc) dequeue(func_q);
+ (*cb) (event_notice->name, event_notice->description, event_notice->priority, NULL);
}
- desc[msg_len] = '\0'; // end the string
+ // clean up event notice
+ TSEventDestroy(event_notice);
+ delete_queue(func_q);
- // fill in event info
- event->name = event_name;
- event->id = (int) get_event_id(event_name);
- event->description = desc;
-
- return TS_ERR_OKAY;
-
-fail:
- ats_free(event_name);
- ats_free(desc);
- return err_t;
+ // all done!
+ ink_thread_exit(NULL);
+ return NULL;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6250f431/mgmt/api/NetworkUtilsRemote.h
----------------------------------------------------------------------
diff --git a/mgmt/api/NetworkUtilsRemote.h b/mgmt/api/NetworkUtilsRemote.h
index 9a2ac7d..dfa3e74 100644
--- a/mgmt/api/NetworkUtilsRemote.h
+++ b/mgmt/api/NetworkUtilsRemote.h
@@ -32,28 +32,25 @@
*
***************************************************************************/
-#ifndef _NETWORK_UTILS_H_
-#define _NETWORK_UTILS_H_
-
-#include "ink_defs.h"
-#include "ink_mutex.h"
+#ifndef _NETWORK_UTILS_REMOTE_H_
+#define _NETWORK_UTILS_REMOTE_H_
#include "mgmtapi.h"
-#include "NetworkUtilsDefs.h"
+#include "NetworkMessage.h"
#include "EventCallback.h"
-#ifdef __cplusplus
-extern "C"
-{
-#endif /* __cplusplus */
+extern int main_socket_fd;
+extern int event_socket_fd;
+extern CallbackTable *remote_event_callbacks;
-const int DEFAULT_STACK_SIZE = 1048576; // 1MB stack
+// From CoreAPIRemote.cc
+extern ink_thread ts_event_thread;
+extern TSInitOptionT ts_init_options;
/**********************************************************************
* Socket Helper Functions
**********************************************************************/
void set_socket_paths(const char *path);
-int socket_test(int fd);
/* The following functions are specific for a client connection; uses
* the client connection information stored in the variables in
@@ -63,47 +60,30 @@ TSMgmtError ts_connect(); /* TODO: update documenation, Renamed due to conflict
TSMgmtError disconnect();
TSMgmtError reconnect();
TSMgmtError reconnect_loop(int num_attempts);
-TSMgmtError connect_and_send(const char *msg, int msg_len);
+
void *socket_test_thread(void *arg);
+void *event_poll_thread_main(void *arg);
+
+struct mgmtapi_sender : public mgmt_message_sender
+{
+ explicit mgmtapi_sender(int _fd) : fd(_fd) {}
+ virtual TSMgmtError send(void * msg, size_t msglen) const;
+
+ int fd;
+};
+
+#define MGMTAPI_SEND_MESSAGE(fd, optype, ...) send_mgmt_request(mgmtapi_sender(fd), (optype), __VA_ARGS__)
/*****************************************************************************
* Marshalling (create requests)
*****************************************************************************/
-TSMgmtError send_request(int fd, OpType op);
-TSMgmtError send_request_name(int fd, OpType op, const char *name);
-TSMgmtError send_request_name_value(int fd, OpType op, const char *name, const char *value);
-TSMgmtError send_request_bool(int fd, OpType op, bool flag);
-
-TSMgmtError send_file_read_request(int fd, TSFileNameT file);
-TSMgmtError send_file_write_request(int fd, TSFileNameT file, int ver, int size, char *text);
-TSMgmtError send_record_get_request(int fd, const char *rec_name);
-TSMgmtError send_record_match_request(int fd, const char *rec_regex);
-
-TSMgmtError send_proxy_state_set_request(int fd, TSProxyStateT state, TSCacheClearT clear);
TSMgmtError send_register_all_callbacks(int fd, CallbackTable * cb_table);
TSMgmtError send_unregister_all_callbacks(int fd, CallbackTable * cb_table);
-TSMgmtError send_diags_msg(int fd, TSDiagsT mode, const char *diag_msg);
-
/*****************************************************************************
* Un-marshalling (parse responses)
*****************************************************************************/
-TSMgmtError parse_reply(int fd);
-TSMgmtError parse_reply_list(int fd, char **list);
-
-TSMgmtError parse_file_read_reply(int fd, int *version, int *size, char **text);
-
-TSMgmtError parse_record_get_reply(int fd, TSRecordT * rec_type, void **rec_val, char **rec_name);
-TSMgmtError parse_record_set_reply(int fd, TSActionNeedT * action_need);
-
-TSMgmtError parse_proxy_state_get_reply(int fd, TSProxyStateT * state);
-
-TSMgmtError parse_event_active_reply(int fd, bool * is_active);
-TSMgmtError parse_event_notification(int fd, TSMgmtEvent * event);
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
+TSMgmtError parse_generic_response(OpType optype, int fd);
-#endif
+#endif /* _NETWORK_UTILS_REMOTE_H_ */