You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2015/04/30 20:31:13 UTC

incubator-htrace git commit: HTRACE-159. libhtrace.so: use HRPC endpoint of htraced (cmccabe)

Repository: incubator-htrace
Updated Branches:
  refs/heads/master b4c968740 -> a3c25b94a


HTRACE-159. libhtrace.so: use HRPC endpoint of htraced (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/a3c25b94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/a3c25b94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/a3c25b94

Branch: refs/heads/master
Commit: a3c25b94aa5f7c3e04538e4f37ab6dba83b1480d
Parents: b4c9687
Author: Colin P. Mccabe <cm...@apache.org>
Authored: Thu Apr 30 11:23:52 2015 -0700
Committer: Colin P. Mccabe <cm...@apache.org>
Committed: Thu Apr 30 11:28:04 2015 -0700

----------------------------------------------------------------------
 htrace-c/src/CMakeLists.txt                     |  11 +-
 htrace-c/src/core/conf.c                        |   4 +-
 htrace-c/src/core/htrace.h                      |  14 +-
 htrace-c/src/receiver/curl.c                    | 124 -----
 htrace-c/src/receiver/curl.h                    |  60 ---
 htrace-c/src/receiver/hrpc.c                    | 512 +++++++++++++++++++
 htrace-c/src/receiver/hrpc.h                    |  90 ++++
 htrace-c/src/receiver/htraced.c                 | 227 ++++----
 htrace-c/src/test/htraced_rcv-unit.c            |   2 +-
 htrace-c/src/test/mini_htraced.c                |  17 +-
 htrace-c/src/test/mini_htraced.h                |   5 +
 htrace-c/src/test/rtest.c                       |  10 +-
 htrace-c/src/test/string-unit.c                 |  45 ++
 htrace-c/src/util/string.c                      |  55 ++
 htrace-c/src/util/string.h                      |  22 +
 htrace-c/src/util/time.c                        |   8 +
 htrace-c/src/util/time.h                        |   9 +
 .../go/src/org/apache/htrace/client/client.go   |  21 +-
 .../go/src/org/apache/htrace/client/hclient.go  |  12 +-
 .../go/src/org/apache/htrace/htraced/hrpc.go    |  89 +++-
 20 files changed, 983 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/htrace-c/src/CMakeLists.txt b/htrace-c/src/CMakeLists.txt
index f3465f6..3d51968 100644
--- a/htrace-c/src/CMakeLists.txt
+++ b/htrace-c/src/CMakeLists.txt
@@ -48,8 +48,6 @@ get_filename_component(HTRACE_ABSPATH "../../htrace-htraced/src/go/build/htrace"
 get_filename_component(HTRACED_ABSPATH "../../htrace-htraced/src/go/build/htraced" ABSOLUTE)
 CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/test/test_config.h.cmake ${CMAKE_BINARY_DIR}/test/test_config.h)
 
-find_package(CURL REQUIRED)
-
 find_package(PkgConfig)
 pkg_check_modules(PC_JSON-C QUIET json-c)
 find_path(JSON_C_INCLUDE_DIR "json.h"
@@ -61,8 +59,7 @@ ELSE(JSON_C_INCLUDE_DIR AND JSON_C_LIBRARY)
     MESSAGE(FATAL_ERROR "Failed to find libjson-c. Try installing libjson-c with apt-get or yum, or install it manually from http://oss.metaparadigm.com/json-c/")
 ENDIF(JSON_C_INCLUDE_DIR AND JSON_C_LIBRARY)
 
-include_directories(${CURL_INCLUDE_DIR}
-                    ${CMAKE_BINARY_DIR}
+include_directories(${CMAKE_BINARY_DIR}
                     ${CMAKE_SOURCE_DIR})
 
 if (${CMAKE_SYSTEM_NAME} STREQUAL "Linux")
@@ -77,7 +74,7 @@ set(SRC_ALL
     core/htracer.c
     core/scope.c
     core/span.c
-    receiver/curl.c
+    receiver/hrpc.c
     receiver/htraced.c
     receiver/local_file.c
     receiver/noop.c
@@ -94,9 +91,7 @@ set(SRC_ALL
     util/time.c
 )
 
-set(DEPS_ALL
-    ${CURL_LIBRARY}
-    pthread)
+set(DEPS_ALL pthread)
 
 # The unit test version of the library, which exposes all symbols.
 add_library(htrace_test STATIC

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/core/conf.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/core/conf.c b/htrace-c/src/core/conf.c
index 936e224..a561906 100644
--- a/htrace-c/src/core/conf.c
+++ b/htrace-c/src/core/conf.c
@@ -29,7 +29,9 @@
 #define HTRACE_DEFAULT_CONF_KEYS (\
      HTRACE_PROB_SAMPLER_FRACTION_KEY "=0.01"\
      ";" HTRACED_BUFFER_SIZE_KEY "=67108864"\
-     ";" HTRACED_SEND_TIMEOUT_MS_KEY "=120000"\
+     ";" HTRACED_FLUSH_INTERVAL_MS_KEY "=120000"\
+     ";" HTRACED_WRITE_TIMEO_MS_KEY "=60000"\
+     ";" HTRACED_READ_TIMEO_MS_KEY "=60000"\
      ";" HTRACE_PROCESS_ID "=%{tname}/%{ip}"\
      ";" HTRACED_ADDRESS_KEY "=localhost:9095"\
     )

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/core/htrace.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/core/htrace.h b/htrace-c/src/core/htrace.h
index cdebd31..4e2f1f9 100644
--- a/htrace-c/src/core/htrace.h
+++ b/htrace-c/src/core/htrace.h
@@ -115,9 +115,19 @@ extern  "C" {
 #define HTRACED_ADDRESS_KEY "htraced.address"
 
 /**
- * The timeout to use when sending spans to the htraced server.
+ * The maximum length of time to go before flushing spans to the htraced server.
  */
-#define HTRACED_SEND_TIMEOUT_MS_KEY "htraced.send.timeout.ms"
+#define HTRACED_FLUSH_INTERVAL_MS_KEY "htraced.flush.interval.ms"
+
+/**
+ * The TCP write timeout to use when communicating with the htraced server.
+ */
+#define HTRACED_WRITE_TIMEO_MS_KEY "htraced.write.timeo.ms"
+
+/**
+ * The TCP read timeout to use when communicating with the htraced server.
+ */
+#define HTRACED_READ_TIMEO_MS_KEY "htraced.read.timeo.ms"
 
 /**
  * The size of the circular buffer to use in the htraced receiver.

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/curl.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/curl.c b/htrace-c/src/receiver/curl.c
deleted file mode 100644
index 0905dd4..0000000
--- a/htrace-c/src/receiver/curl.c
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "core/conf.h"
-#include "util/log.h"
-
-#include <curl/curl.h>
-#include <pthread.h>
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-/**
- * Unfortunately, libcurl requires a non-threadsafe initialization function to
- * be called before it is usable.  This is unfortunate for a library like
- * libhtrace, which is designed to be used in a multi-threaded context.
- *
- * This mutex protects us against an application creating two htraced receivers
- * at around the same time, and calling that non-threadsafe initialization
- * function.
- *
- * Of course, this doesn't protect us against the application also initializing
- * libcurl.  We can protect against that by statically linking a private copy of
- * libcurl into libhtrace, so that we will be initializing and using our own
- * private copy of libcurl rather than the application's.
- */
-static pthread_mutex_t g_curl_refcnt_lock = PTHREAD_MUTEX_INITIALIZER;
-
-/**
- * The current number of CURL handles that are open.
- */
-static int64_t g_curl_refcnt;
-
-static int curl_addref(struct htrace_log *lg)
-{
-    int success = 0;
-    CURLcode curl_err = 0;
-
-    pthread_mutex_lock(&g_curl_refcnt_lock);
-    if (g_curl_refcnt >= 1) {
-        g_curl_refcnt++;
-        success = 1;
-        goto done;
-    }
-    curl_err = curl_global_init(CURL_GLOBAL_ALL);
-    if (curl_err) {
-        htrace_log(lg, "curl_global_init failed: error %d (%s)\n",
-                   curl_err, curl_easy_strerror(curl_err));
-        goto done;
-    }
-    htrace_log(lg, "successfully initialized libcurl...\n");
-    g_curl_refcnt = 1;
-    success = 1;
-
-done:
-    pthread_mutex_unlock(&g_curl_refcnt_lock);
-    return success;
-}
-
-static void curl_unref(struct htrace_log *lg)
-{
-    pthread_mutex_lock(&g_curl_refcnt_lock);
-    g_curl_refcnt--;
-    if (g_curl_refcnt > 0) {
-        goto done;
-    }
-    curl_global_cleanup();
-    htrace_log(lg, "shut down libcurl...\n");
-done:
-    pthread_mutex_unlock(&g_curl_refcnt_lock);
-}
-
-CURL* htrace_curl_init(struct htrace_log *lg, const struct htrace_conf *conf)
-{
-    CURL *curl = NULL;
-    int success = 0;
-
-    if (!curl_addref(lg)) {
-        return NULL;
-    }
-    curl = curl_easy_init();
-    if (!curl) {
-        htrace_log(lg, "curl_easy_init failed.\n");
-        goto done;
-    }
-    success = 1;
-
-done:
-    if (!success) {
-        if (curl) {
-            curl_easy_cleanup(curl);
-        }
-        curl_unref(lg);
-        return NULL;
-    }
-    return curl;
-}
-
-void htrace_curl_free(struct htrace_log *lg, CURL *curl)
-{
-    if (!curl) {
-        return;
-    }
-    curl_easy_cleanup(curl);
-    curl_unref(lg);
-}
-
-// vim:ts=4:sw=4:et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/curl.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/curl.h b/htrace-c/src/receiver/curl.h
deleted file mode 100644
index 9249cd3..0000000
--- a/htrace-c/src/receiver/curl.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef APACHE_HTRACE_RECEIVER_CURL_H
-#define APACHE_HTRACE_RECEIVER_CURL_H
-
-/**
- * @file curl.h
- *
- * Utility functions wrapping libcurl.
- *
- * This is an internal header, not intended for external use.
- */
-
-#include <curl/curl.h> // for the CURL type
-
-struct htrace_conf;
-struct htrace_log;
-
-/**
- * Initialize a libcurl handle.
- *
- * This function also takes care of calling curl_global_init if necessary.
- *
- * @param lg            The HTrace log to use for error messages.
- * @param conf          The HTrace configuration to use.
- *
- * @return              A libcurl handle, or NULL on failure.
- */
-CURL* htrace_curl_init(struct htrace_log *lg, const struct htrace_conf *conf);
-
-/**
- * Free a libcurl handle.
- *
- * This function also takes care of calling curl_global_cleanup if necessary.
- *
- * @param lg            The HTrace log to use for error messages.
- *
- * @param curl          The libcurl handle.
- */
-void htrace_curl_free(struct htrace_log *lg, CURL *curl);
-
-#endif
-
-// vim: ts=4: sw=4: et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/hrpc.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/hrpc.c b/htrace-c/src/receiver/hrpc.c
new file mode 100644
index 0000000..32a1f4a
--- /dev/null
+++ b/htrace-c/src/receiver/hrpc.c
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "receiver/hrpc.h"
+#include "util/log.h"
+#include "util/string.h"
+#include "util/time.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#if defined(__OpenBSD__)
+#include <sys/types.h>
+#define be16toh(x) betoh16(x)
+#define be32toh(x) betoh32(x)
+#define be64toh(x) betoh64(x)
+#elif defined(__NetBSD__) || defined(__FreeBSD__)
+#include <sys/endian.h>
+#else
+#include <endian.h>
+#endif
+
+/**
+ * @file hrpc.c
+ *
+ * Implements sending messages via HRPC.
+ */
+
+#define HRPC_MAGIC 0x48545243U
+
+#define MAX_HRPC_ERROR_LENGTH (4 * 1024 * 1024)
+
+#define MAX_HRPC_BODY_LENGTH (64 * 1024 * 1024)
+
+#define DEFAULT_HTRACED_HRPC_PORT 9075
+
+#define ADDR_STR_MAX (2 + INET6_ADDRSTRLEN + sizeof(":65536"))
+
+/**
+ * State for one HRPC client.
+ *
+ * Holds the configured endpoint and timeouts, together with the socket and
+ * the request sequence counter used when talking to the server.
+ */
+struct hrpc_client {
+    /**
+     * The HTrace log object.
+     */
+    struct htrace_log *lg;
+
+    /**
+     * The tcp write timeout in milliseconds.
+     */
+    uint64_t write_timeo_ms;
+
+    /**
+     * The tcp read timeout in milliseconds.
+     */
+    uint64_t read_timeo_ms;
+
+    /**
+     * The hostname or IP address.  Malloced.
+     */
+    char *host;
+
+    /**
+     * The port.
+     */
+    int port;
+
+    /**
+     * The host:port string.  Malloced.
+     */
+    char *endpoint;
+
+    /**
+     * Socket of current open connection, or -1 if there is no currently open
+     * connection.
+     */
+    int sock;
+
+    /**
+     * The sequence number to use for the next request.  It is incremented
+     * each time a request header is built.
+     */
+    uint64_t seq;
+
+    /**
+     * The numeric remote address and port, formatted as "ip:port" by
+     * try_connect.  Used in log messages.
+     */
+    char addr_str[ADDR_STR_MAX];
+};
+
+/**
+ * The fixed-size header written before each HRPC request body.
+ *
+ * hrpc_client_send_req converts each field to little-endian byte order
+ * before writing.  The length field is the size of the request body in bytes.
+ */
+struct hrpc_req_header {
+    uint32_t magic;
+    uint32_t method_id;
+    uint64_t seq;
+    uint32_t length;
+} __attribute__((packed,aligned(4)));
+
+/**
+ * The fixed-size header read at the start of each HRPC response.
+ *
+ * hrpc_client_rcv_resp converts each field from little-endian byte order.
+ * The header is followed by err_length bytes of error string, and then by
+ * length bytes of response body.
+ */
+struct hrpc_resp_header {
+    uint64_t seq;
+    uint32_t method_id;
+    uint32_t err_length;
+    uint32_t length;
+} __attribute__((packed,aligned(4)));
+
+
+static int hrpc_client_open_conn(struct hrpc_client *hcli);
+static int try_connect(struct hrpc_client *hcli, struct addrinfo *p);
+static int set_socket_read_and_write_timeout(struct hrpc_client *hcli,
+                                             int sock);
+static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
+                        const void *req, size_t req_len, uint64_t *seq);
+static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id,
+                       uint64_t seq, char **err, void **resp,
+                       size_t *resp_len);
+
+/**
+ * Allocate an HRPC client for the given endpoint.
+ *
+ * No connection is opened here; the socket field starts out as -1.
+ *
+ * @param lg                The log to use for error messages.
+ * @param write_timeo_ms    The TCP write timeout, in milliseconds.
+ * @param read_timeo_ms     The TCP read timeout, in milliseconds.
+ * @param endpoint          The endpoint string.  A copy is stored.
+ *
+ * @return                  The new client, or NULL if memory could not be
+ *                              allocated or parse_endpoint failed.
+ */
+struct hrpc_client *hrpc_client_alloc(struct htrace_log *lg,
+                uint64_t write_timeo_ms, uint64_t read_timeo_ms,
+                const char *endpoint)
+{
+    struct hrpc_client *client = calloc(1, sizeof(*client));
+
+    if (!client) {
+        htrace_log(lg, "Failed to allocate memory for the HRPC client.\n");
+        return NULL;
+    }
+    client->lg = lg;
+    client->sock = -1;
+    client->read_timeo_ms = read_timeo_ms;
+    client->write_timeo_ms = write_timeo_ms;
+    client->endpoint = strdup(endpoint);
+    if (client->endpoint) {
+        if (parse_endpoint(lg, endpoint, DEFAULT_HTRACED_HRPC_PORT,
+                           &client->host, &client->port)) {
+            return client;
+        }
+    } else {
+        htrace_log(lg, "Failed to allocate memory for the endpoint string.\n");
+    }
+    // Failure: release whatever was allocated.  The struct came from calloc,
+    // so any field that was never set is still NULL and safe to free.
+    free(client->host);
+    free(client->endpoint);
+    free(client);
+    return NULL;
+}
+
+/**
+ * Release an HRPC client.
+ *
+ * Closes the socket if one is open, then frees the host string, the endpoint
+ * string and the client itself.  Passing NULL does nothing.
+ *
+ * @param hcli              The HRPC client, or NULL.
+ */
+void hrpc_client_free(struct hrpc_client *hcli)
+{
+    if (hcli) {
+        if (hcli->sock >= 0) {
+            close(hcli->sock);
+            hcli->sock = -1;
+        }
+        free(hcli->host);
+        free(hcli->endpoint);
+        free(hcli);
+    }
+}
+
+/**
+ * Make one blocking HRPC call.
+ *
+ * Opens a connection first if none is currently open, sends the request, and
+ * then waits for the matching response.  On any failure the socket is closed,
+ * so that the next call starts with a fresh connection.
+ *
+ * @param hcli              The HRPC client.
+ * @param method_id         The method ID to send.
+ * @param req               The request body.
+ * @param req_len           The length of the request body in bytes.
+ * @param err               (out param) The malloced error string sent by the
+ *                              server, or NULL if there was none or the call
+ *                              failed.
+ * @param resp              (out param) The malloced response body, or NULL if
+ *                              there was none or the call failed.
+ * @param resp_len          (out param) The length of the response body; 0 if
+ *                              the call failed.
+ *
+ * @return                  0 on failure, 1 on success.
+ */
+int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
+                    const void *req, size_t req_len,
+                    char **err, void **resp, size_t *resp_len)
+{
+    uint64_t seq;
+
+    // Give the out parameters defined values up front, so that a failure
+    // before the response is read never leaves them uninitialized.
+    *err = NULL;
+    *resp = NULL;
+    *resp_len = 0;
+
+    if (hcli->sock < 0) {
+        if (!hrpc_client_open_conn(hcli)) {
+            goto error;
+        }
+        htrace_log(hcli->lg, "hrpc_client_call: successfully opened connection\n");
+    } else {
+        htrace_log(hcli->lg, "hrpc_client_call: connection was already open\n");
+    }
+    if (!hrpc_client_send_req(hcli, method_id, req, req_len, &seq)) {
+        goto error;
+    }
+    htrace_log(hcli->lg, "hrpc_client_call: waiting for response\n");
+    if (!hrpc_client_rcv_resp(hcli, method_id, seq, err, resp, resp_len)) {
+        goto error;
+    }
+    return 1;
+
+error:
+    // The connection state is unknown after a failure, so drop it.
+    if (hcli->sock >= 0) {
+        close(hcli->sock);
+        hcli->sock = -1;
+    }
+    return 0;
+}
+
+/**
+ * Resolve the client's host and connect to the first address that works.
+ *
+ * On success the connected socket is stored in hcli->sock.
+ *
+ * @param hcli              The HRPC client.
+ *
+ * @return                  0 on failure, 1 on success.
+ */
+static int hrpc_client_open_conn(struct hrpc_client *hcli)
+{
+    int res, sock = -1;
+    struct addrinfo hints, *list, *info;
+
+    memset(&hints, 0, sizeof hints);
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_STREAM;
+    res = getaddrinfo(hcli->host, NULL, &hints, &list);
+    if (res) {
+        htrace_log(hcli->lg, "hrpc_client_open_conn: "
+                   "getaddrinfo(%s) error %d: %s\n",
+                   hcli->host, res, gai_strerror(res));
+        return 0;
+    }
+    for (info = list; info; info = info->ai_next) {
+        sock = try_connect(hcli, info);
+        if (sock >= 0) {
+            break;
+        }
+    }
+    freeaddrinfo(list);
+    // Decide based on the socket rather than on 'info', which points into
+    // the list that was just freed.
+    if (sock < 0) {
+        htrace_log(hcli->lg, "hrpc_client_open_conn(%s): failed to connect.\n",
+                   hcli->host);
+        return 0;
+    }
+    hcli->sock = sock;
+    return 1;
+}
+
+/**
+ * Store the client's port, in network byte order, into a socket address.
+ *
+ * @param hcli              The HRPC client, which supplies the port.
+ * @param addr              The socket address to modify.
+ * @param ai_family         The address family of addr.  Only AF_INET and
+ *                              AF_INET6 are handled.
+ *
+ * @return                  1 if the port was set; 0 if the address family
+ *                              was not recognized.
+ */
+static int set_port(struct hrpc_client *hcli, struct sockaddr *addr,
+                    int ai_family)
+{
+    const uint16_t net_port = htons(hcli->port);
+
+    if (ai_family == AF_INET) {
+        ((struct sockaddr_in*)addr)->sin_port = net_port;
+        return 1;
+    }
+    if (ai_family == AF_INET6) {
+        ((struct sockaddr_in6*)addr)->sin6_port = net_port;
+        return 1;
+    }
+    htrace_log(hcli->lg, "try_connect(%s): set_port %d failed: unknown "
+               "ai_family %d\n", hcli->addr_str, hcli->port, ai_family);
+    return 0;
+}
+
+/**
+ * Try to connect to one resolved address.
+ *
+ * Records the numeric "ip:port" form of the address in hcli->addr_str, sets
+ * the port on the address, creates a close-on-exec socket, applies the read
+ * and write timeouts, and connects.
+ *
+ * @param hcli              The HRPC client.
+ * @param p                 The address to try.  Its port field is overwritten.
+ *
+ * @return                  The connected socket on success; -1 on failure.
+ */
+static int try_connect(struct hrpc_client *hcli, struct addrinfo *p)
+{
+    int e, sock = -1;
+    char ip[INET6_ADDRSTRLEN];
+
+    e = getnameinfo(p->ai_addr, p->ai_addrlen,
+                ip, sizeof(ip), 0, 0, NI_NUMERICHOST);
+    if (e) {
+        htrace_log(hcli->lg, "try_connect: getnameinfo failed.  error "
+                   "%d: %s\n", e, gai_strerror(e));
+        // The caller treats any value >= 0 as a connected socket, so failure
+        // must be reported as -1 and never as 0.
+        return -1;
+    }
+    snprintf(hcli->addr_str, ADDR_STR_MAX, "%s:%d", ip, hcli->port);
+    if (!set_port(hcli, p->ai_addr, p->ai_family)) {
+        goto error;
+    }
+    sock = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
+    if (sock < 0) {
+        e = errno;
+        htrace_log(hcli->lg, "try_connect(%s): failed to create new "
+                   "socket: error %d (%s)\n", hcli->addr_str, e, terror(e));
+        goto error;
+    }
+    if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0) {
+        e = errno;
+        htrace_log(hcli->lg, "try_connect(%s): fcntl(FD_CLOEXEC) "
+                   "failed: error %d (%s)\n", hcli->addr_str, e, terror(e));
+        goto error;
+    }
+    if (!set_socket_read_and_write_timeout(hcli, sock)) {
+        goto error;
+    }
+    if (connect(sock, p->ai_addr, p->ai_addrlen) < 0) {
+        e = errno;
+        htrace_log(hcli->lg, "try_connect(%s): connect "
+                   "failed: error %d (%s)\n", hcli->addr_str, e, terror(e));
+        goto error;
+    }
+    return sock;
+
+error:
+    if (sock >= 0) {
+        close(sock);
+    }
+    return -1;
+}
+
+/**
+ * Apply the client's read and write timeouts to a socket.
+ *
+ * @param hcli              The HRPC client, which supplies the timeouts.
+ * @param sock              The socket to configure.
+ *
+ * @return                  1 if both options were set; 0 if either
+ *                              setsockopt call failed.
+ */
+static int set_socket_read_and_write_timeout(struct hrpc_client *hcli,
+                                             int sock)
+{
+    struct timeval tv;
+
+    ms_to_timeval(hcli->read_timeo_ms, &tv);
+    if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
+        int e = errno;
+        // The timeouts are uint64_t, so print them with an unsigned format.
+        htrace_log(hcli->lg, "setsockopt(%d, SO_RCVTIMEO, %"PRIu64") failed: "
+                  "error %d (%s)\n", sock, hcli->read_timeo_ms, e, terror(e));
+        return 0;
+    }
+
+    ms_to_timeval(hcli->write_timeo_ms, &tv);
+    if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
+        int e = errno;
+        htrace_log(hcli->lg, "setsockopt(%d, SO_SNDTIMEO, %"PRIu64") failed: "
+                  "error %d (%s)\n", sock, hcli->write_timeo_ms, e, terror(e));
+        return 0;
+    }
+    return 1;
+}
+
+/**
+ * Send one HRPC request: a little-endian header followed by the body.
+ *
+ * Short writes are handled by advancing past the bytes already written and
+ * calling writev again until the header and body are both fully sent.
+ *
+ * @param hcli              The HRPC client.  Its socket must be open.
+ * @param method_id         The method ID to put in the header.
+ * @param req               The request body.
+ * @param req_len           The length of the request body in bytes.
+ * @param seq               (out param) The sequence number assigned to this
+ *                              request.
+ *
+ * @return                  0 on failure, 1 once everything has been written.
+ */
+static int hrpc_client_send_req(struct hrpc_client *hcli, uint32_t method_id,
+                        const void *req, size_t req_len, uint64_t *seq)
+{
+    struct hrpc_req_header hdr;
+    struct iovec iov[2];
+    const int iov_count = sizeof(iov)/sizeof(iov[0]);
+    int first = 0;
+
+    // The magic field is 32 bits wide, so it needs the 32-bit conversion.
+    hdr.magic = htole32(HRPC_MAGIC);
+    hdr.method_id = htole32(method_id);
+    *seq = hcli->seq++;
+    hdr.seq = htole64(*seq);
+    hdr.length = htole32(req_len);
+    iov[0].iov_base = &hdr;
+    iov[0].iov_len = sizeof(hdr);
+    iov[1].iov_base = (void*)req;
+    iov[1].iov_len = req_len;
+
+    while (first < iov_count) {
+        ssize_t res;
+        size_t written;
+
+        // Skip buffers with nothing left to send (including an empty body).
+        if (iov[first].iov_len == 0) {
+            first++;
+            continue;
+        }
+        res = writev(hcli->sock, &iov[first], iov_count - first);
+        if (res < 0) {
+            int e = errno;
+            if (e == EINTR) {
+                continue;
+            }
+            htrace_log(hcli->lg, "hrpc_client_send_req: writev error: "
+                       "error %d: %s\n", e, terror(e));
+            return 0;
+        }
+        // Consume the bytes that were written, advancing both the base
+        // pointer and the remaining length of a partially-written buffer.
+        written = (size_t)res;
+        while ((written > 0) && (first < iov_count)) {
+            if (written >= iov[first].iov_len) {
+                written -= iov[first].iov_len;
+                iov[first].iov_len = 0;
+                first++;
+            } else {
+                iov[first].iov_base = (uint8_t*)iov[first].iov_base + written;
+                iov[first].iov_len -= written;
+                written = 0;
+            }
+        }
+        if (written > 0) {
+            htrace_log(hcli->lg, "hrpc_client_send_req: unexpectedly "
+                       "large writev return.\n");
+            return 0;
+        }
+    }
+    return 1;
+}
+
+/**
+ * Read until the requested number of bytes has arrived, EOF, or an error.
+ *
+ * Reads interrupted by a signal (EINTR) are retried.
+ *
+ * @param fd                The file descriptor to read from.
+ * @param buf               The buffer to fill.
+ * @param amt               The number of bytes wanted.
+ *
+ * @return                  The number of bytes read, which is less than amt
+ *                              only if EOF was reached; or the negated errno
+ *                              value if a read failed.
+ */
+static int safe_read(int fd, void *buf, size_t amt)
+{
+    uint8_t *dst = buf;
+    int total = 0;
+
+    for (;;) {
+        int got = read(fd, dst + total, amt - total);
+        if (got == 0) {
+            // EOF: report what we have so far.
+            return total;
+        }
+        if (got < 0) {
+            int err = errno;
+            if (err == EINTR) {
+                continue;
+            }
+            return -err;
+        }
+        total += got;
+        if (total >= amt) {
+            return total;
+        }
+    }
+}
+
+/**
+ * Read one HRPC response and check that it matches the request.
+ *
+ * Reads the response header, verifies the sequence number and method ID,
+ * then reads the error string (if any) followed by the body (if any).
+ *
+ * @param hcli              The HRPC client.  Its socket must be open.
+ * @param method_id         The method ID the response must carry.
+ * @param seq               The sequence number the response must carry.
+ * @param err_out           (out param) The malloced, NUL-terminated error
+ *                              string from the server, or NULL if its length
+ *                              was zero or this function failed.
+ * @param resp_out          (out param) The malloced response body, or NULL if
+ *                              its length was zero or this function failed.
+ * @param resp_len          (out param) The length of the response body; 0 if
+ *                              this function failed.
+ *
+ * @return                  0 on failure, 1 on success.
+ */
+static int hrpc_client_rcv_resp(struct hrpc_client *hcli, uint32_t method_id,
+                                uint64_t seq, char **err_out, void **resp_out,
+                                size_t *resp_len)
+{
+    int res;
+    struct hrpc_resp_header hdr;
+    uint64_t resp_seq;
+    uint32_t resp_method_id, err_length, length;
+    char *err = NULL, *resp = NULL;
+
+    res = safe_read(hcli->sock, &hdr, sizeof(hdr));
+    if (res < 0) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading "
+                   "response header: %d (%s)\n", hcli->addr_str, -res,
+                   terror(-res));
+        goto error;
+    }
+    if (res != sizeof(hdr)) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF "
+                   "reading response header.\n", hcli->addr_str);
+        goto error;
+    }
+    resp_seq = le64toh(hdr.seq);
+    if (resp_seq != seq) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): expected sequence "
+                   "ID 0x%"PRIx64", but got sequence ID 0x%"PRIx64".\n",
+                   hcli->addr_str, seq, resp_seq);
+        goto error;
+    }
+    resp_method_id = le32toh(hdr.method_id);
+    if (resp_method_id != method_id) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): expected method "
+                   "ID 0x%"PRIx32", but got method ID 0x%"PRIx32".\n",
+                   hcli->addr_str, method_id, resp_method_id);
+        goto error;
+    }
+    err_length = le32toh(hdr.err_length);
+    if (err_length > MAX_HRPC_ERROR_LENGTH) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error length was "
+                   "%"PRIu32", but the maximum error length is %"PRIu32".\n",
+                   hcli->addr_str, err_length,
+                   (uint32_t)MAX_HRPC_ERROR_LENGTH);
+        goto error;
+    }
+    if (err_length > 0) {
+        // One extra byte for the terminating NUL added below.
+        err = malloc(err_length + 1);
+        if (!err) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): failed to "
+                       "allocate %"PRIu32" bytes for the error string.\n",
+                       hcli->addr_str, err_length);
+            goto error;
+        }
+        res = safe_read(hcli->sock, err, err_length);
+        if (res < 0) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading "
+                       "error string: %d (%s)\n", hcli->addr_str, -res,
+                       terror(-res));
+            goto error;
+        }
+        if (res != err_length) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF "
+                       "reading error string.\n", hcli->addr_str);
+            goto error;
+        }
+        err[err_length] = '\0';
+    }
+    length = le32toh(hdr.length);
+    if (length > MAX_HRPC_BODY_LENGTH) {
+        htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): body length was "
+                   "%"PRIu32", but the maximum body length is %"PRIu32".\n",
+                   hcli->addr_str, length, (uint32_t)MAX_HRPC_BODY_LENGTH);
+        goto error;
+    }
+    if (length > 0) {
+        resp = malloc(length);
+        if (!resp) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): failed to "
+                       "allocate %"PRIu32" bytes for the body.\n",
+                       hcli->addr_str, length);
+            goto error;
+        }
+        res = safe_read(hcli->sock, resp, length);
+        if (res < 0) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): error reading "
+                       "body: %d (%s)\n", hcli->addr_str, -res, terror(-res));
+            goto error;
+        }
+        if (res != length) {
+            htrace_log(hcli->lg, "hrpc_client_rcv_resp(%s): unexpected EOF "
+                       "reading body.\n", hcli->addr_str);
+            goto error;
+        }
+    }
+    *err_out = err;
+    *resp_out = resp;
+    *resp_len = length;
+    return 1;
+
+error:
+    free(err);
+    free(resp);
+    *err_out = NULL;
+    *resp_out = NULL;
+    *resp_len = 0;
+    return 0;
+}
+
+/**
+ * Get the endpoint string stored in this HRPC client.
+ *
+ * @param hcli              The HRPC client.
+ *
+ * @return                  The client's own copy of the endpoint string; it
+ *                              is not a new allocation.
+ */
+const char *hrpc_client_get_endpoint(struct hrpc_client *hcli)
+{
+    const char *endpoint = hcli->endpoint;
+
+    return endpoint;
+}
+
+// vim:ts=4:sw=4:et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/hrpc.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/hrpc.h b/htrace-c/src/receiver/hrpc.h
new file mode 100644
index 0000000..8ec20be
--- /dev/null
+++ b/htrace-c/src/receiver/hrpc.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef APACHE_HTRACE_RECEIVER_HRPC
+#define APACHE_HTRACE_RECEIVER_HRPC
+
+/**
+ * @file hrpc.h
+ *
+ * Functions related to HRPC.
+ *
+ * This is an internal header, not intended for external use.
+ */
+
+#include <stdint.h>
+#include <unistd.h>
+
+#define METHOD_ID_WRITE_SPANS 0x1
+
+struct htrace_log;
+
+/**
+ * Create an HRPC client.
+ *
+ * @param lg                The log object to use for the HRPC client.
+ * @param write_timeo_ms    The TCP write timeout to use.
+ * @param read_timeo_ms     The TCP read timeout to use.
+ * @param endpoint          The hostname and port, separated by a colon.
+ *
+ * @return                  NULL on failure; the hrpc_client otherwise.
+ */
+struct hrpc_client *hrpc_client_alloc(struct htrace_log *lg,
+                uint64_t write_timeo_ms, uint64_t read_timeo_ms,
+                const char *endpoint);
+
+/**
+ * Free the HRPC client.
+ *
+ * @param hcli              The HRPC client.
+ */
+void hrpc_client_free(struct hrpc_client *hcli);
+
+/**
+ * Make a blocking call using the HRPC client.
+ *
+ * @param hcli              The HRPC client.
+ * @param method_id         The method ID to use.
+ * @param req               The request buffer to send.
+ * @param req_len           The size of the request buffer to send.
+ * @param err               (out param) Will be set to a malloced
+ *                              NULL-terminated string if the server returned an
+ *                              error response.  NULL otherwise.
+ * @param resp              (out param) The response body.  Will be set to the
+ *                              response body if the function returns nonzero.
+ * @param resp_len          (out param) The length of the response body.
+ *
+ * @return                  0 on failure, 1 on success.
+ */
+int hrpc_client_call(struct hrpc_client *hcli, uint32_t method_id,
+                     const void *req, size_t req_len,
+                     char **err, void **resp, size_t *resp_len);
+
+/**
+ * Get the endpoint for this HRPC client.
+ *
+ * @param hcli              The HRPC client.
+ *
+ * @return                  The endpoint.  This string will be valid for the
+ *                              lifetime of the HRPC client.
+ */
+const char *hrpc_client_get_endpoint(struct hrpc_client *hcli);
+
+#endif
+
+// vim: ts=4: sw=4: et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/receiver/htraced.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/receiver/htraced.c b/htrace-c/src/receiver/htraced.c
index ea61541..7e4af81 100644
--- a/htrace-c/src/receiver/htraced.c
+++ b/htrace-c/src/receiver/htraced.c
@@ -20,14 +20,14 @@
 #include "core/htrace.h"
 #include "core/htracer.h"
 #include "core/span.h"
-#include "receiver/curl.h"
+#include "receiver/hrpc.h"
 #include "receiver/receiver.h"
 #include "test/test.h"
 #include "util/log.h"
+#include "util/string.h"
 #include "util/time.h"
 
 #include <errno.h>
-#include <curl/curl.h>
 #include <inttypes.h>
 #include <pthread.h>
 #include <stdint.h>
@@ -56,15 +56,25 @@
 #define HTRACED_MAX_BUFFER_SIZE 0x7ffffffffffffffLL
 
 /**
- * The minimum number of milliseconds to allow for send_timeo_ms.
+ * The minimum number of milliseconds to allow for flush_interval_ms.
  */
-#define HTRACED_SEND_TIMEO_MS_MIN 30000LL
+#define HTRACED_FLUSH_INTERVAL_MS_MIN 30000LL
 
 /**
- * The maximum number of milliseconds to allow for send_timeo_ms.
+ * The maximum number of milliseconds to allow for flush_interval_ms.
  * This is mainly to avoid overflow.
  */
-#define HTRACED_SEND_TIMEO_MS_MAX 86400000LL
+#define HTRACED_FLUSH_INTERVAL_MS_MAX 86400000LL
+
+/**
+ * The minimum number of milliseconds to allow for tcp write timeouts.
+ */
+#define HTRACED_WRITE_TIMEO_MS_MIN 50LL
+
+/**
+ * The minimum number of milliseconds to allow for tcp read timeouts.
+ */
+#define HTRACED_READ_TIMEO_MS_MIN 50LL
 
 /**
  * The maximum size of the message we will send over the wire.
@@ -108,16 +118,10 @@ struct htraced_rcv {
     struct htracer *tracer;
 
     /**
-     * The HTraced server URL.
-     * Dynamically allocated.
-     */
-    char *url;
-
-    /**
      * Buffered span data becomes eligible to be sent even if there isn't much
      * in the buffer after this timeout elapses.
      */
-    uint64_t send_timeo_ms;
+    uint64_t flush_interval_ms;
 
     /**
      * The maximum number of bytes we will buffer before waking the sending
@@ -127,9 +131,9 @@ struct htraced_rcv {
     uint64_t send_threshold;
 
     /**
-     * The CURL handle.
+     * The HRPC client.
      */
-    CURL *curl;
+    struct hrpc_client *hcli;
 
     /**
      * Length of the circular buffer.
@@ -184,15 +188,35 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now);
 static uint64_t cbuf_used(const struct htraced_rcv *rcv);
 static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv);
 
+/**
+ * Read an unsigned 64-bit configuration value and clamp it to a range.
+ *
+ * A message is logged whenever the configured value has to be replaced by
+ * one of the bounds.
+ *
+ * @param lg            The log to write the clamping message to.
+ * @param cnf           The configuration to read the value from.
+ * @param prop          The configuration key to look up.
+ * @param min           The smallest value this function will return.
+ * @param max           The largest value this function will return.
+ *
+ * @return              The configured value if it lies within [min, max];
+ *                          otherwise the bound that it violated.
+ */
+static uint64_t htraced_get_bounded_u64(struct htrace_log *lg,
+                const struct htrace_conf *cnf, const char *prop,
+                uint64_t min, uint64_t max)
+{
+    uint64_t val = htrace_conf_get_u64(lg, cnf, prop);
+    // All three quantities are unsigned, so they must be printed with
+    // PRIu64.  PRId64 would show values >= 2^63 as negative numbers.
+    if (val < min) {
+        htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRIu64
+                   ".  Using minimum value of %"PRIu64 " instead.\n",
+                   prop, val, min);
+        return min;
+    } else if (val > max) {
+        htrace_log(lg, "htraced_rcv_create: can't set %s to %"PRIu64
+                   ".  Using maximum value of %"PRIu64 " instead.\n",
+                   prop, val, max);
+        return max;
+    }
+    return val;
+}
+
 static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
                                              const struct htrace_conf *conf)
 {
     struct htraced_rcv *rcv;
-    const char *url;
+    const char *endpoint;
     int ret;
+    uint64_t write_timeo_ms, read_timeo_ms;
 
-    url = htrace_conf_get(conf, HTRACED_ADDRESS_KEY);
-    if (!url) {
+    endpoint = htrace_conf_get(conf, HTRACED_ADDRESS_KEY);
+    if (!endpoint) {
         htrace_log(tracer->lg, "htraced_rcv_create: no value found for %s. "
                    "You must set this configuration key to the "
                    "hostname:port identifying the htraced server.\n",
@@ -208,27 +232,21 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
     rcv->base.ty = &g_htraced_rcv_ty;
     rcv->shutdown = 0;
     rcv->tracer = tracer;
-    if (asprintf(&rcv->url, "%s/writeSpans", url) < 0) {
-        rcv->url = NULL;
+
+    rcv->flush_interval_ms = htraced_get_bounded_u64(tracer->lg, conf,
+                HTRACED_FLUSH_INTERVAL_MS_KEY, HTRACED_FLUSH_INTERVAL_MS_MIN,
+                HTRACED_FLUSH_INTERVAL_MS_MAX);
+    write_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf,
+                HTRACED_WRITE_TIMEO_MS_KEY, HTRACED_WRITE_TIMEO_MS_MIN,
+                0x7fffffffffffffffULL);
+    read_timeo_ms = htraced_get_bounded_u64(tracer->lg, conf,
+                HTRACED_READ_TIMEO_MS_KEY, HTRACED_READ_TIMEO_MS_MIN,
+                0x7fffffffffffffffULL);
+    rcv->hcli = hrpc_client_alloc(tracer->lg, write_timeo_ms,
+                                  read_timeo_ms, endpoint);
+    if (!rcv->hcli) {
         goto error_free_rcv;
     }
-    rcv->send_timeo_ms = htrace_conf_get_u64(tracer->lg, conf,
-                    HTRACED_SEND_TIMEOUT_MS_KEY);
-    if (rcv->send_timeo_ms < HTRACED_SEND_TIMEO_MS_MIN) {
-        htrace_log(tracer->lg, "htraced_rcv_create: invalid send timeout of %"
-                   PRId64 " ms.  Setting the minimum timeout of %lld"
-                   " ms instead.\n", rcv->send_timeo_ms, HTRACED_SEND_TIMEO_MS_MIN);
-        rcv->send_timeo_ms = HTRACED_SEND_TIMEO_MS_MIN;
-    } else if (rcv->send_timeo_ms > HTRACED_SEND_TIMEO_MS_MAX) {
-        htrace_log(tracer->lg, "htraced_rcv_create: invalid send timeout of %"
-                   PRId64 " ms.  Setting the maximum timeout of %lld"
-                   " ms instead.\n", rcv->send_timeo_ms, HTRACED_SEND_TIMEO_MS_MAX);
-        rcv->send_timeo_ms = HTRACED_SEND_TIMEO_MS_MAX;
-    }
-    rcv->curl = htrace_curl_init(tracer->lg, conf);
-    if (!rcv->curl) {
-        goto error_free_url;
-    }
     rcv->clen = htrace_conf_get_u64(tracer->lg, conf, HTRACED_BUFFER_SIZE_KEY);
     if (rcv->clen < HTRACED_MIN_BUFFER_SIZE) {
         htrace_log(tracer->lg, "htraced_rcv_create: invalid buffer size %" PRId64
@@ -245,7 +263,7 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
     if (!rcv->cbuf) {
         htrace_log(tracer->lg, "htraced_rcv_create: failed to malloc %"PRId64
                    " bytes for the htraced circular buffer.\n", rcv->clen);
-        goto error_free_curl;
+        goto error_free_hcli;
     }
     // Send when the buffer gets 1/4 full.
     rcv->send_threshold = rcv->clen * 0.25;
@@ -274,10 +292,12 @@ static struct htrace_rcv *htraced_rcv_create(struct htracer *tracer,
                    "error %d: %s\n", ret, terror(ret));
         goto error_free_cvar;
     }
-    htrace_log(tracer->lg, "Initialized htraced receiver with url=%s, "
-               "send_timeo_ms=%" PRId64 ", send_threshold=%" PRId64 ", clen=%"
-               PRId64 ".\n", rcv->url, rcv->send_timeo_ms, rcv->send_threshold,
-               rcv->clen);
+    htrace_log(tracer->lg, "Initialized htraced receiver for %s"
+                ", flush_interval_ms=%" PRId64 ", send_threshold=%" PRId64
+                ", write_timeo_ms=%" PRId64 ", read_timeo_ms=%" PRId64
+                ", clen=%" PRId64 ".\n", hrpc_client_get_endpoint(rcv->hcli),
+                rcv->flush_interval_ms, rcv->send_threshold,
+                write_timeo_ms, read_timeo_ms, rcv->clen);
     return (struct htrace_rcv*)rcv;
 
 error_free_cvar:
@@ -288,10 +308,8 @@ error_free_sbuf:
     free(rcv->sbuf);
 error_free_cbuf:
     free(rcv->cbuf);
-error_free_curl:
-    htrace_curl_free(tracer->lg, rcv->curl);
-error_free_url:
-    free(rcv->url);
+error_free_hcli:
+    hrpc_client_free(rcv->hcli);
 error_free_rcv:
     free(rcv);
 error:
@@ -324,7 +342,7 @@ void* run_htraced_xmit_manager(void *data)
-        //      because of send_timeo_ms.
+        //      because of flush_interval_ms.
         // * A writer to signal that we should wake up because enough bytes are
         //      buffered.
-        wakeup = now + (rcv->send_timeo_ms / 2);
+        wakeup = now + (rcv->flush_interval_ms / 2);
         ms_to_timespec(wakeup, &wakeup_ts);
         ret = pthread_cond_timedwait(&rcv->cond, &rcv->lock, &wakeup_ts);
         if ((ret != 0) && (ret != ETIMEDOUT)) {
@@ -356,7 +374,7 @@ static int should_xmit(struct htraced_rcv *rcv, uint64_t now)
         // We have buffered a lot of bytes, so let's send.
         return 1;
     }
-    if (now - rcv->last_send_ms > rcv->send_timeo_ms) {
+    if (now - rcv->last_send_ms > rcv->flush_interval_ms) {
         // It's been too long since the last transmission, so let's send.
         if (used > 0) {
             return 1;
@@ -376,55 +394,25 @@ static int should_xmit(struct htraced_rcv *rcv, uint64_t now)
 static int htraced_xmit_impl(struct htraced_rcv *rcv, int32_t slen)
 {
     struct htrace_log *lg = rcv->tracer->lg;
-    CURLcode res;
-    char *pid_header = NULL;
-    struct curl_slist *headers = NULL;
-    int ret = 0;
-
-    // Disable the use of SIGALARM to interrupt DNS lookups.
-    curl_easy_setopt(rcv->curl, CURLOPT_NOSIGNAL, 1);
-    // Do not use a global DNS cache.
-    curl_easy_setopt(rcv->curl, CURLOPT_DNS_USE_GLOBAL_CACHE, 0);
-    // Disable verbosity.
-    curl_easy_setopt(rcv->curl, CURLOPT_VERBOSE, 0);
-    // The user agent is libhtraced.
-    curl_easy_setopt(rcv->curl, CURLOPT_USERAGENT, "libhtraced");
-    // Set URL
-    curl_easy_setopt(rcv->curl, CURLOPT_URL, rcv->url);
-    // Set POST
-    curl_easy_setopt(rcv->curl, CURLOPT_POST, 1L);
-    // Set the size that we're copying from rcv->sbuf
-    curl_easy_setopt(rcv->curl, CURLOPT_POSTFIELDSIZE, (long)slen);
-    if (asprintf(&pid_header, "htrace-pid: %s", rcv->tracer->prid) < 0) {
-        htrace_log(lg, "htraced_xmit(%s) failed: OOM allocating htrace-pid\n",
-                   rcv->url);
-        goto done;
-    }
-    curl_easy_setopt(rcv->curl, CURLOPT_POSTFIELDS, rcv->sbuf);
-    headers = curl_slist_append(headers, pid_header);
-    if (!headers) {
-        htrace_log(lg, "htraced_xmit(%s) failed: OOM allocating headers\n",
-                   rcv->url);
-        return 0;
-    }
-    headers = curl_slist_append(headers, "Content-Type: application/json");
-    if (!headers) {
-        htrace_log(lg, "htraced_xmit(%s) failed: OOM allocating headers\n",
-                   rcv->url);
-        return 0;
-    }
-    curl_easy_setopt(rcv->curl, CURLOPT_HTTPHEADER, headers);
-    res = curl_easy_perform(rcv->curl);
-    if (res != CURLE_OK) {
-        htrace_log(lg, "htraced_xmit(%s) failed: error %lld (%s)\n",
-                   rcv->url, (long long)res, curl_easy_strerror(res));
+    int res, retval = 0;
+    char *prequel = NULL, *err = NULL, *resp = NULL;
+    size_t resp_len = 0;
+
+    res = hrpc_client_call(rcv->hcli, METHOD_ID_WRITE_SPANS,
+                     rcv->sbuf, slen, &err, (void**)&resp, &resp_len);
+    if (!res) {
+        htrace_log(lg, "htrace_xmit_impl: hrpc_client_call failed.\n");
+        retval = 0;
+    } else if (err) {
+        htrace_log(lg, "htrace_xmit_impl: server returned error: %s\n", err);
+        retval = 0;
+    } else {
+        retval = 1;
     }
-    ret = res == CURLE_OK;
-done:
-    curl_easy_reset(rcv->curl);
-    free(pid_header);
-    curl_slist_free_all(headers);
-    return ret;
+    free(prequel);
+    free(err);
+    free(resp);
+    return retval;
 }
 
 static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
@@ -446,7 +434,7 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
         tries++;
         retry = (tries < HTRACED_MAX_SEND_TRIES);
         htrace_log(rcv->tracer->lg, "htraced_xmit(%s) failed on try %d.  %s\n",
-                   rcv->url, tries,
+                   hrpc_client_get_endpoint(rcv->hcli), tries,
                    (retry ? "Retrying after a delay." : "Giving up."));
         if (!retry) {
             break;
@@ -471,15 +459,21 @@ static void htraced_xmit(struct htraced_rcv *rcv, uint64_t now)
  */
 static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv)
 {
-    int32_t rem = HTRACED_MAX_MSG_LEN;
+    const char SUFFIX[] = "]}"; // an array, so that sizeof() measures the text
+    int SUFFIX_LEN = sizeof(SUFFIX) - 1; // length without the terminating NUL
+    int rem = HTRACED_MAX_MSG_LEN - SUFFIX_LEN;
     size_t amt;
+    char *sbuf = (char*)rcv->sbuf;
 
+    fwdprintf(&sbuf, &rem, "{\"DefaultPid\":\"%s\",\"Spans\":[",
+             rcv->tracer->prid);
     if (rcv->cstart < rcv->cend) {
         amt = rcv->cend - rcv->cstart;
         if (amt > rem) {
             amt = rem;
         }
-        memcpy(rcv->sbuf, rcv->cbuf + rcv->cstart, amt);
+        memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
+        sbuf += amt;
         rem -= amt;
         rcv->cstart += amt;
     } else {
@@ -487,7 +481,8 @@ static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv)
         if (amt > rem) {
             amt = rem;
         }
-        memcpy(rcv->sbuf, rcv->cbuf + rcv->cstart, amt);
+        memcpy(sbuf, rcv->cbuf + rcv->cstart, amt);
+        sbuf += amt;
         rem -= amt;
         rcv->cstart += amt;
         if (rem > 0) {
@@ -495,11 +490,17 @@ static int32_t cbuf_to_sbuf(struct htraced_rcv *rcv)
             if (amt > rem) {
                 amt = rem;
             }
-            memcpy(rcv->sbuf, rcv->cbuf, amt);
+            memcpy(sbuf, rcv->cbuf, amt);
+            sbuf += amt;
             rem -= amt;
             rcv->cstart = amt;
         }
     }
+    // overwrite last comma
+    rem++;
+    sbuf--;
+    rem += SUFFIX_LEN;
+    fwdprintf(&sbuf, &rem, "%s", SUFFIX);
     return HTRACED_MAX_MSG_LEN - rem;
 }
 
@@ -527,11 +528,6 @@ static void htraced_rcv_add_span(struct htrace_rcv *r,
     struct htraced_rcv *rcv = (struct htraced_rcv *)r;
     struct htrace_log *lg = rcv->tracer->lg;
 
-    {
-        char buf[4096];
-        span_json_sprintf(span, sizeof(buf), buf);
-    }
-
     json_len = span_json_size(span);
     tries = 0;
     do {
@@ -559,21 +555,28 @@ static void htraced_rcv_add_span(struct htrace_rcv *r,
     if (rem < json_len) {
         // Handle a 'torn write' where the circular buffer loops around to the
         // beginning in the middle of the write.
-        char *temp = alloca(json_len);
+        char *temp = malloc(json_len);
+        if (!temp) {
+            htrace_log(lg, "htraced_rcv_add_span: failed to malloc %d byte "
+                       "buffer for torn write.\n", json_len);
+            goto done;
+        }
         span_json_sprintf(span, json_len, temp);
-        temp[json_len - 1] = '\n';
+        temp[json_len - 1] = ',';
         memcpy(rcv->cbuf + rcv->cend, temp, rem);
         memcpy(rcv->cbuf, temp + rem, json_len - rem);
         rcv->cend = json_len - rem;
+        free(temp);
     } else {
         span_json_sprintf(span, json_len, rcv->cbuf + rcv->cend);
-        rcv->cbuf[rcv->cend + json_len - 1] = '\n';
+        rcv->cbuf[rcv->cend + json_len - 1] = ',';
         rcv->cend += json_len;
     }
     used += json_len;
     if (used > rcv->send_threshold) {
         pthread_cond_signal(&rcv->cond);
     }
+done:
     pthread_mutex_unlock(&rcv->lock);
 }
 
@@ -611,7 +614,8 @@ static void htraced_rcv_free(struct htrace_rcv *r)
         return;
     }
     lg = rcv->tracer->lg;
-    htrace_log(lg, "Shutting down htraced receiver with url=%s\n", rcv->url);
+    htrace_log(lg, "Shutting down htraced receiver for %s\n",
+               hrpc_client_get_endpoint(rcv->hcli));
     pthread_mutex_lock(&rcv->lock);
     rcv->shutdown = 1;
     pthread_cond_signal(&rcv->cond);
@@ -621,10 +625,9 @@ static void htraced_rcv_free(struct htrace_rcv *r)
         htrace_log(lg, "htraced_rcv_free: pthread_join "
                    "error %d: %s\n", ret, terror(ret));
     }
-    free(rcv->url);
     free(rcv->cbuf);
     free(rcv->sbuf);
-    htrace_curl_free(lg, rcv->curl);
+    hrpc_client_free(rcv->hcli);
     ret = pthread_mutex_destroy(&rcv->lock);
     if (ret) {
         htrace_log(lg, "htraced_rcv_free: pthread_mutex_destroy "

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/htraced_rcv-unit.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/htraced_rcv-unit.c b/htrace-c/src/test/htraced_rcv-unit.c
index 29f62d1..d5c83ec 100644
--- a/htrace-c/src/test/htraced_rcv-unit.c
+++ b/htrace-c/src/test/htraced_rcv-unit.c
@@ -51,7 +51,7 @@ static int htraced_rcv_test(struct rtest *rt)
                 ht->root_dir, "spans.json"));
     EXPECT_INT_GE(0, asprintf(&conf_str, "%s=%s;%s=%s",
                 HTRACE_SPAN_RECEIVER_KEY, "htraced",
-                HTRACED_ADDRESS_KEY, ht->htraced_http_addr));
+                HTRACED_ADDRESS_KEY, ht->htraced_hrpc_addr));
     EXPECT_INT_ZERO(rt->run(rt, conf_str));
     start_ms = monotonic_now_ms(NULL);
     //

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/mini_htraced.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/mini_htraced.c b/htrace-c/src/test/mini_htraced.c
index 31b5484..d97a01b 100644
--- a/htrace-c/src/test/mini_htraced.c
+++ b/htrace-c/src/test/mini_htraced.c
@@ -318,6 +318,9 @@ static void mini_htraced_write_conf_file(struct mini_htraced *ht,
     if (mini_htraced_write_conf_key(fp, "web.address", "127.0.0.1:0")) {
         goto ioerror;
     }
+    if (mini_htraced_write_conf_key(fp, "hrpc.address", "127.0.0.1:0")) {
+        goto ioerror;
+    }
     if (mini_htraced_write_conf_key(fp, "data.store.directories",
             "%s%c%s", ht->data_dir[0], PATH_LIST_SEP, ht->data_dir[1])) {
         goto ioerror;
@@ -546,7 +549,7 @@ static void parse_startup_notification(struct mini_htraced *ht,
                                        char *err, size_t err_len)
 {
     struct json_tokener *tok = NULL;
-    struct json_object *root = NULL, *http_addr, *process_id;
+    struct json_object *root = NULL, *http_addr, *process_id, *hrpc_addr;
     int32_t pid;
 
     err[0] = '\0';
@@ -574,6 +577,18 @@ static void parse_startup_notification(struct mini_htraced *ht,
         snprintf(err, err_len, "OOM");
         goto done;
     }
+    // Find the HRPC address, in the form of hostname:port, which the htraced
+    // is listening on.
+    if (!json_object_object_get_ex(root, "HrpcAddr", &hrpc_addr)) {
+        snprintf(err, err_len, "Failed to find HrpcAddr in the startup "
+                 "notification.");
+        goto done;
+    }
+    ht->htraced_hrpc_addr = strdup(json_object_get_string(hrpc_addr));
+    if (!ht->htraced_hrpc_addr) {
+        snprintf(err, err_len, "OOM");
+        goto done;
+    }
     // Check that the process ID from the startup notification matches the
     // process ID from the fork.
     if (!json_object_object_get_ex(root, "ProcessId", &process_id)) {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/mini_htraced.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/mini_htraced.h b/htrace-c/src/test/mini_htraced.h
index a803f55..df999cc 100644
--- a/htrace-c/src/test/mini_htraced.h
+++ b/htrace-c/src/test/mini_htraced.h
@@ -106,6 +106,11 @@ struct mini_htraced {
      * The HTTP address of the htraced, in hostname:port format.
      */
     char *htraced_http_addr;
+
+    /**
+     * The HRPC address of the htraced, in hostname:port format.
+     */
+    char *htraced_hrpc_addr;
 };
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/rtest.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/rtest.c b/htrace-c/src/test/rtest.c
index 0ec5272..b299376 100644
--- a/htrace-c/src/test/rtest.c
+++ b/htrace-c/src/test/rtest.c
@@ -52,10 +52,16 @@ static void get_receiver_test_prid(char *prid, size_t prid_len)
 
 static int rtest_data_init(const char *conf_str, struct rtest_data **out)
 {
+    char *econf_str = NULL;
     struct rtest_data *rdata = calloc(1, sizeof(*(rdata)));
     EXPECT_NONNULL(rdata);
-    rdata->cnf = htrace_conf_from_strs(conf_str,
-            HTRACE_PROCESS_ID"=%{tname}/%{pid};sampler=always");
+    if (asprintf(&econf_str, HTRACE_PROCESS_ID"=%%{tname}/%%{pid};sampler=always;"
+              "%s", conf_str) < 0) {
+        fprintf(stderr, "asprintf(econf_str) failed: OOM\n");
+        return EXIT_FAILURE;
+    }
+    rdata->cnf = htrace_conf_from_str(econf_str);
+    free(econf_str);
     EXPECT_NONNULL(rdata->cnf);
     rdata->tracer = htracer_create(RECEIVER_TEST_TNAME, rdata->cnf);
     EXPECT_NONNULL(rdata->tracer);

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/test/string-unit.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/test/string-unit.c b/htrace-c/src/test/string-unit.c
index 56155ea..f5ec729 100644
--- a/htrace-c/src/test/string-unit.c
+++ b/htrace-c/src/test/string-unit.c
@@ -16,7 +16,10 @@
  * limitations under the License.
  */
 
+#include "core/conf.h"
+#include "core/htrace.h"
 #include "test/test.h"
+#include "util/log.h"
 #include "util/string.h"
 
 #include <errno.h>
@@ -63,10 +66,52 @@ static int test_validate_json_string(void)
     return EXIT_SUCCESS;
 }
 
+/**
+ * Parse one endpoint string and compare the result with the expected values.
+ *
+ * parse_endpoint is always called with a default port of 80 here.
+ *
+ * @param lg            The log to hand to parse_endpoint.
+ * @param eremote       The expected remote name.
+ * @param eport         The expected port.
+ * @param endpoint      The endpoint string to parse.
+ *
+ * @return              EXIT_SUCCESS if control reaches the end of the function.
+ *                          NOTE(review): the EXPECT_* macros presumably return
+ *                          early with a failure code -- confirm in test/test.h.
+ */
+static int test_parse_endpoint(struct htrace_log *lg, const char *eremote,
+                               int eport, const char *endpoint)
+{
+    char *remote = NULL;
+    int port = 0;
+
+    EXPECT_INT_EQ(1, parse_endpoint(lg, endpoint, 80, &remote, &port));
+    EXPECT_NONNULL(remote);
+    EXPECT_STR_EQ(eremote, remote);
+    EXPECT_INT_EQ(eport, port);
+    // parse_endpoint hands back a malloced string which we own.
+    free(remote);
+    return EXIT_SUCCESS;
+}
+
+/**
+ * Run parse_endpoint over a set of representative endpoint strings: the empty
+ * string, IPv4 and hostname endpoints with and without an explicit port, and a
+ * bracketed IPv6 endpoint with a port.
+ *
+ * @return              EXIT_SUCCESS if control reaches the end of the function.
+ */
+static int test_parse_endpoints(void)
+{
+    struct htrace_conf *cnf;
+    struct htrace_log *lg;
+
+    // An empty configuration is enough to create the log object that
+    // parse_endpoint needs.
+    cnf = htrace_conf_from_str("");
+    EXPECT_NONNULL(cnf);
+    lg = htrace_log_alloc(cnf);
+    EXPECT_NONNULL(lg);
+    // Arguments are: expected remote, expected port, endpoint to parse.
+    // Endpoints without a port are expected to come back with port 80, the
+    // default that test_parse_endpoint passes in.
+    EXPECT_INT_ZERO(test_parse_endpoint(lg, "", 80,
+                                        ""));
+    EXPECT_INT_ZERO(test_parse_endpoint(lg, "127.0.0.1", 8080,
+                                        "127.0.0.1:8080"));
+    EXPECT_INT_ZERO(test_parse_endpoint(lg, "127.0.0.1", 80,
+                                        "127.0.0.1"));
+    EXPECT_INT_ZERO(test_parse_endpoint(lg, "foobar.example.com", 99,
+                                        "foobar.example.com:99"));
+    EXPECT_INT_ZERO(test_parse_endpoint(lg, "foobar", 80,
+                                        "foobar"));
+    // The square brackets are expected to be stripped from the remote name.
+    EXPECT_INT_ZERO(test_parse_endpoint(lg,
+        "2001:db8:85a3:8d3:1319:8a2e:370:7348", 9075,
+        "[2001:db8:85a3:8d3:1319:8a2e:370:7348]:9075"));
+    htrace_log_free(lg);
+    htrace_conf_free(cnf);
+    return EXIT_SUCCESS;
+}
+
 int main(void)
 {
     EXPECT_INT_ZERO(test_fwdprintf());
     EXPECT_INT_ZERO(test_validate_json_string());
+    EXPECT_INT_ZERO(test_parse_endpoints());
     return EXIT_SUCCESS;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/string.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/util/string.c b/htrace-c/src/util/string.c
index e9a4e91..b465092 100644
--- a/htrace-c/src/util/string.c
+++ b/htrace-c/src/util/string.c
@@ -21,6 +21,7 @@
 
 #include <stdarg.h>
 #include <stdio.h>
+#include <stdlib.h>
 #include <string.h>
 
 int fwdprintf(char **buf, int* rem, const char *fmt, ...)
@@ -104,4 +105,58 @@ int validate_json_string(struct htrace_log *lg, const char *str)
     return 1;
 }
 
+/**
+ * Split an endpoint string into a remote name and a port.
+ *
+ * Accepts "host", "host:port", "[ipv6]" and "[ipv6]:port".  The square
+ * brackets around a bracketed remote are not copied into the result.
+ *
+ * @param lg            The log to write error messages to.
+ * @param endpoint      The NULL-terminated endpoint string to parse.
+ * @param default_port  The port to report when the endpoint names no port.
+ * @param remote_out    (out param) On success, a malloced copy of the remote
+ *                          name, which the caller must free.  Untouched on
+ *                          failure.
+ * @param port          (out param) On success, the port.  Untouched on
+ *                          failure.
+ *
+ * @return              0 on failure; 1 on success.
+ */
+int parse_endpoint(struct htrace_log *lg, const char *endpoint,
+                   int default_port, char **remote_out, int *port)
+{
+    const char *remotestr;
+    const char *portstr = NULL;
+    char *remote = NULL;
+    int remote_len;
+
+    if (endpoint[0] == '[') {
+        // Bracketed form.  This is what lets the remote contain colons, as
+        // IPv6 literals do.
+        remotestr = endpoint + 1;
+        remote_len = strcspn(remotestr, "]");
+        if (remotestr[remote_len] != ']') {
+            htrace_log(lg, "parse_endpoint: found open square bracket, but "
+                       "not matching close square bracket.\n");
+            return 0;
+        }
+        if (remotestr[remote_len + 1] == ':') {
+            portstr = remotestr + remote_len + 2;
+        }
+    } else {
+        // Plain form: everything up to the first colon is the remote.
+        remotestr = endpoint;
+        remote_len = strcspn(remotestr, ":");
+        if (remotestr[remote_len] == ':') {
+            portstr = remotestr + remote_len + 1;
+        }
+    }
+    remote = malloc(remote_len + 1);
+    if (!remote) {
+        htrace_log(lg, "parse_endpoint: unable to allocate %d-byte string.\n",
+                   remote_len);
+        return 0;
+    }
+    memcpy(remote, remotestr, remote_len);
+    remote[remote_len] = '\0';
+    if (!portstr) {
+        *port = default_port;
+    } else {
+        // strtol, unlike atoi, has defined behavior on overflow (it returns
+        // LONG_MAX, which the range check rejects) and lets us detect an
+        // empty port or trailing garbage through the end pointer.
+        char *end = NULL;
+        long p = strtol(portstr, &end, 10);
+        if ((end == portstr) || (*end != '\0') || (p <= 0) || (p > 0xffff)) {
+            htrace_log(lg, "parse_endpoint: failed to parse port string "
+                       "'%s'\n", portstr);
+            free(remote);
+            return 0;
+        }
+        *port = (int)p;
+    }
+    *remote_out = remote;
+    return 1;
+}
+
 // vim: ts=4:sw=4:et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/string.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/util/string.h b/htrace-c/src/util/string.h
index 77e764b..a4c80b0 100644
--- a/htrace-c/src/util/string.h
+++ b/htrace-c/src/util/string.h
@@ -59,6 +59,28 @@ int fwdprintf(char **buf, int* rem, const char *fmt, ...)
  */
 int validate_json_string(struct htrace_log *lg, const char *str);
 
+/**
+ * Parse an endpoint string.
+ *
+ * We support a few different formats:
+ * Hostname/port:
+ *      my.hostname:9075
+ * ipv4/port:
+ *      127.0.0.1:9075
+ * ipv6/port:
+ *      [2001:db8:85a3:8d3:1319:8a2e:370:7348]:9075
+ *
+ * @param lg                    The htrace log object.
+ * @param endpoint              The endpoint string.
+ * @param default_port          The default port to use if no port is given.
+ * @param remote                (out param) The remote name.  Malloced.
+ * @param port                  (out param) The port.
+ *
+ * @return                      0 on failure; 1 on success.
+ */
+int parse_endpoint(struct htrace_log *lg, const char *endpoint,
+                   int default_port, char **remote, int *port);
+
 #endif
 
 // vim: ts=4:sw=4:et

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/time.c
----------------------------------------------------------------------
diff --git a/htrace-c/src/util/time.c b/htrace-c/src/util/time.c
index d4ad8ed..7d02772 100644
--- a/htrace-c/src/util/time.c
+++ b/htrace-c/src/util/time.c
@@ -44,6 +44,14 @@ void ms_to_timespec(uint64_t ms, struct timespec *ts)
     ts->tv_nsec = ms * 1000000LLU;
 }
 
+/**
+ * Fill a timeval from a duration given in milliseconds.
+ *
+ * The whole seconds go into tv_sec; the remaining milliseconds are converted
+ * to microseconds and stored in tv_usec.
+ *
+ * @param ms            The time in milliseconds to convert.
+ * @param tv            (out param) The timeval to fill.
+ */
+void ms_to_timeval(uint64_t ms, struct timeval *tv)
+{
+    uint64_t whole_sec = ms / 1000LLU;
+    uint64_t leftover_ms = ms % 1000LLU;
+
+    tv->tv_sec = whole_sec;
+    tv->tv_usec = leftover_ms * 1000LLU;
+}
+
 uint64_t now_ms(struct htrace_log *lg)
 {
     struct timespec ts;

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-c/src/util/time.h
----------------------------------------------------------------------
diff --git a/htrace-c/src/util/time.h b/htrace-c/src/util/time.h
index 9b4f5d4..21f4841 100644
--- a/htrace-c/src/util/time.h
+++ b/htrace-c/src/util/time.h
@@ -31,6 +31,7 @@
 
 struct htrace_log;
 struct timespec;
+struct timeval;
 
 /**
  * Convert a timespec into a time in milliseconds.
@@ -50,6 +51,14 @@ uint64_t timespec_to_ms(const struct timespec *ts);
 void ms_to_timespec(uint64_t ms, struct timespec *ts);
 
 /**
+ * Convert a time in milliseconds into a timeval.
+ *
+ * @param ms            The time in milliseconds to convert.
+ * @param tv            (out param) The timeval to fill.
+ */
+void ms_to_timeval(uint64_t ms, struct timeval *tv);
+
+/**
  * Get the current wall-clock time in milliseconds.
  *
- * @param log           The log to use for error messsages.
+ * @param log           The log to use for error messages.

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-htraced/src/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/client/client.go b/htrace-htraced/src/go/src/org/apache/htrace/client/client.go
index 44e2f69..6a62e81 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/client/client.go
@@ -37,13 +37,7 @@ import (
 func NewClient(cnf *conf.Config) (*Client, error) {
 	hcl := Client{}
 	hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS)
-	if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
-		var err error
-		hcl.hcr, err = newHClient(cnf)
-		if err != nil {
-			return nil, err
-		}
-	}
+	hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS)
 	return &hcl, nil
 }
 
@@ -51,6 +45,9 @@ type Client struct {
 	// REST address of the htraced server.
 	restAddr string
 
+	// HRPC address of the htraced server.
+	hrpcAddr string
+
 	// The HRPC client, or null if it is not enabled.
 	hcr *hClient
 }
@@ -89,11 +86,15 @@ func (hcl *Client) FindSpan(sid common.SpanId) (*common.Span, error) {
 }
 
 func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
-	if hcl.hcr != nil {
-		return hcl.hcr.writeSpans(req)
-	} else {
+	if hcl.hrpcAddr == "" {
 		return hcl.writeSpansHttp(req)
 	}
+	hcr, err := newHClient(hcl.hrpcAddr)
+	if err != nil {
+		return err
+	}
+	defer hcr.Close()
+	return hcr.writeSpans(req)
 }
 
 func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
index 1730c02..5406d73 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/client/hclient.go
@@ -28,7 +28,6 @@ import (
 	"net"
 	"net/rpc"
 	"org/apache/htrace/common"
-	"org/apache/htrace/conf"
 )
 
 type hClient struct {
@@ -62,7 +61,7 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro
 		Seq:      req.Seq,
 		Length:   uint32(len(buf)),
 	}
-	err = binary.Write(cdc.rwc, binary.BigEndian, &hdr)
+	err = binary.Write(cdc.rwc, binary.LittleEndian, &hdr)
 	if err != nil {
 		return errors.New(fmt.Sprintf("Error writing header bytes: %s",
 			err.Error()))
@@ -77,7 +76,7 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro
 
 func (cdc *HrpcClientCodec) ReadResponseHeader(resp *rpc.Response) error {
 	hdr := common.HrpcResponseHeader{}
-	err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+	err := binary.Read(cdc.rwc, binary.LittleEndian, &hdr)
 	if err != nil {
 		return errors.New(fmt.Sprintf("Error reading response header "+
 			"bytes: %s", err.Error()))
@@ -129,13 +128,12 @@ func (cdc *HrpcClientCodec) Close() error {
 	return cdc.rwc.Close()
 }
 
-func newHClient(cnf *conf.Config) (*hClient, error) {
+func newHClient(hrpcAddr string) (*hClient, error) {
 	hcr := hClient{}
-	addr := cnf.Get(conf.HTRACE_HRPC_ADDRESS)
-	conn, err := net.Dial("tcp", addr)
+	conn, err := net.Dial("tcp", hrpcAddr)
 	if err != nil {
 		return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+
-			"at %s: %s", addr, err.Error()))
+			"at %s: %s", hrpcAddr, err.Error()))
 	}
 	hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn})
 	return &hcr, nil

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a3c25b94/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
index 9696cbc..eede69e 100644
--- a/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/src/go/src/org/apache/htrace/htraced/hrpc.go
@@ -47,27 +47,50 @@ type HrpcServer struct {
 
 // Codec which encodes HRPC data via JSON
 type HrpcServerCodec struct {
-	rwc    io.ReadWriteCloser
+	lg     *common.Logger
+	conn   net.Conn
 	length uint32
 }
 
+func asJson(val interface{}) string {
+	js, err := json.Marshal(val)
+	if err != nil {
+		return "encoding error: " + err.Error()
+	}
+	return string(js)
+}
+
+func createErrAndLog(lg *common.Logger, val string) error {
+	lg.Warnf("%s\n", val)
+	return errors.New(val)
+}
+
 func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
 	hdr := common.HrpcRequestHeader{}
-	err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+	if cdc.lg.TraceEnabled() {
+		cdc.lg.Tracef("Reading HRPC request header from %s\n", cdc.conn.RemoteAddr())
+	}
+	err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
 	if err != nil {
-		return errors.New(fmt.Sprintf("Error reading header bytes: %s", err.Error()))
+		return createErrAndLog(cdc.lg, fmt.Sprintf("Error reading header bytes: %s",
+			err.Error()))
+	}
+	if cdc.lg.TraceEnabled() {
+		cdc.lg.Tracef("Read HRPC request header %s from %s\n",
+			asJson(&hdr), cdc.conn.RemoteAddr())
 	}
 	if hdr.Magic != common.HRPC_MAGIC {
-		return errors.New(fmt.Sprintf("Invalid request header: expected "+
+		return createErrAndLog(cdc.lg, fmt.Sprintf("Invalid request header: expected "+
 			"magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic))
 	}
 	if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
-		return errors.New(fmt.Sprintf("Length prefix was too long.  Maximum "+
-			"length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, hdr.Length))
+		return createErrAndLog(cdc.lg, fmt.Sprintf("Length prefix was too long.  "+
+			"Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH,
+			hdr.Length))
 	}
 	req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
 	if req.ServiceMethod == "" {
-		return errors.New(fmt.Sprintf("Unknown MethodID code 0x%04x",
+		return createErrAndLog(cdc.lg, fmt.Sprintf("Unknown MethodID code 0x%04x",
 			hdr.MethodId))
 	}
 	req.Seq = hdr.Seq
@@ -76,11 +99,19 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
 }
 
 func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
-	dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
+	if cdc.lg.TraceEnabled() {
+		cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
+			cdc.length, cdc.conn.RemoteAddr())
+	}
+	dec := json.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)))
 	err := dec.Decode(body)
 	if err != nil {
-		return errors.New(fmt.Sprintf("Failed to read request body: %s",
-			err.Error()))
+		return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to read request "+
+			"body from %s: %s", cdc.conn.RemoteAddr(), err.Error()))
+	}
+	if cdc.lg.TraceEnabled() {
+		cdc.lg.Tracef("Read body from %s: %s\n",
+			cdc.conn.RemoteAddr(), asJson(&body))
 	}
 	return nil
 }
@@ -93,8 +124,8 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
 	if msg != nil {
 		buf, err = json.Marshal(msg)
 		if err != nil {
-			return errors.New(fmt.Sprintf("Failed to marshal response message: %s",
-				err.Error()))
+			return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to marshal "+
+				"response message: %s", err.Error()))
 		}
 	}
 	hdr := common.HrpcResponseHeader{}
@@ -102,41 +133,41 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e
 	hdr.Seq = resp.Seq
 	hdr.ErrLength = uint32(len(resp.Error))
 	hdr.Length = uint32(len(buf))
-	writer := bufio.NewWriterSize(cdc.rwc, 256)
-	err = binary.Write(writer, binary.BigEndian, &hdr)
+	writer := bufio.NewWriterSize(cdc.conn, 256)
+	err = binary.Write(writer, binary.LittleEndian, &hdr)
 	if err != nil {
-		return errors.New(fmt.Sprintf("Failed to write response header: %s",
-			err.Error()))
+		return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write response "+
+			"header: %s", err.Error()))
 	}
 	if hdr.ErrLength > 0 {
 		_, err = io.WriteString(writer, resp.Error)
 		if err != nil {
-			return errors.New(fmt.Sprintf("Failed to write error string: %s",
-				err.Error()))
+			return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write error "+
+				"string: %s", err.Error()))
 		}
 	}
 	if hdr.Length > 0 {
 		var length int
 		length, err = writer.Write(buf)
 		if err != nil {
-			return errors.New(fmt.Sprintf("Failed to write response "+
+			return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write response "+
 				"message: %s", err.Error()))
 		}
 		if uint32(length) != hdr.Length {
-			return errors.New(fmt.Sprintf("Failed to write all of response "+
-				"message: %s", err.Error()))
+			return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write all of "+
+				"response message: wrote %d of %d bytes", length, hdr.Length)) // err is nil here
 		}
 	}
 	err = writer.Flush()
 	if err != nil {
-		return errors.New(fmt.Sprintf("Failed to write the response bytes: "+
-			"%s", err.Error()))
+		return createErrAndLog(cdc.lg, fmt.Sprintf("Failed to write the response "+
+			"bytes: %s", err.Error()))
 	}
 	return nil
 }
 
 func (cdc *HrpcServerCodec) Close() error {
-	return cdc.rwc.Close()
+	return cdc.conn.Close()
 }
 
 func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
@@ -148,7 +179,9 @@ func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
 		if span.ProcessId == "" {
 			span.ProcessId = req.DefaultPid
 		}
-		hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
+		if hand.lg.TraceEnabled() {
+			hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
+		}
 		hand.store.WriteSpan(span)
 	}
 	return nil
@@ -182,8 +215,12 @@ func (hsv *HrpcServer) run() {
 			lg.Errorf("HRPC Accept error: %s\n", err.Error())
 			continue
 		}
+		if lg.TraceEnabled() {
+			lg.Tracef("Accepted HRPC connection from %s\n", conn.RemoteAddr())
+		}
 		go hsv.ServeCodec(&HrpcServerCodec{
-			rwc: conn,
+			lg:   lg,
+			conn: conn,
 		})
 	}
 }