You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2015/04/30 20:31:13 UTC
incubator-htrace git commit: HTRACE-159. libhtrace.so: use HRPC
endpoint of htraced (cmccabe)
Repository: incubator-htrace
Updated Branches:
refs/heads/master b4c968740 -> a3c25b94a
HTRACE-159. libhtrace.so: use HRPC endpoint of htraced (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/a3c25b94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/a3c25b94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/a3c25b94
Branch: refs/heads/master
Commit: a3c25b94aa5f7c3e04538e4f37ab6dba83b1480d
Parents: b4c9687
Author: Colin P. Mccabe <cm...@apache.org>
Authored: Thu Apr 30 11:23:52 2015 -0700
Committer: Colin P. Mccabe <cm...@apache.org>
Committed: Thu Apr 30 11:28:04 2015 -0700
----------------------------------------------------------------------
htrace-c/src/CMakeLists.txt | 11 +-
htrace-c/src/core/conf.c | 4 +-
htrace-c/src/core/htrace.h | 14 +-
htrace-c/src/receiver/curl.c | 124 -----
htrace-c/src/receiver/curl.h | 60 ---
htrace-c/src/receiver/hrpc.c | 512 +++++++++++++++++++
htrace-c/src/receiver/hrpc.h | 90 ++++
htrace-c/src/receiver/htraced.c | 227 ++++----
htrace-c/src/test/htraced_rcv-unit.c | 2 +-
htrace-c/src/test/mini_htraced.c | 17 +-
htrace-c/src/test/mini_htraced.h | 5 +
htrace-c/src/test/rtest.c | 10 +-
htrace-c/src/test/string-unit.c | 45 ++
htrace-c/src/util/string.c | 55 ++
htrace-c/src/util/string.h | 22 +
htrace-c/src/util/time.c | 8 +
htrace-c/src/util/time.h | 9 +
.../go/src/org/apache/htrace/client/client.go | 21 +-
.../go/src/org/apache/htrace/client/hclient.go | 12 +-
.../go/src/org/apache/htrace/htraced/hrpc.go | 89 +++-
20 files changed, 983 insertions(+), 354 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/htrace-c/src/CMakeLists.txt b/htrace-c/src/CMakeLists.txt
index f3465f6..3d51968 100644
--- a/htrace-c/src/CMakeLists.txt
+++ b/htrace-c/src/CMakeLists.txt
@@ -48,8 +48,6 @@ get_filename_component(HTRACE_ABSPATH "../../htrace-htraced/src/go/build/htrace"
get_filename_component(HTRACED_ABSPATH "../../htrace-htraced/src/go/build/htraced" ABSOLUTE)
CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/test/test_config.h.cmake ${CMAKE_BINARY_DIR}/test/test_config.h)
-find_package(CURL REQUIRED)
-
find_package(PkgConfig)
pkg_check_modules(PC_JSON-C QUIET json-c)
find_path(JSON_C_INCLUDE_DIR "json.h"
@@ -61,8 +59,7 @@ ELSE(JSON_C_INCLUDE_DIR AND JSON_C_LIBRARY)
MESSAGE(FATAL_ERROR "Failed to find libjson-c. Try installing libjson-c with apt-get or yum, or install it manually from http://oss.metaparadigm.com/json-c/")
ENDIF(JSON_C_INCLUDE_DIR AND JSON_C_LIBRARY)
-include_directories(${CURL_INCLUDE_DIR}
- ${CMAKE_BINARY_DIR}
+include_directories(${CMAKE_BINARY_DIR}
${CMAKE_SOURCE_DIR})
if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux")
@@ -77,7 +74,7 @@ set(SRC_ALL
core/htracer.c
core/scope.c
core/span.c
- receiver/curl.c
+ receiver/hrpc.c
receiver/htraced.c
receiver/local_file.c
receiver/noop.c
@@ -94,9 +91,7 @@ set(SRC_ALL
util/time.c
)
-set(DEPS_ALL
- ${CURL_LIBRARY}
- pthread)
+set(DEPS_ALL pthread)
# The unit test version of the library, which exposes all symbols.
add_library(htrace_test STATIC
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/core/conf.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/core/conf.c b/htrace-c/src/core/conf.c
index 936e224..a561906 100644
--- a/htrace-c/src/core/conf.c
+++ b/htrace-c/src/core/conf.c
@@ -29,7 +29,9 @@
#define HTRACE_DEFAULT_CONF_KEYS (\
HTRACE_PROB_SAMPLER_FRACTION_KEY "=0.01"\
";" HTRACED_BUFFER_SIZE_KEY "=67108864"\
- ";" HTRACED_SEND_TIMEOUT_MS_KEY "=120000"\
+ ";" HTRACED_FLUSH_INTERVAL_MS_KEY "=120000"\
+ ";" HTRACED_WRITE_TIMEO_MS_KEY "=60000"\
+ ";" HTRACED_READ_TIMEO_MS_KEY "=60000"\
";" HTRACE_PROCESS_ID "=%{tname}/%{ip}"\
";" HTRACED_ADDRESS_KEY "=localhost:9095"\
)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/core/htrace.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/core/htrace.h b/htrace-c/src/core/htrace.h
index cdebd31..4e2f1f9 100644
--- a/htrace-c/src/core/htrace.h
+++ b/htrace-c/src/core/htrace.h
@@ -115,9 +115,19 @@ extern "C" {
#define HTRACED_ADDRESS_KEY "htraced.address"
/**
- * The timeout to use when sending spans to the htraced server.
+ * The maximum length of time to go before flushing spans to the htraced server.
*/
-#define HTRACED_SEND_TIMEOUT_MS_KEY "htraced.send.timeout.ms"
+#define HTRACED_FLUSH_INTERVAL_MS_KEY "htraced.flush.interval.ms"
+
+/**
+ * The TCP write timeout to use when communicating with the htraced server.
+ */
+#define HTRACED_WRITE_TIMEO_MS_KEY "htraced.write.timeo.ms"
+
+/**
+ * The TCP read timeout to use when communicating with the htraced server.
+ */
+#define HTRACED_READ_TIMEO_MS_KEY "htraced.read.timeo.ms"
/**
* The size of the circular buffer to use in the htraced receiver.
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/curl.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/curl.c b/htrace-c/src/receiver/curl.c
deleted file mode 100644
index 0905dd4..0000000
--- a/htrace-c/src/receiver/curl.c
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "core/conf.h"
-#include "util/log.h"
-
-#include <curl/curl.h>
-#include <pthread.h>
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-/**
- * Unfortunately, libcurl requires a non-threadsafe initialization function to
- * be called before it is usable. This is unfortunate for a library like
- * libhtrace, which is designed to be used in a multi-threaded context.
- *
- * This mutex protects us against an application creating two htraced receivers
- * at around the same time, and calling that non-threadsafe initialization
- * function.
- *
- * Of course, this doesn't protect us against the application also initializing
- * libcurl. We can protect against that by statically linking a private copy of
- * libcurl into libhtrace, so that we will be initializing and using our own
- * private copy of libcurl rather than the application's.
- */
-static pthread_mutex_t g_curl_refcnt_lock = PTHREAD_MUTEX_INITIALIZER;
-
-/**
- * The current number of CURL handles that are open.
- */
-static int64_t g_curl_refcnt;
-
-static int curl_addref(struct htrace_log *lg)
-{
- int success = 0;
- CURLcode curl_err = 0;
-
- pthread_mutex_lock(&g_curl_refcnt_lock);
- if (g_curl_refcnt >= 1) {
- g_curl_refcnt++;
- success = 1;
- goto done;
- }
- curl_err = curl_global_init(CURL_GLOBAL_ALL);
- if (curl_err) {
- htrace_log(lg, "curl_global_init failed: error %d (%s)\n",
- curl_err, curl_easy_strerror(curl_err));
- goto done;
- }
- htrace_log(lg, "successfully initialized libcurl...\n");
- g_curl_refcnt = 1;
- success = 1;
-
-done:
- pthread_mutex_unlock(&g_curl_refcnt_lock);
- return success;
-}
-
-static void curl_unref(struct htrace_log *lg)
-{
- pthread_mutex_lock(&g_curl_refcnt_lock);
- g_curl_refcnt--;
- if (g_curl_refcnt > 0) {
- goto done;
- }
- curl_global_cleanup();
- htrace_log(lg, "shut down libcurl...\n");
-done:
- pthread_mutex_unlock(&g_curl_refcnt_lock);
-}
-
-CURL* htrace_curl_init(struct htrace_log *lg, const struct htrace_conf *conf)
-{
- CURL *curl = NULL;
- int success = 0;
-
- if (!curl_addref(lg)) {
- return NULL;
- }
- curl = curl_easy_init();
- if (!curl) {
- htrace_log(lg, "curl_easy_init failed.\n");
- goto done;
- }
- success = 1;
-
-done:
- if (!success) {
- if (curl) {
- curl_easy_cleanup(curl);
- }
- curl_unref(lg);
- return NULL;
- }
- return curl;
-}
-
-void htrace_curl_free(struct htrace_log *lg, CURL *curl)
-{
- if (!curl) {
- return;
- }
- curl_easy_cleanup(curl);
- curl_unref(lg);
-}
-
-// vim:ts=4:sw=4:et
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/curl.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/curl.h b/htrace-c/src/receiver/curl.h
deleted file mode 100644
index 9249cd3..0000000
--- a/htrace-c/src/receiver/curl.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef APACHE_HTRACE_RECEIVER_CURL_H
-#define APACHE_HTRACE_RECEIVER_CURL_H
-
-/**
- * @file curl.h
- *
- * Utility functions wrapping libcurl.
- *
- * This is an internal header, not intended for external use.
- */
-
-#include <curl/curl.h> // for the CURL type
-
-struct htrace_conf;
-struct htrace_log;
-
-/**
- * Initialize a libcurl handle.
- *
- * This function also takes care of calling curl_global_init if necessary.
- *
- * @param lg The HTrace log to use for error messages.
- * @param conf The HTrace configuration to use.
- *
- * @return A libcurl handle, or NULL on failure.
- */
-CURL* htrace_curl_init(struct htrace_log *lg, const struct htrace_conf *conf);
-
-/**
- * Free a libcurl handle.
- *
- * This function also takes care of calling curl_global_cleanup if necessary.
- *
- * @param lg The HTrace log to use for error messages.
- *
- * @param curl The libcurl handle.
- */
-void htrace_curl_free(struct htrace_log *lg, CURL *curl);
-
-#endif
-
-// vim: ts=4: sw=4: et
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/hrpc.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/hrpc.c b/htrace-c/src/receiver/hrpc.c
new file mode 100644
index 0000000..32a1f4a
--- /dev/null
+++ b/htrace-c/src/receiver/hrpc.c
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "receiver/hrpc.h"
+#include "util/log.h"
+#include "util/string.h"
+#include "util/time.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#if defined(__OpenBSD__)
+#include <sys/types.h>
+#define be16toh(x) betoh16(x)
+#define be32toh(x) betoh32(x)
+#define be64toh(x) betoh64(x)
+#elif defined(__NetBSD__) || defined(__FreeBSD__)
+#include <sys/endian.h>
+#else
+#include <endian.h>
+#endif
+
+/**
+ * @file hrpc.c
+ *
+ * Implements sending messages via HRPC.
+ */
+
+#define HRPC_MAGIC 0x48545243U
+
+#define MAX_HRPC_ERROR_LENGTH (4 * 1024 * 1024)
+
+#define MAX_HRPC_BODY_LENGTH (64 * 1024 * 1024)
+
+#define DEFAULT_HTRACED_HRPC_PORT 9075
+
+#define ADDR_STR_MAX (2 + INET6_ADDRSTRLEN + sizeof(":65536"))
+
+struct hrpc_client {
+ /**
+ * The HTrace log object.
+ */
+ struct htrace_log *lg;
+
+ /**
+ * The tcp write timeout in milliseconds.
+ * Applied to each new socket via SO_SNDTIMEO.
+ */
+ uint64_t write_timeo_ms;
+
+ /**
+ * The tcp read timeout in milliseconds.
+ * Applied to each new socket via SO_RCVTIMEO.
+ */
+ uint64_t read_timeo_ms;
+
+ /**
+ * The hostname or IP address. Malloced.
+ */
+ char *host;
+
+ /**
+ * The port.
+ */
+ int port;
+
+ /**
+ * The host:port string. Malloced.
+ */
+ char *endpoint;
+
+ /**
+ * Socket of current open connection, or -1 if there is no currently open
+ * connection.
+ */
+ int sock;
+
+ /**
+ * The sequence number on the connection.
+ * Incremented once for every request that is sent.
+ */
+ uint64_t seq;
+
+ /**
+ * The remote address most recently passed to try_connect, formatted as
+ * "ip:port". Used in log messages.
+ */
+ char addr_str[ADDR_STR_MAX];
+};
+
+/**
+ * The fixed-size header written in front of every request body.
+ *
+ * hrpc_client_send_req converts each field with the htole*() family before
+ * writing it, and sets length to the number of request body bytes that follow
+ * the header.
+ */
+struct hrpc_req_header {
+ uint32_t magic;
+ uint32_t method_id;
+ uint64_t seq;
+ uint32_t length;
+} __attribute__((packed,aligned(4)));
+
+/**
+ * The fixed-size header read in front of every response.
+ *
+ * hrpc_client_rcv_resp decodes the fields with le64toh()/le32toh(), then reads
+ * err_length bytes of error string followed by length bytes of response body.
+ */
+struct hrpc_resp_header {
+ uint64_t seq;
+ uint32_t method_id;
+ uint32_t err_length;
+ uint32_t length;
+} __attribute__((packed,aligned(4)));
+
+
+static int hrpc_client_open_conn(struct hrpc_client *hcli);
+static int try_connect(struct hrpc_client *hcli, struct addrinfo *p);
+static int set_socket_read_and_write_timeout(struct hrpc_client *hcli,
+ int sock);
+static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
+ const void *req, size_t req_len, uint64_t *seq);
+static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id,
+ uint64_t seq, char **err, void **resp,
+ size_t *resp_len);
+
+/**
+ * Allocate an HRPC client.
+ *
+ * No connection is opened here; the socket starts out as -1.
+ *
+ * @param lg                The log object to use for the HRPC client.
+ * @param write_timeo_ms    The TCP write timeout, in milliseconds.
+ * @param read_timeo_ms     The TCP read timeout, in milliseconds.
+ * @param endpoint          The host:port string.  It is copied.
+ *
+ * @return                  The new client, or NULL if memory could not be
+ *                          allocated or parse_endpoint rejected the endpoint.
+ */
+struct hrpc_client *hrpc_client_alloc(struct htrace_log *lg,
+                uint64_t write_timeo_ms, uint64_t read_timeo_ms,
+                const char *endpoint)
+{
+    struct hrpc_client *client = calloc(1, sizeof(*client));
+
+    if (!client) {
+        htrace_log(lg, "Failed to allocate memory for the HRPC client.\n");
+        return NULL;
+    }
+    client->sock = -1;
+    client->lg = lg;
+    client->read_timeo_ms = read_timeo_ms;
+    client->write_timeo_ms = write_timeo_ms;
+    client->endpoint = strdup(endpoint);
+    if (!client->endpoint) {
+        htrace_log(lg, "Failed to allocate memory for the endpoint string.\n");
+    } else if (parse_endpoint(lg, endpoint, DEFAULT_HTRACED_HRPC_PORT,
+                              &client->host, &client->port)) {
+        return client;
+    }
+    // Failure: release whatever was allocated so far.  free(NULL) is a no-op.
+    free(client->host);
+    free(client->endpoint);
+    free(client);
+    return NULL;
+}
+
+/**
+ * Close the client's connection, if one is open, and release its memory.
+ *
+ * @param hcli              The HRPC client.  May be NULL, in which case this
+ *                          function does nothing.
+ */
+void hrpc_client_free(struct hrpc_client *hcli)
+{
+    if (hcli) {
+        if (hcli->sock >= 0) {
+            close(hcli->sock);
+            hcli->sock = -1;
+        }
+        free(hcli->endpoint);
+        free(hcli->host);
+        free(hcli);
+    }
+}
+
+/**
+ * Make a blocking call using the HRPC client.
+ *
+ * Opens a connection if none is open, sends the request, and waits for the
+ * matching response.  On any failure the connection is closed, so the next
+ * call starts with a fresh one.
+ *
+ * @param hcli              The HRPC client.
+ * @param method_id         The method ID to use.
+ * @param req               The request buffer to send.
+ * @param req_len           The size of the request buffer to send.
+ * @param err               (out param) Malloced error string from the server,
+ *                          or NULL.  Always set, even on failure.
+ * @param resp              (out param) Malloced response body, or NULL.
+ *                          Always set, even on failure.
+ * @param resp_len          (out param) The length of the response body.
+ *                          Always set, even on failure.
+ *
+ * @return                  0 on failure, 1 on success.
+ */
+int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
+                     const void *req, size_t req_len,
+                     char **err, void **resp, size_t *resp_len)
+{
+    uint64_t seq;
+
+    // Give the out parameters defined values up front.  The connect and send
+    // failure paths below never reach hrpc_client_rcv_resp, which is the only
+    // other place that sets them.
+    *err = NULL;
+    *resp = NULL;
+    *resp_len = 0;
+
+    if (hcli->sock < 0) {
+        if (!hrpc_client_open_conn(hcli)) {
+            goto error;
+        }
+        htrace_log(hcli->lg, "hrpc_client_call: successfully opened connection\n");
+    } else {
+        htrace_log(hcli->lg, "hrpc_client_call: connection was already open\n");
+    }
+    if (!hrpc_client_send_req(hcli, method_id, req, req_len, &seq)) {
+        goto error;
+    }
+    htrace_log(hcli->lg, "hrpc_client_call: waiting for response\n");
+    if (!hrpc_client_rcv_resp(hcli, method_id, seq, err, resp, resp_len)) {
+        goto error;
+    }
+    return 1;
+
+error:
+    // The stream may be in an unknown state, so drop the connection.
+    if (hcli->sock >= 0) {
+        close(hcli->sock);
+        hcli->sock = -1;
+    }
+    return 0;
+}
+
+/**
+ * Resolve the client's host and connect to the first address that accepts.
+ *
+ * On success the connected socket is stored in hcli->sock.
+ *
+ * @param hcli              The HRPC client.
+ *
+ * @return                  1 on success, 0 on failure.
+ */
+static int hrpc_client_open_conn(struct hrpc_client *hcli)
+{
+    struct addrinfo hints, *results, *cur;
+    int gai_err, fd = -1, connected;
+
+    memset(&hints, 0, sizeof hints);
+    hints.ai_socktype = SOCK_STREAM;
+    hints.ai_family = AF_UNSPEC;
+    gai_err = getaddrinfo(hcli->host, NULL, &hints, &results);
+    if (gai_err) {
+        htrace_log(hcli->lg, "hrpc_client_open_conn: "
+                   "getaddrinfo(%s) error %d: %s\n",
+                   hcli->host, gai_err, gai_strerror(gai_err));
+        return 0;
+    }
+    // Walk the candidate addresses until one of them connects.
+    cur = results;
+    while (cur) {
+        fd = try_connect(hcli, cur);
+        if (fd >= 0) {
+            break;
+        }
+        cur = cur->ai_next;
+    }
+    // cur is non-NULL only if the loop stopped early on a successful connect.
+    connected = (cur != NULL);
+    freeaddrinfo(results);
+    if (!connected) {
+        htrace_log(hcli->lg, "hrpc_client_open_conn(%s): failed to connect.\n",
+                   hcli->host);
+        return 0;
+    }
+    hcli->sock = fd;
+    return 1;
+}
+
+/**
+ * Write the client's port into a socket address, in network byte order.
+ *
+ * @param hcli              The HRPC client supplying the port.
+ * @param addr              The socket address to modify.
+ * @param ai_family         The address family of addr.
+ *
+ * @return                  1 on success, 0 if the family is neither AF_INET
+ *                          nor AF_INET6.
+ */
+static int set_port(struct hrpc_client *hcli, struct sockaddr *addr,
+                    int ai_family)
+{
+    if (ai_family == AF_INET) {
+        ((struct sockaddr_in*)addr)->sin_port = htons(hcli->port);
+        return 1;
+    }
+    if (ai_family == AF_INET6) {
+        ((struct sockaddr_in6*)addr)->sin6_port = htons(hcli->port);
+        return 1;
+    }
+    htrace_log(hcli->lg, "try_connect(%s): set_port %d failed: unknown "
+               "ai_family %d\n", hcli->addr_str, hcli->port, ai_family);
+    return 0;
+}
+
+/**
+ * Try to connect to a single resolved address.
+ *
+ * Records the numeric "ip:port" form of the address in hcli->addr_str,
+ * overwrites the port in p->ai_addr with hcli->port, creates a close-on-exec
+ * socket, applies the read and write timeouts, and connects.
+ *
+ * @param hcli              The HRPC client.
+ * @param p                 The address to try.  Its port field is modified.
+ *
+ * @return                  The connected socket, or -1 on failure.
+ */
+static int try_connect(struct hrpc_client *hcli, struct addrinfo *p)
+{
+    int e, sock = -1;
+    char ip[INET6_ADDRSTRLEN];
+
+    e = getnameinfo(p->ai_addr, p->ai_addrlen,
+                    ip, sizeof(ip), 0, 0, NI_NUMERICHOST);
+    if (e) {
+        htrace_log(hcli->lg, "try_connect: getnameinfo failed.  error "
+                   "%d: %s\n", e, gai_strerror(e));
+        // Must be negative: the caller treats any value >= 0 as a connected
+        // socket, so returning 0 here would make it use file descriptor 0.
+        return -1;
+    }
+    snprintf(hcli->addr_str, ADDR_STR_MAX, "%s:%d", ip, hcli->port);
+    if (!set_port(hcli, p->ai_addr, p->ai_family)) {
+        goto error;
+    }
+    sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
+    if (sock < 0) {
+        e = errno;
+        htrace_log(hcli->lg, "try_connect(%s): failed to create new "
+                   "socket: error %d (%s)\n", hcli->addr_str, e, terror(e));
+        goto error;
+    }
+    if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0) {
+        e = errno;
+        htrace_log(hcli->lg, "try_connect(%s): fcntl(FD_CLOEXEC) "
+                   "failed: error %d (%s)\n", hcli->addr_str, e, terror(e));
+        goto error;
+    }
+    // Timeouts are set before connect() is called.
+    if (!set_socket_read_and_write_timeout(hcli, sock)) {
+        goto error;
+    }
+    if (connect(sock, p->ai_addr, p->ai_addrlen) < 0) {
+        e = errno;
+        htrace_log(hcli->lg, "try_connect(%s): connect "
+                   "failed: error %d (%s)\n", hcli->addr_str, e, terror(e));
+        goto error;
+    }
+    return sock;
+
+error:
+    if (sock >= 0) {
+        close(sock);
+    }
+    return -1;
+}
+
+/**
+ * Apply the client's read and write timeouts to a socket using the
+ * SO_RCVTIMEO and SO_SNDTIMEO socket options.
+ *
+ * @param hcli              The HRPC client supplying the timeouts.
+ * @param sock              The socket to configure.
+ *
+ * @return                  1 on success, 0 if either setsockopt call failed.
+ */
+static int set_socket_read_and_write_timeout(struct hrpc_client *hcli,
+                                             int sock)
+{
+    struct timeval timeout;
+    int e;
+
+    ms_to_timeval(hcli->read_timeo_ms, &timeout);
+    if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
+                   &timeout, sizeof(timeout)) < 0) {
+        e = errno;
+        htrace_log(hcli->lg, "setsockopt(%d, SO_RCVTIMEO, %"PRId64") failed: "
+                   "error %d (%s)\n", sock, hcli->read_timeo_ms, e, terror(e));
+        return 0;
+    }
+
+    ms_to_timeval(hcli->write_timeo_ms, &timeout);
+    if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO,
+                   &timeout, sizeof(timeout)) < 0) {
+        e = errno;
+        htrace_log(hcli->lg, "setsockopt(%d, SO_SNDTIMEO, %"PRId64") failed: "
+                   "error %d (%s)\n", sock, hcli->write_timeo_ms, e, terror(e));
+        return 0;
+    }
+    return 1;
+}
+
+/**
+ * Send one request (header followed by body) on the open connection.
+ *
+ * The header fields are written little-endian.  The function keeps calling
+ * writev until every byte of both the header and the body has been written,
+ * resuming after short writes and retrying on EINTR.
+ *
+ * @param hcli              The HRPC client.  hcli->sock must be open.
+ * @param method_id         The method ID to put in the header.
+ * @param req               The request body.
+ * @param req_len           The length of the request body.  Must fit in the
+ *                          32-bit length field of the header.
+ * @param seq               (out param) The sequence number that was assigned
+ *                          to this request.  Only set once the request has
+ *                          passed the length check.
+ *
+ * @return                  1 on success, 0 on failure.
+ */
+static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
+                    const void *req, size_t req_len, uint64_t *seq)
+{
+    struct hrpc_req_header hdr;
+    struct iovec iov[2];
+    const int niov = sizeof(iov) / sizeof(iov[0]);
+
+    // The wire format only has 32 bits for the body length; refuse to send a
+    // request whose length would be silently truncated.
+    if ((uint64_t)req_len > UINT32_MAX) {
+        htrace_log(hcli->lg, "hrpc_client_send_req: request length %"PRIu64
+                   " does not fit in the 32-bit length field.\n",
+                   (uint64_t)req_len);
+        return 0;
+    }
+    // magic is a 32-bit field, so it needs the 32-bit conversion.  The 64-bit
+    // one would truncate to zero on big-endian hosts.
+    hdr.magic = htole32(HRPC_MAGIC);
+    hdr.method_id = htole32(method_id);
+    *seq = hcli->seq++;
+    hdr.seq = htole64(*seq);
+    hdr.length = htole32((uint32_t)req_len);
+    iov[0].iov_base = &hdr;
+    iov[0].iov_len = sizeof(hdr);
+    iov[1].iov_base = (void*)req;
+    iov[1].iov_len = req_len;
+
+    while (1) {
+        size_t remaining = 0;
+        ssize_t res;
+        int i;
+
+        for (i = 0; i < niov; i++) {
+            remaining += iov[i].iov_len;
+        }
+        if (remaining == 0) {
+            // Everything has been written.
+            return 1;
+        }
+        res = writev(hcli->sock, iov, niov);
+        if (res < 0) {
+            int e = errno;
+            if (e == EINTR) {
+                continue;
+            }
+            htrace_log(hcli->lg, "hrpc_client_send_req: writev error: "
+                       "error %d: %s\n", e, terror(e));
+            return 0;
+        }
+        // Consume the written bytes from the front of the iovec array,
+        // advancing both the base pointer and the length so that the next
+        // writev resumes exactly where this one stopped.
+        for (i = 0; (i < niov) && (res > 0); i++) {
+            size_t adv = iov[i].iov_len;
+            if ((size_t)res < adv) {
+                adv = (size_t)res;
+            }
+            iov[i].iov_base = (char*)iov[i].iov_base + adv;
+            iov[i].iov_len -= adv;
+            res -= (ssize_t)adv;
+        }
+        if (res > 0) {
+            htrace_log(hcli->lg, "hrpc_client_send_req: unexpectedly "
+                       "large writev return.\n");
+            return 0;
+        }
+    }
+}
+
+/**
+ * Read up to amt bytes from a file descriptor, retrying on EINTR and
+ * continuing after short reads.
+ *
+ * @param fd                The file descriptor to read from.
+ * @param buf               The buffer to fill.
+ * @param amt               The number of bytes wanted.
+ *
+ * @return                  The number of bytes read.  This is less than amt
+ *                          only if EOF was reached first.  On error, the
+ *                          negated errno value.
+ */
+static int safe_read(int fd, void *buf, size_t amt)
+{
+    uint8_t *dst = buf;
+    int total = 0;
+
+    for (;;) {
+        int got = read(fd, dst + total, amt - total);
+        if (got == 0) {
+            // EOF: report however much was read so far.
+            return total;
+        }
+        if (got < 0) {
+            int e = errno;
+            if (e == EINTR) {
+                continue;
+            }
+            return -e;
+        }
+        total += got;
+        if (total >= amt) {
+            return total;
+        }
+    }
+}
+
+/**
+ * Read one response from the open connection and validate it.
+ *
+ * Reads the response header, checks that its sequence number and method ID
+ * match the request, then reads the optional error string and the optional
+ * body.  Both lengths are checked against fixed maximums before any memory is
+ * allocated for them.
+ *
+ * @param hcli              The HRPC client.  hcli->sock must be open.
+ * @param method_id         The method ID the response must carry.
+ * @param seq               The sequence number the response must carry.
+ * @param err_out           (out param) Malloced NUL-terminated error string
+ *                          if the response carried one; NULL otherwise, and
+ *                          always NULL on failure.
+ * @param resp_out          (out param) Malloced response body if the response
+ *                          carried one; NULL otherwise, and always NULL on
+ *                          failure.
+ * @param resp_len          (out param) The length of the response body; 0 on
+ *                          failure.
+ *
+ * @return                  1 on success, 0 on failure.
+ */
+static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id,
+                    uint64_t seq, char **err_out, void **resp_out,
+                    size_t *resp_len)
+{
+    int res;
+    struct hrpc_resp_header hdr;
+    uint64_t resp_seq;
+    uint32_t resp_method_id, err_length, length;
+    char *err = NULL, *resp = NULL;
+
+    res = safe_read(hcli->sock, &hdr, sizeof(hdr));
+    if (res < 0) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading "
+                   "response header: %d (%s)\n", hcli->addr_str, -res,
+                   terror(-res));
+        goto error;
+    }
+    if ((size_t)res != sizeof(hdr)) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF "
+                   "reading response header.\n", hcli->addr_str);
+        goto error;
+    }
+    resp_seq = le64toh(hdr.seq);
+    if (resp_seq != seq) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): expected sequence "
+                   "ID 0x%"PRIx64", but got sequence ID 0x%"PRIx64".\n",
+                   hcli->addr_str, seq, resp_seq);
+        goto error;
+    }
+    resp_method_id = le32toh(hdr.method_id);
+    if (resp_method_id != method_id) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): expected method "
+                   "ID 0x%"PRIx32", but got method ID 0x%"PRIx32".\n",
+                   hcli->addr_str, method_id, resp_method_id);
+        goto error;
+    }
+    err_length = le32toh(hdr.err_length);
+    if (err_length > MAX_HRPC_ERROR_LENGTH) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error length was "
+                   "%"PRIu32", but the maximum error length is %"PRIu32".\n",
+                   hcli->addr_str, err_length,
+                   (uint32_t)MAX_HRPC_ERROR_LENGTH);
+        goto error;
+    }
+    if (err_length > 0) {
+        // One extra byte for the NUL terminator added below.
+        err = malloc(err_length + 1);
+        if (!err) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): failed to "
+                       "allocate %"PRIu32" bytes for the error string.\n",
+                       hcli->addr_str, err_length + 1);
+            goto error;
+        }
+        res = safe_read(hcli->sock, err, err_length);
+        if (res < 0) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading "
+                       "error string: %d (%s)\n", hcli->addr_str, -res,
+                       terror(-res));
+            goto error;
+        }
+        if ((uint32_t)res != err_length) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF "
+                       "reading error string.\n", hcli->addr_str);
+            goto error;
+        }
+        err[err_length] = '\0';
+    }
+    length = le32toh(hdr.length);
+    if (length > MAX_HRPC_BODY_LENGTH) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): body length was "
+                   "%"PRIu32", but the maximum body length is %"PRIu32".\n",
+                   hcli->addr_str, length, (uint32_t)MAX_HRPC_BODY_LENGTH);
+        goto error;
+    }
+    if (length > 0) {
+        resp = malloc(length);
+        if (!resp) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): failed to "
+                       "allocate %"PRIu32" bytes for the response body.\n",
+                       hcli->addr_str, length);
+            goto error;
+        }
+        res = safe_read(hcli->sock, resp, length);
+        if (res < 0) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading "
+                       "body: %d (%s)\n", hcli->addr_str, -res, terror(-res));
+            goto error;
+        }
+        if ((uint32_t)res != length) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF "
+                       "reading body.\n", hcli->addr_str);
+            goto error;
+        }
+    }
+    *err_out = err;
+    *resp_out = resp;
+    *resp_len = length;
+    return 1;
+
+error:
+    free(err);
+    free(resp);
+    *err_out = NULL;
+    *resp_out = NULL;
+    *resp_len = 0;
+    return 0;
+}
+
+/**
+ * Get the endpoint string stored in the client.
+ *
+ * The returned pointer refers to memory owned by the client and is released
+ * by hrpc_client_free.
+ */
+const char *hrpc_client_get_endpoint(struct hrpc_client *hcli)
+{
+ return hcli->endpoint;
+}
+
+// vim:ts=4:sw=4:et
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/hrpc.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/hrpc.h b/htrace-c/src/receiver/hrpc.h
new file mode 100644
index 0000000..8ec20be
--- /dev/null
+++ b/htrace-c/src/receiver/hrpc.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef APACHE_HTRACE_RECEIVER_HRPC
+#define APACHE_HTRACE_RECEIVER_HRPC
+
+/**
+ * @file hrpc.h
+ *
+ * Functions related to HRPC.
+ *
+ * This is an internal header, not intended for external use.
+ */
+
+#include <stdint.h>
+#include <unistd.h>
+
+#define METHOD_ID_WRITE_SPANS 0x1
+
+struct htrace_log;
+
+/**
+ * Create an HRPC client.
+ *
+ * @param lg The log object to use for the HRPC client.
+ * @param write_timeo_ms The TCP write timeout to use.
+ * @param read_timeo_ms The TCP read timeout to use.
+ * @param endpoint The hostname and port, separated by a colon.
+ *
+ * @return NULL on OOM or if the endpoint could not be parsed; the hrpc_client otherwise.
+ */
+struct hrpc_client *hrpc_client_alloc(struct htrace_log *lg,
+ uint64_t write_timeo_ms, uint64_t read_timeo_ms,
+ const char *endpoint);
+
+/**
+ * Free the HRPC client.
+ *
+ * @param hcli The HRPC client.
+ */
+void hrpc_client_free(struct hrpc_client *hcli);
+
+/**
+ * Make a blocking call using the HRPC client.
+ *
+ * @param hcli The HRPC client.
+ * @param method_id The method ID to use.
+ * @param req The request buffer to send.
+ * @param req_len The size of the request buffer to send.
+ * @param err (out param) Will be set to a malloced
+ * NULL-terminated string if the server returned an
+ * error response. NULL otherwise.
+ * @param resp (out param) The response body. Will be set to the
+ * response body if the function returns nonzero.
+ * @param resp_len (out param) The length of the response body.
+ *
+ * @return 0 on failure, 1 on success.
+ */
+int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
+ const void *req, size_t req_len,
+ char **err, void **resp, size_t *resp_len);
+
+/**
+ * Get the endpoint for this HRPC client.
+ *
+ * @param hcli The HRPC client.
+ *
+ * @return The endpoint. This string will be valid for the
+ * lifetime of the HRPC client.
+ */
+const char *hrpc_client_get_endpoint(struct hrpc_client *hcli);
+
+#endif
+
+// vim: ts=4: sw=4: et
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/htraced.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c
index ea61541..7e4af81 100644
--- a/htrace-c/src/receiver/htraced.c
+++ b/htrace-c/src/receiver/htraced.c
@@ -20,14 +20,14 @@
#include "core/htrace.h"
#include "core/htracer.h"
#include "core/span.h"
-#include "receiver/curl.h"
+#include "receiver/hrpc.h"
#include "receiver/receiver.h"
#include "test/test.h"
#include "util/log.h"
+#include "util/string.h"
#include "util/time.h"
#include <errno.h>
-#include <curl/curl.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
@@ -56,15 +56,25 @@
#define HTRACED_MAX_BUFFER_SIZE 0x7ffffffffffffffLL
/**
- * The minimum number of milliseconds to allow for send_timeo_ms.
+ * The minimum number of milliseconds to allow for flush_interval_ms.
*/
-#define HTRACED_SEND_TIMEO_MS_MIN 30000LL
+#define HTRACED_FLUSH_INTERVAL_MS_MIN 30000LL
/**
- * The maximum number of milliseconds to allow for send_timeo_ms.
+ * The maximum number of milliseconds to allow for flush_interval_ms.
* This is mainly to avoid overflow.
*/
-#define HTRACED_SEND_TIMEO_MS_MAX 86400000LL
+#define HTRACED_FLUSH_INTERVAL_MS_MAX 86400000LL
+
+/**
+ * The minimum number of milliseconds to allow for tcp write timeouts.
+ */
+#define HTRACED_WRITE_TIMEO_MS_MIN 50LL
+
+/**
+ * The minimum number of milliseconds to allow for tcp read timeouts.
+ */
+#define HTRACED_READ_TIMEO_MS_MIN 50LL
/**
* The maximum size of the message we will send over the wire.
@@ -108,16 +118,10 @@ struct htraced_rcv {
struct htracer *tracer;
/**
- * The HTraced server URL.
- * Dynamically allocated.
- */
- char *url;
-
- /**
* Buffered span data becomes eligible to be sent even if there isn't much
* in the buffer after this timeout elapses.
*/
- uint64_t send_timeo_ms;
+ uint64_t flush_interval_ms;
/**
* The maximum number of bytes we will buffer before waking the sending
@@ -127,9 +131,9 @@ struct htraced_rcv {
uint64_t send_threshold;
/**
- * The CURL handle.
+ * The HRPC client.
*/
- CURL *curl;
+ struct hrpc_client *hcli;
/**
* Length of the circular buffer.
@@ -184,15 +188,35 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now);
static uint64_t cbuf_used(const struct htraced_rcv *rcv);
static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv);
+/**
+ * Read an unsigned 64-bit configuration value and clamp it to [min, max].
+ *
+ * A message is logged whenever the configured value has to be clamped.
+ *
+ * @param lg                The log to use.
+ * @param cnf               The configuration to read from.
+ * @param prop              The configuration key.
+ * @param min               The smallest value to return.
+ * @param max               The largest value to return.
+ *
+ * @return                  The configured value, or min or max if the
+ *                          configured value was outside the range.
+ */
+static uint64_t htraced_get_bounded_u64(struct htrace_log *lg,
+                const struct htrace_conf *cnf, const char *prop,
+                uint64_t min, uint64_t max)
+{
+    uint64_t val = htrace_conf_get_u64(lg, cnf, prop);
+
+    // All three values are unsigned, so they are printed with PRIu64.  A
+    // signed conversion would show values above INT64_MAX as negative, which
+    // is exactly the case the "maximum" message reports.
+    if (val < min) {
+        htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRIu64
+                   ". Using minimum value of %"PRIu64 " instead.\n",
+                   prop, val, min);
+        return min;
+    } else if (val > max) {
+        htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRIu64
+                   ". Using maximum value of %"PRIu64 " instead.\n",
+                   prop, val, max);
+        return max;
+    }
+    return val;
+}
+
static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
const struct htrace_conf *conf)
{
struct htraced_rcv *rcv;
- const char *url;
+ const char *endpoint;
int ret;
+ uint64_t write_timeo_ms, read_timeo_ms;
- url = htrace_conf_get(conf, HTRACED_ADDRESS_KEY);
- if (!url) {
+ endpoint = htrace_conf_get(conf, HTRACED_ADDRESS_KEY);
+ if (!endpoint) {
htrace_log(tracer->lg, "htraced_rcv_create: no value found for %s. "
"You must set this configuration key to the "
"hostname:port identifying the htraced server.\n",
@@ -208,27 +232,21 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
rcv->base.ty = &g_htraced_rcv_ty;
rcv->shutdown = 0;
rcv->tracer = tracer;
- if (asprintf(&rcv->url, "%s/writeSpans", url) < 0) {
- rcv->url = NULL;
+
+ rcv->flush_interval_ms = htraced_get_bounded_u64(tracer->lg, conf,
+ HTRACED_FLUSH_INTERVAL_MS_KEY, HTRACED_FLUSH_INTERVAL_MS_MIN,
+ HTRACED_FLUSH_INTERVAL_MS_MAX);
+ write_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf,
+ HTRACED_WRITE_TIMEO_MS_KEY, HTRACED_WRITE_TIMEO_MS_MIN,
+ 0x7fffffffffffffffULL);
+ read_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf,
+ HTRACED_READ_TIMEO_MS_KEY, HTRACED_READ_TIMEO_MS_MIN,
+ 0x7fffffffffffffffULL);
+ rcv->hcli = hrpc_client_alloc(tracer->lg, write_timeo_ms,
+ read_timeo_ms, endpoint);
+ if (!rcv->hcli) {
goto error_free_rcv;
}
- rcv->send_timeo_ms = htrace_conf_get_u64(tracer->lg, conf,
- HTRACED_SEND_TIMEOUT_MS_KEY);
- if (rcv->send_timeo_ms < HTRACED_SEND_TIMEO_MS_MIN) {
- htrace_log(tracer->lg, "htraced_rcv_create: invalid send timeout of %"
- PRId64 " ms. Setting the minimum timeout of %lld"
- " ms instead.\n", rcv->send_timeo_ms, HTRACED_SEND_TIMEO_MS_MIN);
- rcv->send_timeo_ms = HTRACED_SEND_TIMEO_MS_MIN;
- } else if (rcv->send_timeo_ms > HTRACED_SEND_TIMEO_MS_MAX) {
- htrace_log(tracer->lg, "htraced_rcv_create: invalid send timeout of %"
- PRId64 " ms. Setting the maximum timeout of %lld"
- " ms instead.\n", rcv->send_timeo_ms, HTRACED_SEND_TIMEO_MS_MAX);
- rcv->send_timeo_ms = HTRACED_SEND_TIMEO_MS_MAX;
- }
- rcv->curl = htrace_curl_init(tracer->lg, conf);
- if (!rcv->curl) {
- goto error_free_url;
- }
rcv->clen = htrace_conf_get_u64(tracer->lg, conf, HTRACED_BUFFER_SIZE_KEY);
if (rcv->clen < HTRACED_MIN_BUFFER_SIZE) {
htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" PRId64
@@ -245,7 +263,7 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
if (!rcv->cbuf) {
htrace_log(tracer->lg, "htraced_rcv_create: failed to malloc %"PRId64
" bytes for the htraced circular buffer.\n", rcv->clen);
- goto error_free_curl;
+ goto error_free_hcli;
}
// Send when the buffer gets 1/4 full.
rcv->send_threshold = rcv->clen * 0.25;
@@ -274,10 +292,12 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
"error %d: %s\n", ret, terror(ret));
goto error_free_cvar;
}
- htrace_log(tracer->lg, "Initialized htraced receiver with url=%s, "
- "send_timeo_ms=%" PRId64 ", send_threshold=%" PRId64 ", clen=%"
- PRId64 ".\n", rcv->url, rcv->send_timeo_ms, rcv->send_threshold,
- rcv->clen);
+ htrace_log(tracer->lg, "Initialized htraced receiver for %s"
+ ", flush_interval_ms=%" PRId64 ", send_threshold=%" PRId64
+ ", write_timeo_ms=%" PRId64 ", read_timeo_ms=%" PRId64
+ ", clen=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli),
+ rcv->flush_interval_ms, rcv->send_threshold,
+ write_timeo_ms, read_timeo_ms, rcv->clen);
return (struct htrace_rcv*)rcv;
error_free_cvar:
@@ -288,10 +308,8 @@ error_free_sbuf:
free(rcv->sbuf);
error_free_cbuf:
free(rcv->cbuf);
-error_free_curl:
- htrace_curl_free(tracer->lg, rcv->curl);
-error_free_url:
- free(rcv->url);
+error_free_hcli:
+ hrpc_client_free(rcv->hcli);
error_free_rcv:
free(rcv);
error:
@@ -324,7 +342,7 @@ void* run_htraced_xmit_manager(void *data)
// because of flush_interval_ms.
// * A writer to signal that we should wake up because enough bytes are
// buffered.
- wakeup = now + (rcv->send_timeo_ms / 2);
+ wakeup = now + (rcv->flush_interval_ms / 2);
ms_to_timespec(wakeup, &wakeup_ts);
ret = pthread_cond_timedwait(&rcv->cond, &rcv->lock, &wakeup_ts);
if ((ret != 0) && (ret != ETIMEDOUT)) {
@@ -356,7 +374,7 @@ static int should_xmit(struct htraced_rcv *rcv, uint64_t now)
// We have buffered a lot of bytes, so let's send.
return 1;
}
- if (now - rcv->last_send_ms > rcv->send_timeo_ms) {
+ if (now - rcv->last_send_ms > rcv->flush_interval_ms) {
// It's been too long since the last transmission, so let's send.
if (used > 0) {
return 1;
@@ -376,55 +394,25 @@ static int should_xmit(struct htraced_rcv *rcv, uint64_t now)
static int htraced_xmit_impl(struct htraced_rcv *rcv, int32_t slen)
{
struct htrace_log *lg = rcv->tracer->lg;
- CURLcode res;
- char *pid_header = NULL;
- struct curl_slist *headers = NULL;
- int ret = 0;
-
- // Disable the use of SIGALARM to interrupt DNS lookups.
- curl_easy_setopt(rcv->curl, CURLOPT_NOSIGNAL, 1);
- // Do not use a global DNS cache.
- curl_easy_setopt(rcv->curl, CURLOPT_DNS_USE_GLOBAL_CACHE, 0);
- // Disable verbosity.
- curl_easy_setopt(rcv->curl, CURLOPT_VERBOSE, 0);
- // The user agent is libhtraced.
- curl_easy_setopt(rcv->curl, CURLOPT_USERAGENT, "libhtraced");
- // Set URL
- curl_easy_setopt(rcv->curl, CURLOPT_URL, rcv->url);
- // Set POST
- curl_easy_setopt(rcv->curl, CURLOPT_POST, 1L);
- // Set the size that we're copying from rcv->sbuf
- curl_easy_setopt(rcv->curl, CURLOPT_POSTFIELDSIZE, (long)slen);
- if (asprintf(&pid_header, "htrace-pid: %s", rcv->tracer->prid) < 0) {
- htrace_log(lg, "htraced_xmit(%s) failed: OOM allocating htrace-pid\n",
- rcv->url);
- goto done;
- }
- curl_easy_setopt(rcv->curl, CURLOPT_POSTFIELDS, rcv->sbuf);
- headers = curl_slist_append(headers, pid_header);
- if (!headers) {
- htrace_log(lg, "htraced_xmit(%s) failed: OOM allocating headers\n",
- rcv->url);
- return 0;
- }
- headers = curl_slist_append(headers, "Content-Type: application/json");
- if (!headers) {
- htrace_log(lg, "htraced_xmit(%s) failed: OOM allocating headers\n",
- rcv->url);
- return 0;
- }
- curl_easy_setopt(rcv->curl, CURLOPT_HTTPHEADER, headers);
- res = curl_easy_perform(rcv->curl);
- if (res != CURLE_OK) {
- htrace_log(lg, "htraced_xmit(%s) failed: error %lld (%s)\n",
- rcv->url, (long long)res, curl_easy_strerror(res));
+ int res, retval = 0;
+ char *prequel = NULL, *err = NULL, *resp = NULL;
+ size_t resp_len = 0;
+
+ res = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS,
+ rcv->sbuf, slen, &err, (void**)&resp, &resp_len);
+ if (!res) {
+ htrace_log(lg, "htrace_xmit_impl: hrpc_client_call failed.\n");
+ retval = 0;
+ } else if (err) {
+ htrace_log(lg, "htrace_xmit_impl: server returned error: %s\n", err);
+ retval = 0;
+ } else {
+ retval = 1;
}
- ret = res == CURLE_OK;
-done:
- curl_easy_reset(rcv->curl);
- free(pid_header);
- curl_slist_free_all(headers);
- return ret;
+ free(prequel);
+ free(err);
+ free(resp);
+ return retval;
}
static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
@@ -446,7 +434,7 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
tries++;
retry = (tries < HTRACED_MAX_SEND_TRIES);
htrace_log(rcv->tracer->lg, "htraced_xmit(%s) failed on try %d. %s\n",
- rcv->url, tries,
+ hrpc_client_get_endpoint(rcv->hcli), tries,
(retry ? "Retrying after a delay." : "Giving up."));
if (!retry) {
break;
@@ -471,15 +459,21 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
*/
static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv)
{
- int32_t rem = HTRACED_MAX_MSG_LEN;
+ const char * const SUFFIX = "]}";
+ int SUFFIX_LEN = sizeof(SUFFIX) - 1;
+ int rem = HTRACED_MAX_MSG_LEN - SUFFIX_LEN;
size_t amt;
+ char *sbuf = (char*)rcv->sbuf;
+ fwdprintf(&sbuf, &rem, "{\"DefaultPid\":\"%s\",\"Spans\":[",
+ rcv->tracer->prid);
if (rcv->cstart < rcv->cend) {
amt = rcv->cend - rcv->cstart;
if (amt > rem) {
amt = rem;
}
- memcpy(rcv->sbuf, rcv->cbuf + rcv->cstart, amt);
+ memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
+ sbuf += amt;
rem -= amt;
rcv->cstart += amt;
} else {
@@ -487,7 +481,8 @@ static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv)
if (amt > rem) {
amt = rem;
}
- memcpy(rcv->sbuf, rcv->cbuf + rcv->cstart, amt);
+ memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
+ sbuf += amt;
rem -= amt;
rcv->cstart += amt;
if (rem > 0) {
@@ -495,11 +490,17 @@ static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv)
if (amt > rem) {
amt = rem;
}
- memcpy(rcv->sbuf, rcv->cbuf, amt);
+ memcpy(sbuf, rcv->cbuf, amt);
+ sbuf += amt;
rem -= amt;
rcv->cstart = amt;
}
}
+ // overwrite last comma
+ rem++;
+ sbuf--;
+ rem += SUFFIX_LEN;
+ fwdprintf(&sbuf, &rem, "%s", SUFFIX);
return HTRACED_MAX_MSG_LEN - rem;
}
@@ -527,11 +528,6 @@ static void htraced_rcv_add_span(struct htrace_rcv *r,
struct htraced_rcv *rcv = (struct htraced_rcv *)r;
struct htrace_log *lg = rcv->tracer->lg;
- {
- char buf[4096];
- span_json_sprintf(span, sizeof(buf), buf);
- }
-
json_len = span_json_size(span);
tries = 0;
do {
@@ -559,21 +555,28 @@ static void htraced_rcv_add_span(struct htrace_rcv *r,
if (rem < json_len) {
// Handle a 'torn write' where the circular buffer loops around to the
// beginning in the middle of the write.
- char *temp = alloca(json_len);
+ char *temp = malloc(json_len);
+ if (!temp) {
+ htrace_log(lg, "htraced_rcv_add_span: failed to malloc %d byte "
+ "buffer for torn write.\n", json_len);
+ goto done;
+ }
span_json_sprintf(span, json_len, temp);
- temp[json_len - 1] = '\n';
+ temp[json_len - 1] = ',';
memcpy(rcv->cbuf + rcv->cend, temp, rem);
memcpy(rcv->cbuf, temp + rem, json_len - rem);
rcv->cend = json_len - rem;
+ free(temp);
} else {
span_json_sprintf(span, json_len, rcv->cbuf + rcv->cend);
- rcv->cbuf[rcv->cend + json_len - 1] = '\n';
+ rcv->cbuf[rcv->cend + json_len - 1] = ',';
rcv->cend += json_len;
}
used += json_len;
if (used > rcv->send_threshold) {
pthread_cond_signal(&rcv->cond);
}
+done:
pthread_mutex_unlock(&rcv->lock);
}
@@ -611,7 +614,8 @@ static void htraced_rcv_free(struct htrace_rcv *r)
return;
}
lg = rcv->tracer->lg;
- htrace_log(lg, "Shutting down htraced receiver with url=%s\n", rcv->url);
+ htrace_log(lg, "Shutting down htraced receiver for %s\n",
+ hrpc_client_get_endpoint(rcv->hcli));
pthread_mutex_lock(&rcv->lock);
rcv->shutdown = 1;
pthread_cond_signal(&rcv->cond);
@@ -621,10 +625,9 @@ static void htraced_rcv_free(struct htrace_rcv *r)
htrace_log(lg, "htraced_rcv_free: pthread_join "
"error %d: %s\n", ret, terror(ret));
}
- free(rcv->url);
free(rcv->cbuf);
free(rcv->sbuf);
- htrace_curl_free(lg, rcv->curl);
+ hrpc_client_free(rcv->hcli);
ret = pthread_mutex_destroy(&rcv->lock);
if (ret) {
htrace_log(lg, "htraced_rcv_free: pthread_mutex_destroy "
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/htraced_rcv-unit.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/htraced_rcv-unit.c b/htrace-c/src/test/htraced_rcv-unit.c
index 29f62d1..d5c83ec 100644
--- a/htrace-c/src/test/htraced_rcv-unit.c
+++ b/htrace-c/src/test/htraced_rcv-unit.c
@@ -51,7 +51,7 @@ static int htraced_rcv_test(struct rtest *rt)
ht->root_dir, "spans.json"));
EXPECT_INT_GE(0, asprintf(&conf_str, "%s=%s;%s=%s",
HTRACE_SPAN_RECEIVER_KEY, "htraced",
- HTRACED_ADDRESS_KEY, ht->htraced_http_addr));
+ HTRACED_ADDRESS_KEY, ht->htraced_hrpc_addr));
EXPECT_INT_ZERO(rt->run(rt, conf_str));
start_ms = monotonic_now_ms(NULL);
//
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/mini_htraced.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/mini_htraced.c b/htrace-c/src/test/mini_htraced.c
index 31b5484..d97a01b 100644
--- a/htrace-c/src/test/mini_htraced.c
+++ b/htrace-c/src/test/mini_htraced.c
@@ -318,6 +318,9 @@ static void mini_htraced_write_conf_file(struct mini_htraced *ht,
if (mini_htraced_write_conf_key(fp, "web.address", "127.0.0.1:0")) {
goto ioerror;
}
+ if (mini_htraced_write_conf_key(fp, "hrpc.address", "127.0.0.1:0")) {
+ goto ioerror;
+ }
if (mini_htraced_write_conf_key(fp, "data.store.directories",
"%s%c%s", ht->data_dir[0], PATH_LIST_SEP, ht->data_dir[1])) {
goto ioerror;
@@ -546,7 +549,7 @@ static void parse_startup_notification(struct mini_htraced *ht,
char *err, size_t err_len)
{
struct json_tokener *tok = NULL;
- struct json_object *root = NULL, *http_addr, *process_id;
+ struct json_object *root = NULL, *http_addr, *process_id, *hrpc_addr;
int32_t pid;
err[0] = '\0';
@@ -574,6 +577,18 @@ static void parse_startup_notification(struct mini_htraced *ht,
snprintf(err, err_len, "OOM");
goto done;
}
+ // Find the HRPC address, in the form of hostname:port, which the htraced
+ // is listening on.
+ if (!json_object_object_get_ex(root, "HrpcAddr", &hrpc_addr)) {
+ snprintf(err, err_len, "Failed to find HrpcAddr in the startup "
+ "notification.");
+ goto done;
+ }
+ ht->htraced_hrpc_addr = strdup(json_object_get_string(hrpc_addr));
+ if (!ht->htraced_hrpc_addr) {
+ snprintf(err, err_len, "OOM");
+ goto done;
+ }
// Check that the process ID from the startup notification matches the
// process ID from the fork.
if (!json_object_object_get_ex(root, "ProcessId", &process_id)) {
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/mini_htraced.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/mini_htraced.h b/htrace-c/src/test/mini_htraced.h
index a803f55..df999cc 100644
--- a/htrace-c/src/test/mini_htraced.h
+++ b/htrace-c/src/test/mini_htraced.h
@@ -106,6 +106,11 @@ struct mini_htraced {
* The HTTP address of the htraced, in hostname:port format.
*/
char *htraced_http_addr;
+
+ /**
+ * The HRPC address of the htraced, in hostname:port format.
+ */
+ char *htraced_hrpc_addr;
};
/**
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/rtest.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/rtest.c b/htrace-c/src/test/rtest.c
index 0ec5272..b299376 100644
--- a/htrace-c/src/test/rtest.c
+++ b/htrace-c/src/test/rtest.c
@@ -52,10 +52,16 @@ static void get_receiver_test_prid(char *prid, size_t prid_len)
static int rtest_data_init(const char *conf_str, struct rtest_data **out)
{
+ char *econf_str = NULL;
struct rtest_data *rdata = calloc(1, sizeof(*(rdata)));
EXPECT_NONNULL(rdata);
- rdata->cnf = htrace_conf_from_strs(conf_str,
- HTRACE_PROCESS_ID"=%{tname}/%{pid};sampler=always");
+ if (asprintf(&econf_str, HTRACE_PROCESS_ID"=%%{tname}/%%{pid};sampler=always;"
+ "%s", conf_str) < 0) {
+ fprintf(stderr, "asprintf(econf_str) failed: OOM\n");
+ return EXIT_FAILURE;
+ }
+ rdata->cnf = htrace_conf_from_str(econf_str);
+ free(econf_str);
EXPECT_NONNULL(rdata->cnf);
rdata->tracer = htracer_create(RECEIVER_TEST_TNAME, rdata->cnf);
EXPECT_NONNULL(rdata->tracer);
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/string-unit.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/string-unit.c b/htrace-c/src/test/string-unit.c
index 56155ea..f5ec729 100644
--- a/htrace-c/src/test/string-unit.c
+++ b/htrace-c/src/test/string-unit.c
@@ -16,7 +16,10 @@
* limitations under the License.
*/
+#include "core/conf.h"
+#include "core/htrace.h"
#include "test/test.h"
+#include "util/log.h"
#include "util/string.h"
#include <errno.h>
@@ -63,10 +66,52 @@ static int test_validate_json_string(void)
return EXIT_SUCCESS;
}
+static int test_parse_endpoint(struct htrace_log *lg, const char *eremote,
+ int eport, const char *endpoint)
+{
+ char *remote = NULL;
+ int port = 0;
+
+ EXPECT_INT_EQ(1, parse_endpoint(lg, endpoint, 80, &remote, &port));
+ EXPECT_NONNULL(remote);
+ EXPECT_STR_EQ(eremote, remote);
+ EXPECT_INT_EQ(eport, port);
+ free(remote);
+ return EXIT_SUCCESS;
+}
+
+static int test_parse_endpoints(void)
+{
+ struct htrace_conf *cnf;
+ struct htrace_log *lg;
+
+ cnf = htrace_conf_from_str("");
+ EXPECT_NONNULL(cnf);
+ lg = htrace_log_alloc(cnf);
+ EXPECT_NONNULL(lg);
+ EXPECT_INT_ZERO(test_parse_endpoint(lg, "", 80,
+ ""));
+ EXPECT_INT_ZERO(test_parse_endpoint(lg, "127.0.0.1", 8080,
+ "127.0.0.1:8080"));
+ EXPECT_INT_ZERO(test_parse_endpoint(lg, "127.0.0.1", 80,
+ "127.0.0.1"));
+ EXPECT_INT_ZERO(test_parse_endpoint(lg, "foobar.example.com", 99,
+ "foobar.example.com:99"));
+ EXPECT_INT_ZERO(test_parse_endpoint(lg, "foobar", 80,
+ "foobar"));
+ EXPECT_INT_ZERO(test_parse_endpoint(lg,
+ "2001:db8:85a3:8d3:1319:8a2e:370:7348", 9075,
+ "[2001:db8:85a3:8d3:1319:8a2e:370:7348]:9075"));
+ htrace_log_free(lg);
+ htrace_conf_free(cnf);
+ return EXIT_SUCCESS;
+}
+
int main(void)
{
EXPECT_INT_ZERO(test_fwdprintf());
EXPECT_INT_ZERO(test_validate_json_string());
+ EXPECT_INT_ZERO(test_parse_endpoints());
return EXIT_SUCCESS;
}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/string.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/util/string.c b/htrace-c/src/util/string.c
index e9a4e91..b465092 100644
--- a/htrace-c/src/util/string.c
+++ b/htrace-c/src/util/string.c
@@ -21,6 +21,7 @@
#include <stdarg.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
int fwdprintf(char **buf, int* rem, const char *fmt, ...)
@@ -104,4 +105,58 @@ int validate_json_string(struct htrace_log *lg, const char *str)
return 1;
}
+int parse_endpoint(struct htrace_log *lg, const char *endpoint,
+ int default_port, char **remote_out, int *port)
+{
+ const char *remotestr;
+ const char *portstr;
+ char *remote = NULL;
+ int remote_len;
+
+ if (endpoint[0] == '[') {
+ remotestr = endpoint + 1;
+ remote_len = strcspn(remotestr, "]");
+ if (remotestr[remote_len] != ']') {
+ htrace_log(lg, "parse_hostport: found open square bracket, but "
+ "not matching close square bracket.\n");
+ return 0;
+ }
+ if (remotestr[remote_len + 1] == ':') {
+ portstr = remotestr + remote_len + 2;
+ } else {
+ portstr = NULL;
+ }
+ } else {
+ remotestr = endpoint;
+ remote_len = strcspn(remotestr, ":");
+ if (remotestr[remote_len] == ':') {
+ portstr = remotestr + remote_len + 1;
+ } else {
+ portstr = NULL;
+ }
+ }
+ remote = malloc(remote_len + 1);
+ if (!remote) {
+ htrace_log(lg, "parse_hostport: unable to allocate %d-byte string.\n",
+ remote_len);
+ return 0;
+ }
+ memcpy(remote, remotestr, remote_len);
+ remote[remote_len] = '\0';
+ if (!portstr) {
+ *port = default_port;
+ } else {
+ int p = atoi(portstr);
+ if ((p <= 0) || (p > 0xffff)) {
+ free(remote);
+ htrace_log(lg, "parse_hostport: parse port string '%s'\n",
+ portstr);
+ return 0;
+ }
+ *port = p;
+ }
+ *remote_out = remote;
+ return 1;
+}
+
// vim: ts=4:sw=4:et
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/string.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/util/string.h b/htrace-c/src/util/string.h
index 77e764b..a4c80b0 100644
--- a/htrace-c/src/util/string.h
+++ b/htrace-c/src/util/string.h
@@ -59,6 +59,28 @@ int fwdprintf(char **buf, int* rem, const char *fmt, ...)
*/
int validate_json_string(struct htrace_log *lg, const char *str);
+/**
+ * Parse an endpoint string.
+ *
+ * We support a few different formats:
+ * Hostname/port:
+ * my.hostname:9075
+ * ipv4/port:
+ * 127.0.0.1:9075
+ * ipv6/port:
+ * [2001:db8:85a3:8d3:1319:8a2e:370:7348]:9075
+ *
+ * @param lg The htrace log object.
+ * @param endpoint The endpoint string.
+ * @param default_port The default port to use if no port is given.
+ * @param remote (out param) The remote name. Malloced.
+ * @param port (out param) The port.
+ *
+ * @return 0 on failure; 1 on success.
+ */
+int parse_endpoint(struct htrace_log *lg, const char *endpoint,
+ int default_port, char **remote, int *port);
+
#endif
// vim: ts=4:sw=4:et
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/time.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/util/time.c b/htrace-c/src/util/time.c
index d4ad8ed..7d02772 100644
--- a/htrace-c/src/util/time.c
+++ b/htrace-c/src/util/time.c
@@ -44,6 +44,14 @@ void ms_to_timespec(uint64_t ms, struct timespec *ts)
ts->tv_nsec = ms * 1000000LLU;
}
+void ms_to_timeval(uint64_t ms, struct timeval *tv)
+{
+ uint64_t sec = ms / 1000LLU;
+ tv->tv_sec = sec;
+ ms -= (sec * 1000LLU);
+ tv->tv_usec = ms * 1000LLU;
+}
+
uint64_t now_ms(struct htrace_log *lg)
{
struct timespec ts;
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/time.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/util/time.h b/htrace-c/src/util/time.h
index 9b4f5d4..21f4841 100644
--- a/htrace-c/src/util/time.h
+++ b/htrace-c/src/util/time.h
@@ -31,6 +31,7 @@
struct htrace_log;
struct timespec;
+struct timeval;
/**
* Convert a timespec into a time in milliseconds.
@@ -50,6 +51,14 @@ uint64_t timespec_to_ms(const struct timespec *ts);
void ms_to_timespec(uint64_t ms, struct timespec *ts);
/**
+ * Convert a time in milliseconds into a timeval.
+ *
+ * @param ms The time in milliseconds to convert.
+ * @param tv (out param) The timeval to fill.
+ */
+void ms_to_timeval(uint64_t ms, struct timeval *tv);
+
+/**
* Get the current wall-clock time in milliseconds.
*
* @param log The log to use for error messages.
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-htraced/src/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/client/client.go b/htrace-htraced/src/go/src/org/apache/htrace/client/client.go
index 44e2f69..6a62e81 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/client/client.go
@@ -37,13 +37,7 @@ import (
func NewClient(cnf *conf.Config) (*Client, error) {
hcl := Client{}
hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS)
- if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
- var err error
- hcl.hcr, err = newHClient(cnf)
- if err != nil {
- return nil, err
- }
- }
+ hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS)
return &hcl, nil
}
@@ -51,6 +45,9 @@ type Client struct {
// REST address of the htraced server.
restAddr string
+ // HRPC address of the htraced server.
+ hrpcAddr string
+
// The HRPC client, or null if it is not enabled.
hcr *hClient
}
@@ -89,11 +86,15 @@ func (hcl *Client) FindSpan(sid common.SpanId) (*common.Span, error) {
}
func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
- if hcl.hcr != nil {
- return hcl.hcr.writeSpans(req)
- } else {
+ if hcl.hrpcAddr == "" {
return hcl.writeSpansHttp(req)
}
+ hcr, err := newHClient(hcl.hrpcAddr)
+ if err != nil {
+ return err
+ }
+ defer hcr.Close()
+ return hcr.writeSpans(req)
}
func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
index 1730c02..5406d73 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
@@ -28,7 +28,6 @@ import (
"net"
"net/rpc"
"org/apache/htrace/common"
- "org/apache/htrace/conf"
)
type hClient struct {
@@ -62,7 +61,7 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro
Seq: req.Seq,
Length: uint32(len(buf)),
}
- err = binary.Write(cdc.rwc, binary.BigEndian, &hdr)
+ err = binary.Write(cdc.rwc, binary.LittleEndian, &hdr)
if err != nil {
return errors.New(fmt.Sprintf("Error writing header bytes: %s",
err.Error()))
@@ -77,7 +76,7 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro
func (cdc *HrpcClientCodec) ReadResponseHeader(resp *rpc.Response) error {
hdr := common.HrpcResponseHeader{}
- err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+ err := binary.Read(cdc.rwc, binary.LittleEndian, &hdr)
if err != nil {
return errors.New(fmt.Sprintf("Error reading response header "+
"bytes: %s", err.Error()))
@@ -129,13 +128,12 @@ func (cdc *HrpcClientCodec) Close() error {
return cdc.rwc.Close()
}
-func newHClient(cnf *conf.Config) (*hClient, error) {
+func newHClient(hrpcAddr string) (*hClient, error) {
hcr := hClient{}
- addr := cnf.Get(conf.HTRACE_HRPC_ADDRESS)
- conn, err := net.Dial("tcp", addr)
+ conn, err := net.Dial("tcp", hrpcAddr)
if err != nil {
return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+
- "at %s: %s", addr, err.Error()))
+ "at %s: %s", hrpcAddr, err.Error()))
}
hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn})
return &hcr, nil
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
index 9696cbc..eede69e 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
@@ -47,27 +47,50 @@ type HrpcServer struct {
// Codec which encodes HRPC data via JSON
type HrpcServerCodec struct {
- rwc io.ReadWriteCloser
+ lg *common.Logger
+ conn net.Conn
length uint32
}
+func asJson(val interface{}) string {
+ js, err := json.Marshal(val)
+ if err != nil {
+ return "encoding error: " + err.Error()
+ }
+ return string(js)
+}
+
+func createErrAndLog(lg *common.Logger, val string) error {
+ lg.Warnf("%s\n", val)
+ return errors.New(val)
+}
+
func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
hdr := common.HrpcRequestHeader{}
- err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+ if cdc.lg.TraceEnabled() {
+ cdc.lg.Tracef("Reading HRPC request header from %s\n", cdc.conn.RemoteAddr())
+ }
+ err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
if err != nil {
- return errors.New(fmt.Sprintf("Error reading header bytes: %s", err.Error()))
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Error reading header bytes: %s",
+ err.Error()))
+ }
+ if cdc.lg.TraceEnabled() {
+ cdc.lg.Tracef("Read HRPC request header %s from %s\n",
+ asJson(&hdr), cdc.conn.RemoteAddr())
}
if hdr.Magic != common.HRPC_MAGIC {
- return errors.New(fmt.Sprintf("Invalid request header: expected "+
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Invalid request header: expected "+
"magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
}
if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
- return errors.New(fmt.Sprintf("Length prefix was too long. Maximum "+
- "length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, hdr.Length))
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Length prefix was too long. "+
+ "Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH,
+ hdr.Length))
}
req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
if req.ServiceMethod == "" {
- return errors.New(fmt.Sprintf("Unknown MethodID code 0x%04x",
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Unknown MethodID code 0x%04x",
hdr.MethodId))
}
req.Seq = hdr.Seq
@@ -76,11 +99,19 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
}
func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
- dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
+ if cdc.lg.TraceEnabled() {
+ cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
+ cdc.length, cdc.conn.RemoteAddr())
+ }
+ dec := json.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)))
err := dec.Decode(body)
if err != nil {
- return errors.New(fmt.Sprintf("Failed to read request body: %s",
- err.Error()))
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to read request "+
+ "body from %s: %s", cdc.conn.RemoteAddr(), err.Error()))
+ }
+ if cdc.lg.TraceEnabled() {
+ cdc.lg.Tracef("Read body from %s: %s\n",
+ cdc.conn.RemoteAddr(), asJson(&body))
}
return nil
}
@@ -93,8 +124,8 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
if msg != nil {
buf, err = json.Marshal(msg)
if err != nil {
- return errors.New(fmt.Sprintf("Failed to marshal response message: %s",
- err.Error()))
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to marshal "+
+ "response message: %s", err.Error()))
}
}
hdr := common.HrpcResponseHeader{}
@@ -102,41 +133,41 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
hdr.Seq = resp.Seq
hdr.ErrLength = uint32(len(resp.Error))
hdr.Length = uint32(len(buf))
- writer := bufio.NewWriterSize(cdc.rwc, 256)
- err = binary.Write(writer, binary.BigEndian, &hdr)
+ writer := bufio.NewWriterSize(cdc.conn, 256)
+ err = binary.Write(writer, binary.LittleEndian, &hdr)
if err != nil {
- return errors.New(fmt.Sprintf("Failed to write response header: %s",
- err.Error()))
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write response "+
+ "header: %s", err.Error()))
}
if hdr.ErrLength > 0 {
_, err = io.WriteString(writer, resp.Error)
if err != nil {
- return errors.New(fmt.Sprintf("Failed to write error string: %s",
- err.Error()))
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write error "+
+ "string: %s", err.Error()))
}
}
if hdr.Length > 0 {
var length int
length, err = writer.Write(buf)
if err != nil {
- return errors.New(fmt.Sprintf("Failed to write response "+
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write response "+
"message: %s", err.Error()))
}
if uint32(length) != hdr.Length {
- return errors.New(fmt.Sprintf("Failed to write all of response "+
- "message: %s", err.Error()))
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write all of "+
+ "response message: %s", err.Error()))
}
}
err = writer.Flush()
if err != nil {
- return errors.New(fmt.Sprintf("Failed to write the response bytes: "+
- "%s", err.Error()))
+ return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write the response "+
+ "bytes: %s", err.Error()))
}
return nil
}
func (cdc *HrpcServerCodec) Close() error {
- return cdc.rwc.Close()
+ return cdc.conn.Close()
}
func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
@@ -148,7 +179,9 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
if span.ProcessId == "" {
span.ProcessId = req.DefaultPid
}
- hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
+ if hand.lg.TraceEnabled() {
+ hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
+ }
hand.store.WriteSpan(span)
}
return nil
@@ -182,8 +215,12 @@ func (hsv *HrpcServer) run() {
lg.Errorf("HRPC Accept error: %s\n", err.Error())
continue
}
+ if lg.TraceEnabled() {
+ lg.Tracef("Accepted HRPC connection from %s\n", conn.RemoteAddr())
+ }
go hsv.ServeCodec(&HrpcServerCodec{
- rwc: conn,
+ lg: lg,
+ conn: conn,
})
}
}