Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/05/01 04:10:16 UTC

svn commit: r1591534 [3/3] - in /hadoop/common/branches/HADOOP-10388/hadoop-native-core: ./ common/ hdfs/ rpc/

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/protoc-gen-hrpc.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/protoc-gen-hrpc.cc?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/protoc-gen-hrpc.cc (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/protoc-gen-hrpc.cc Thu May  1 02:10:15 2014
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ctype.h>
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/stubs/common.h>
+#include <tr1/memory>
+
+#include <iostream>
+#include <map>
+#include <stdio.h>
+#include <string.h>
+#include <string>
+
+#define PROTO_EXTENSION ".proto"
+
+#define APACHE_HEADER \
+"/**\n" \
+" * Licensed to the Apache Software Foundation (ASF) under one\n" \
+" * or more contributor license agreements.  See the NOTICE file\n" \
+" * distributed with this work for additional information\n" \
+" * regarding copyright ownership.  The ASF licenses this file\n" \
+" * to you under the Apache License, Version 2.0 (the\n" \
+" * \"License\"); you may not use this file except in compliance\n" \
+" * with the License.  You may obtain a copy of the License at\n" \
+" *\n" \
+" *     http://www.apache.org/licenses/LICENSE-2.0\n" \
+" *\n" \
+" * Unless required by applicable law or agreed to in writing, software\n" \
+" * distributed under the License is distributed on an \"AS IS\" BASIS,\n" \
+" * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" \
+" * See the License for the specific language governing permissions and\n" \
+" * limitations under the License.\n" \
+" */\n"
+
+using google::protobuf::FileDescriptor;
+using google::protobuf::MethodDescriptor;
+using google::protobuf::ServiceDescriptor;
+using google::protobuf::compiler::GeneratorContext;
+using google::protobuf::io::Printer;
+using std::map;
+using std::string;
+using std::tr1::shared_ptr;
+
+typedef map<string, string> string_map_t;
+
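+/**
+ * Convert a camel-case name into an underscore-separated lowercase name.
+ *
+ * Example: "Hadoop__Hdfs__SetReplicationRequestProto" becomes
+ * "hadoop__hdfs__set_replication_request_proto".
+ */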
+static string camel_case_to_uscore(const string &in)
+{
+    string out;
+    bool prev_lower = false;
+
+    for (size_t i = 0; i < in.size(); i++) {
+        char c = in[i];
+        if (isupper(c)) {
+            if (prev_lower) {
+                out += "_";
+            }
+            prev_lower = false;
+        } else if (islower(c) || isdigit(c)) {
+            prev_lower = true;
+        } else {
+            prev_lower = false;
+        }
+        out += tolower(c);
+    }
+    return out;
+}
+
+static bool try_strip_suffix(const string &str, const char *suffix,
+                                 string *out)
+{
+    size_t suffix_len = strlen(suffix);
+
+    if (str.size() < suffix_len) {
+        return false;
+    }
+    if (str.compare(str.size() - suffix_len, suffix_len, suffix) != 0) {
+        return false;
+    }
+    *out = str.substr(0, str.size() - suffix_len);
+    return true;
+}
+
+static void get_base_name(const string &path, string *base)
+{
+    size_t last_slash = path.find_last_of("/");
+    if (last_slash != string::npos) {
+        *base = path.substr(last_slash + 1);
+    } else {
+        *base = path;
+    }
+}
+
+static string set_path_substitutions(const FileDescriptor *file,
+                                     string_map_t *map)
+{
+    string path = file->name();
+    (*map)["path"] = path;
+    // Initialize path_
+    // If path = /foo/bar/baz_stuff.proto, path_ = /foo/bar/baz_stuff
+    string path_without_extension;
+    if (!try_strip_suffix(path, PROTO_EXTENSION, &path_without_extension)) {
+        return string("file name " + path + " did not end in " +
+                      PROTO_EXTENSION);
+    }
+    (*map)["path_without_extension"] = path_without_extension;
+
+    // If path = /foo/bar/baz_stuff.proto, base_ = baz_stuff
+    string base;
+    get_base_name(path_without_extension, &base);
+    (*map)["path_base"] = base;
+    (*map)["function_prefix"] = base;
+    return "";
+}
+
+static string shorten_service_prefix(const string &prefix)
+{
+    if (prefix == "ClientNamenodeProtocol") {
+        return "cnn";
+    } else if (prefix == "ClientDatanodeProtocolService") {
+        return "cdn";
+    } else if (prefix == "NamenodeProtocolService") {
+        return "nnp";
+    } else if (prefix == "DatanodeProtocolService") {
+        return "dn";
+    } else {
+        return prefix;
+    }
+}
+
+static void set_service_substitutions(const ServiceDescriptor *service,
+                                     string_map_t *map)
+{
+    // Service prefix.
+    // example: cnn
+    (*map)["service_prefix"] = shorten_service_prefix(service->name());
+}
+ 
+/**
+ * Process a dot-separated type name into a protobuf-c type name.
+ *
+ * @param input             The input type name.
+ *
+ * @return                  The protobuf-c type name.
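+ *                          Example: "hadoop.hdfs.SetReplicationRequestProto"
+ *                          becomes "Hadoop__Hdfs__SetReplicationRequestProto".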
+ */
+static string get_pbc_type_name(string input)
+{
+    char *word, *ptr = NULL;
+    string output, prefix;
+    // Use a heap copy for strtok_r; variable-length arrays are not
+    // standard C++.
+    char *line = strdup(input.c_str());
+
+    if (!line) {
+        return output;
+    }
+    for (word = strtok_r(line, ".", &ptr); word;
+             word = strtok_r(NULL, ".", &ptr)) {
+        if (!isupper(word[0])) {
+            word[0] = toupper(word[0]);
+        }
+        output += prefix;
+        prefix = "__";
+        output += word;
+    }
+    free(line);
+    return output;
+}
+
+static string replace(string input, char target,
+                      const char *replacement)
+{
+    string output;
+
+    for (size_t i = 0; i < input.size(); i++) {
+        if (input[i] == target) {
+            output += replacement;
+        } else {
+            output += input[i];
+        }
+    }
+    return output;
+}
+
+static void set_method_substitutions(const MethodDescriptor *method,
+                                     string_map_t *map)
+{
+    // Request type, in camelcase.
+    // example: Hadoop__Hdfs__SetReplicationRequestProto 
+    (*map)["req_ty_camel"] =
+        get_pbc_type_name(method->input_type()->full_name());
+
+    // Request type, in underscore-separated lowercase.
+    // example: hadoop__hdfs__set_replication_request_proto
+    (*map)["req_ty_uscore"] = camel_case_to_uscore((*map)["req_ty_camel"]);
+
+    // Response type, in camelcase.
+    // example: Hadoop__Hdfs__SetReplicationResponseProto 
+    (*map)["resp_ty_camel"] =
+        get_pbc_type_name(method->output_type()->full_name());
+
+    // Response type, in underscore-separated lowercase.
+    // example: hadoop__hdfs__set_replication_response_proto
+    (*map)["resp_ty_uscore"] = camel_case_to_uscore((*map)["resp_ty_camel"]);
+
+    // RPC name, in camelcase.
+    // example: setReplication
+    (*map)["rpc_camel"] = method->name();
+
+    // RPC name, in underscore-separated lowercase.
+    // example: set_replication
+    (*map)["rpc_uscore"] = camel_case_to_uscore((*map)["rpc_camel"]);
+
+    // sync stub function name.
+    // example: cnn_set_replication
+    (*map)["sync_call"] =
+        (*map)["service_prefix"] + "_" + (*map)["rpc_uscore"];
+
+    // async stub function name.
+    // example: cnn_async_set_replication
+    (*map)["async_call"] =
+        (*map)["service_prefix"] + "_async_" + (*map)["rpc_uscore"];
+
+    // async callback adaptor function name.
+    // example: cnn_async_adaptor_set_replication
+    (*map)["async_adaptor"] =
+        (*map)["service_prefix"] + "_async_adaptor_" + (*map)["rpc_uscore"];
+}
+
+class HrpcCodeGenerator
+    : public ::google::protobuf::compiler::CodeGenerator
+{
+public:
+    HrpcCodeGenerator()
+    {
+    }
+
+    ~HrpcCodeGenerator()
+    {
+    }
+
+    bool Generate(const google::protobuf::FileDescriptor *file,
+        const string &, GeneratorContext *gen_context,
+        string *error) const
+    {
+        string_map_t path_map;
+        string ret = set_path_substitutions(file, &path_map);
+        if (!ret.empty()) {
+            *error = ret;
+            return false;
+        }
+        generate_call_header(gen_context, &path_map, file);
+        generate_call_body(gen_context, &path_map, file);
+        return true;
+    }
+
+private:
+    void generate_call_header(GeneratorContext *gen_context,
+                              string_map_t *path_map,
+                              const FileDescriptor *file) const
+    {
+        shared_ptr<google::protobuf::io::ZeroCopyOutputStream> output(
+            gen_context->Open((*path_map)["path_without_extension"] +
+                              ".call.h"));
+        Printer printer(output.get(), '$');
+        printer.Print(APACHE_HEADER);
+        printer.Print(*path_map,
+"\n"
+"// This header file was auto-generated from $path$\n"
+"\n"
+"#ifndef HADOOP_NATIVE_CORE_$path_base$_CALL_H\n"
+"#define HADOOP_NATIVE_CORE_$path_base$_CALL_H\n"
+"\n"
+"#include \"protobuf/$path_base$.pb-c.h\"\n"
+"#include \"protobuf/$path_base$.pb-c.h.s\"\n"
+"\n"
+"struct hadoop_err;\n"
+"struct hrpc_proxy;\n"
+"\n");
+        for (int service_idx = 0; service_idx < file->service_count();
+                    ++service_idx) {
+            string_map_t service_map = *path_map;
+            const ServiceDescriptor *service = file->service(service_idx);
+            set_service_substitutions(service, &service_map);
+            for (int method_idx = 0; method_idx < service->method_count();
+                    ++method_idx) {
+                const MethodDescriptor *method = service->method(method_idx);
+                string_map_t map = service_map;
+                set_method_substitutions(method, &map);
+                printer.Print(map,
+"struct hadoop_err *$sync_call$(struct hrpc_proxy *proxy,\n"
+"               const $req_ty_camel$ *req,\n"
+"               $resp_ty_camel$ **resp);\n"
+"\n"
+"void $async_call$(struct hrpc_proxy *proxy,\n"
+"    const $req_ty_camel$ *req,\n"
+"    void (*cb)($resp_ty_camel$ *,\n"
+"        struct hadoop_err *, void *cb_data),\n"
+"    void *cb_data);\n"
+"\n");
+            }
+        }
+        printer.Print("#endif\n");
+    }
+
+    void generate_call_body(GeneratorContext *gen_context,
+                            string_map_t *path_map,
+                            const FileDescriptor *file) const
+    {
+        shared_ptr<google::protobuf::io::ZeroCopyOutputStream> output(
+            gen_context->Open((*path_map)["path_without_extension"] +
+                              ".call.c"));
+        Printer printer(output.get(), '$');
+        printer.Print(APACHE_HEADER);
+        printer.Print(*path_map, "\n"
+"#include \"common/hadoop_err.h\"\n"
+"#include \"protobuf/$path_base$.call.h\"\n"
+"#include \"rpc/messenger.h\"\n"
+"#include \"rpc/proxy.h\"\n");
+        printer.Print("\n"
+"#include <errno.h>\n"
+"#include <netinet/in.h>\n"
+"#include <stdio.h>\n"
+"#include <stdlib.h>\n"
+"#include <string.h>\n"
+"#include <uv.h>\n"
+"\n");
+        for (int service_idx = 0; service_idx < file->service_count();
+                    ++service_idx) {
+            string_map_t service_map = *path_map;
+            const ServiceDescriptor *service = file->service(service_idx);
+            set_service_substitutions(service, &service_map);
+            for (int method_idx = 0; method_idx < service->method_count();
+                    ++method_idx) {
+                const MethodDescriptor *method = service->method(method_idx);
+                string_map_t map = service_map;
+                set_method_substitutions(method, &map);
+                printer.Print(map,
+"struct hadoop_err *$sync_call$(struct hrpc_proxy *proxy,\n"
+"    const $req_ty_camel$ *req,\n"
+"    $resp_ty_camel$ **out)\n"
+"{\n"
+"    struct hadoop_err *err;\n"
+"    struct hrpc_sync_ctx *ctx;\n"
+"    $resp_ty_camel$ *resp;\n"
+"\n"
+"    err = hrpc_proxy_activate(proxy);\n"
+"    if (err) {\n"
+"        return err;\n"
+"    }\n"
+"    ctx = hrpc_proxy_alloc_sync_ctx(proxy);\n"
+"    if (!ctx) {\n"
+"        hrpc_proxy_deactivate(proxy);\n"
+"        return hadoop_lerr_alloc(ENOMEM, \"$sync_call$: \"\n"
+"            \"failed to allocate sync_ctx\");\n"
+"    }\n"
+"    hrpc_proxy_start(proxy, \"$rpc_camel$\", req,\n"
+"        $req_ty_uscore$__get_packed_size(req),\n"
+"        (hrpc_pack_cb_t)$req_ty_uscore$__pack,\n"
+"        hrpc_proxy_sync_cb, ctx);\n"
+"    if (ctx->err) {\n"
+"        hrpc_free_sync_ctx(ctx);\n"
+"        return ctx->err;\n"
+"    }\n"
+"    resp = $resp_ty_uscore$__unpack(NULL, ctx->resp.pb_len,\n"
+"                                                  ctx->resp.pb_base);\n"
+"    hrpc_free_sync_ctx(ctx);\n"
+"    if (!resp) {\n"
+"        return hadoop_lerr_alloc(EINVAL,\n"
+"           \"$sync_call$: failed to parse response from server\");\n"
+"    }\n"
+"    *out = resp;\n"
+"    return NULL;\n"
+"}\n");
+                printer.Print(map,
+"struct $async_call$_cb_data {\n"
+"    void (*cb)($resp_ty_camel$ *,\n"
+"        struct hadoop_err *, void *);\n"
+"    void *cb_data;\n"
+"};\n"
+"\n"
+"void $async_adaptor$(struct hrpc_response *resp,\n"
+"                        struct hadoop_err *err, void *cb_data)\n"
+"{\n"
+"    struct $async_call$_cb_data *wrapped = cb_data;\n"
+"    $resp_ty_camel$ *msg;\n"
+"\n"
+"    if (err) {\n"
+"        wrapped->cb(NULL, err, wrapped->cb_data);\n"
+"        return;\n"
+"    }\n"
+"    msg = $resp_ty_uscore$__unpack(NULL, resp->pb_len,\n"
+"                                                  resp->pb_base);\n"
+"    free(resp->base);\n"
+"    if (!msg) {\n"
+"        wrapped->cb(NULL, hadoop_lerr_alloc(EIO,\n"
+"            \"$async_adaptor$: \"\n"
+"            \"failed to parse response from server.\"), wrapped->cb_data);\n"
+"        return;\n"
+"    }\n"
+"    wrapped->cb(msg, NULL, wrapped->cb_data);\n"
+"}\n");
+                printer.Print(map,
+"void $async_call$(struct hrpc_proxy *proxy,\n"
+"    const $req_ty_camel$ *req,\n"
+"    void (*cb)($resp_ty_camel$ *,\n"
+"        struct hadoop_err *, void *),\n"
+"    void *cb_data)\n"
+"{\n"
+"    struct $async_call$_cb_data *wrapped;\n"
+"    struct hadoop_err *err;\n"
+"\n"
+"    err = hrpc_proxy_activate(proxy);\n"
+"    if (err) {\n"
+"        cb(NULL, err, cb_data);\n"
+"        return;\n"
+"    }\n"
+"    wrapped = hrpc_proxy_alloc_userdata(proxy, sizeof(*wrapped));\n"
+"    if (!wrapped) {\n"
+"        hrpc_proxy_deactivate(proxy);\n"
+"        cb(NULL, hadoop_lerr_alloc(ENOMEM, \"$async_call$: failed \"\n"
+"                                 \"to allocate sync_ctx\"), cb_data);\n"
+"        return;\n"
+"    }\n"
+"    wrapped->cb = cb;\n"
+"    wrapped->cb_data = cb_data;\n"
+"    hrpc_proxy_start(proxy, \"$rpc_camel$\", req, \n"
+"        $req_ty_uscore$__get_packed_size(req),\n"
+"        (hrpc_pack_cb_t)$req_ty_uscore$__pack,\n"
+"        $async_adaptor$, wrapped);\n"
+"}\n"
+"\n");
+            }
+        }
+        printer.Print("// vim: ts=4:sw=4:tw=79:et\n");
+    }
+};
+
+int main(int argc, char *argv[])
+{
+    HrpcCodeGenerator generator;
+    return google::protobuf::compiler::PluginMain(argc, argv, &generator);
+}
+
+// vim: ts=4:sw=4:tw=79:et
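
For reference, a generated synchronous stub is called like an ordinary C
function.  A sketch for ClientNamenodeProtocol's setReplication follows
(assumes an already-created proxy; the request fields "src" and
"replication" follow hdfs.proto, and hadoop_err_free() is assumed to be
the matching cleanup routine in common/hadoop_err.h):

    Hadoop__Hdfs__SetReplicationRequestProto req =
        HADOOP__HDFS__SET_REPLICATION_REQUEST_PROTO__INIT;
    Hadoop__Hdfs__SetReplicationResponseProto *resp = NULL;
    struct hadoop_err *err;

    req.src = "/some/path";
    req.replication = 3;
    err = cnn_set_replication(proxy, &req, &resp);
    if (err) {
        fprintf(stderr, "setReplication failed: %s\n", hadoop_err_msg(err));
        hadoop_err_free(err);
    } else {
        hadoop__hdfs__set_replication_response_proto__free_unpacked(resp, NULL);
    }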

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c Thu May  1 02:10:15 2014
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "protobuf/ProtobufRpcEngine.pb-c.h.s"
+#include "rpc/call.h"
+#include "rpc/messenger.h"
+#include "rpc/proxy.h"
+#include "rpc/varint.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <uv.h>
+
+#define proxy_log_warn(proxy, fmt, ...) \
+    fprintf(stderr, "WARN: proxy %p: " fmt, proxy, __VA_ARGS__)
+#define proxy_log_info(proxy, fmt, ...) \
+    fprintf(stderr, "INFO: proxy %p: " fmt, proxy, __VA_ARGS__)
+#define proxy_log_debug(proxy, fmt, ...) \
+    fprintf(stderr, "DEBUG: proxy %p: " fmt, proxy, __VA_ARGS__)
+
+/**
+ * The maximum length that we'll allocate to hold a request to the server.
+ * This number includes the RequestHeader, but not the RpcRequestHeader.
+ */
+#define MAX_SEND_LEN (63 * 1024 * 1024)
+
+struct hrpc_proxy {
+    /**
+     * The messenger that this proxy is associated with.
+     */
+    struct hrpc_messenger *msgr;
+
+    /**
+     * Dynamically allocated string describing the protocol this proxy speaks. 
+     */
+    char *protocol;
+
+    /**
+     * The current call.
+     */
+    struct hrpc_call call;
+
+    /**
+     * A memory area which can be used by the current call.
+     *
+     * This will be null if userdata_len is 0.
+     */
+    uint8_t *userdata;
+
+    /**
+     * Length of userdata.
+     */
+    size_t userdata_len;
+};
+
+struct hrpc_proxy_builder {
+    struct hrpc_proxy *proxy;
+};
+
+static const char OOM_ERROR[] = "OOM";
+
+struct hrpc_proxy_builder *hrpc_proxy_builder_alloc(
+            struct hrpc_messenger *msgr)
+{
+    struct hrpc_proxy_builder *bld;
+
+    bld = calloc(1, sizeof(struct hrpc_proxy_builder));
+    if (!bld)
+        return NULL;
+    bld->proxy = calloc(1, sizeof(struct hrpc_proxy));
+    if (!bld->proxy) {
+        free(bld);
+        return NULL;
+    }
+    bld->proxy->msgr = msgr;
+    bld->proxy->call.remote.sin_addr.s_addr = INADDR_ANY;
+
+    return bld;
+}
+
+void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld)
+{
+    if (!bld)
+        return;
+    free(bld->proxy);
+    free(bld);
+}
+
+void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld,
+                                     const char *protocol)
+{
+    struct hrpc_proxy *proxy = bld->proxy;
+
+    if (proxy->protocol) {
+        if (proxy->protocol != OOM_ERROR) {
+            free(proxy->protocol);
+        }
+        proxy->protocol = NULL;
+    }
+    proxy->protocol = strdup(protocol);
+    if (!proxy->protocol) {
+        proxy->protocol = (char*)OOM_ERROR;
+    }
+}
+
+void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld,
+                                     const struct sockaddr_in *remote)
+{
+    bld->proxy->call.remote = *remote;
+}
+
+struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld,
+                            struct hrpc_proxy **out)
+{
+    struct hrpc_proxy *proxy;
+    
+    proxy = bld->proxy;
+    free(bld);
+    //fprintf(stderr, "proxy = %p, proxy->protocol = %s, proxy->call.cb = %p\n", proxy, proxy->protocol, proxy->call.cb);
+    if (proxy->call.remote.sin_addr.s_addr == INADDR_ANY) {
+        hrpc_proxy_free(proxy);
+        return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: you must specify "
+                                 "a remote.");
+    }
+    if (!proxy->protocol) {
+        hrpc_proxy_free(proxy);
+        return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: can't create "
+                                 "a proxy without a protocol argument.");
+    } else if (proxy->protocol == OOM_ERROR) {
+        // There was an OOM error during hrpc_proxy_builder_set_protocol.
+        hrpc_proxy_free(proxy);
+        return hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_create: OOM error.");
+    }
+    *out = proxy;
+    return NULL;
+}
+
+void hrpc_proxy_free(struct hrpc_proxy *proxy)
+{
+    if (!proxy)
+        return;
+    if (hrpc_call_is_active(&proxy->call)) {
+        proxy_log_warn(proxy, "%s", "hrpc_proxy_free: attempt to free a proxy "
+                       "which is currently active!\n");
+        return;
+    }
+    if (proxy->protocol != OOM_ERROR) {
+        free(proxy->protocol);
+    }
+    free(proxy->userdata);
+    free(proxy);
+}
+
+struct hadoop_err *hrpc_proxy_activate(struct hrpc_proxy *proxy)
+{
+    struct hadoop_err *err;
+
+    if (!hrpc_call_activate(&proxy->call)) {
+        err = hadoop_lerr_alloc(EINVAL, "tried to start a call on a "
+                    "proxy which was still in use by another call.");
+        proxy_log_warn(proxy, "hrpc_proxy_activate: %s",
+                       hadoop_err_msg(err));
+        return err;
+    }
+    return NULL;
+}
+
+void hrpc_proxy_deactivate(struct hrpc_proxy *proxy)
+{
+    hrpc_call_deactivate(&proxy->call);
+}
+
+void *hrpc_proxy_alloc_userdata(struct hrpc_proxy *proxy, size_t size)
+{
+    if (size > proxy->userdata_len) {
+        uint8_t *new_userdata = realloc(proxy->userdata, size);
+        if (!new_userdata) {
+            return NULL;
+        }
+        proxy->userdata = new_userdata;
+        proxy->userdata_len = size;
+    }
+    return proxy->userdata;
+}
+
+struct hrpc_sync_ctx *hrpc_proxy_alloc_sync_ctx(struct hrpc_proxy *proxy)
+{
+    struct hrpc_sync_ctx *ctx =
+        hrpc_proxy_alloc_userdata(proxy, sizeof(struct hrpc_sync_ctx));
+    if (!ctx) {
+        return NULL;
+    }
+    memset(ctx, 0, sizeof(*ctx));
+    if (uv_sem_init(&ctx->sem, 0)) {
+        return NULL;
+    }
+    return ctx;
+}
+
+void hrpc_free_sync_ctx(struct hrpc_sync_ctx *ctx)
+{
+    free(ctx->resp.base);
+    uv_sem_destroy(&ctx->sem);
+}
+
+void hrpc_proxy_sync_cb(struct hrpc_response *resp, struct hadoop_err *err,
+                        void *cb_data)
+{
+    struct hrpc_sync_ctx *ctx = cb_data;
+    ctx->resp = *resp;
+    ctx->err = err;
+    uv_sem_post(&ctx->sem);
+}
+
+void hrpc_proxy_start(struct hrpc_proxy *proxy,
+        const char *method, const void *payload, int payload_packed_len,
+        hrpc_pack_cb_t payload_pack_cb,
+        hrpc_raw_cb_t cb, void *cb_data)
+{
+    RequestHeaderProto req_header = REQUEST_HEADER_PROTO__INIT;
+    uint64_t buf_len;
+    int32_t req_header_len, off = 0;
+    uint8_t *buf;
+    struct hrpc_call *call = &proxy->call;
+
+    call->cb = cb;
+    call->cb_data = cb_data;
+    call->protocol = strdup(proxy->protocol);
+    if (!call->protocol) {
+        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ENOMEM,
+                "hrpc_proxy_start: out of memory"));
+        return;
+    }
+
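+    // The request body built below has this wire layout:
+    //     varint32(req_header_len) | RequestHeaderProto |
+    //     varint32(payload_len)    | payload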
+    req_header.methodname = (char*)method;
+    req_header.declaringclassprotocolname = proxy->protocol;
+    req_header.clientprotocolversion = 1;
+    req_header_len = request_header_proto__get_packed_size(&req_header);
+    buf_len = varint32_size(req_header_len);
+    buf_len += req_header_len;
+    buf_len += varint32_size(payload_packed_len);
+    buf_len += payload_packed_len;
+    if (buf_len >= MAX_SEND_LEN) {
+        hrpc_call_deliver_err(call,
+            hadoop_lerr_alloc(EINVAL, "hrpc_proxy_start: the "
+                "request length is too long at %" PRIu64 " bytes.  The "
+                "maximum we will send is %d bytes.", buf_len, MAX_SEND_LEN));
+        return;
+    }
+    buf = malloc((size_t)buf_len);
+    if (!buf) {
+        hrpc_call_deliver_err(call,
+            hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_start: "
+                "failed to allocate a buffer of length %" PRIu64 " bytes.",
+                buf_len));
+        return;
+    }
+    varint32_encode(req_header_len, buf, buf_len, &off);
+    request_header_proto__pack(&req_header, buf + off);
+    off += req_header_len;
+    varint32_encode(payload_packed_len, buf, buf_len, &off);
+    payload_pack_cb(payload, buf + off);
+
+    call->payload = uv_buf_init((char*)buf, buf_len);
+    hrpc_messenger_start_outbound(proxy->msgr, &proxy->call);
+}
+
+// vim: ts=4:sw=4:tw=79:et

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h Thu May  1 02:10:15 2014
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_RPC_PROXY_H
+#define HADOOP_CORE_RPC_PROXY_H
+
+#include <stdint.h> /* for uint8_t */
+#include <uv.h> /* for uv_buf_t */
+
+struct hadoop_err;
+struct hrpc_messenger;
+struct hrpc_proxy;
+struct hrpc_proxy_builder;
+
+struct hrpc_response {
+    uint8_t *pb_base;
+    int pb_len;
+    void *base;
+};
+
+struct hrpc_sync_ctx {
+    uv_sem_t sem;
+    struct hadoop_err *err;
+    struct hrpc_response resp;
+};
+
+typedef void (*hrpc_raw_cb_t)(struct hrpc_response *,
+    struct hadoop_err *, void *);
+
+typedef size_t (*hrpc_pack_cb_t)(const void *, uint8_t *);
+
+/**
+ * Allocate a Hadoop proxy builder.
+ *
+ * @param msgr      The Hadoop messenger that this proxy will be associated
+ *                    with.
+ * @return          A Hadoop proxy builder, or NULL on OOM.
+ */
+struct hrpc_proxy_builder *hrpc_proxy_builder_alloc(
+                struct hrpc_messenger *msgr);
+
+/**
+ * Free a Hadoop proxy builder.
+ *
+ * @param bld       The Hadoop proxy builder to free.
+ */
+void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld);
+
+/**
+ * Set the protocol used by a proxy.
+ *
+ * @param bld       The Hadoop proxy builder.
+ * @param proto     The protocol string to use.  Will be deep-copied.
+ */
+void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld,
+                                     const char *proto);
+
+/**
+ * Set the remote that the proxy should connect to.
+ *
+ * @param bld       The Hadoop proxy builder.
+ * @param remote    The remote.  Will be deep-copied.
+ */
+void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld,
+                                   const struct sockaddr_in *remote);
+
+/**
+ * Create a Hadoop proxy
+ *
+ * @param bld       The Hadoop proxy builder to use.
+ *                      The builder will be freed, even on failure.
+ * @param out       (out param) On success, the Hadoop proxy.
+ *
+ * @return          On success, NULL.  On error, the error message.
+ */
+struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld,
+                            struct hrpc_proxy **out);
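
+/*
+ * Builder usage sketch (error handling elided; assumes "msgr" is an
+ * existing messenger, "addr" a populated sockaddr_in, and that the
+ * protocol string below is the one the remote expects):
+ *
+ *     struct hrpc_proxy_builder *bld = hrpc_proxy_builder_alloc(msgr);
+ *     struct hrpc_proxy *proxy;
+ *
+ *     hrpc_proxy_builder_set_protocol(bld,
+ *         "org.apache.hadoop.hdfs.protocol.ClientProtocol");
+ *     hrpc_proxy_builder_set_remote(bld, &addr);
+ *     err = hrpc_proxy_create(bld, &proxy);
+ */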
+
+/**
+ * Free a Hadoop proxy.
+ *
+ * @param proxy     The Hadoop proxy to free.  You must not attempt to free a
+ *                      proxy with a call in progress.
+ */
+void hrpc_proxy_free(struct hrpc_proxy *proxy);
+
+/**
+ * Mark the proxy as active.
+ *
+ * @param proxy                 The proxy
+ *
+ * @return                      NULL on success.  If the proxy is already
+ *                              active, an error will be returned. 
+ */
+struct hadoop_err *hrpc_proxy_activate(struct hrpc_proxy *proxy);
+
+/**
+ * Mark the proxy as inactive.
+ *
+ * This function should not be called after hrpc_proxy_start, since a proxy
+ * that has been started will mark itself as inactive when appropriate.
+ *
+ * @param proxy                 The proxy.
+ */
+void hrpc_proxy_deactivate(struct hrpc_proxy *proxy);
+
+/**
+ * Allocate some data in the proxy's userdata area.
+ *
+ * This will overwrite anything previously allocated in the proxy's userdata
+ * area.  It is not necessary to free this memory later; it will be freed when
+ * the proxy is freed.
+ *
+ * @param proxy                 The proxy.
+ * @param size                  The number of bytes to allocate.
+ *
+ * @return                      NULL on OOM; a pointer to the userdata
+ *                              otherwise.
+ */
+void *hrpc_proxy_alloc_userdata(struct hrpc_proxy *proxy, size_t size);
+
+/**
+ * Allocate a sync context from a proxy via hrpc_proxy_alloc_userdata.
+ *
+ * @param proxy                 The proxy
+ *
+ * @return                      NULL on OOM; the sync context otherwise.
+ */
+struct hrpc_sync_ctx *hrpc_proxy_alloc_sync_ctx(struct hrpc_proxy *proxy);
+
+/**
+ * Free a sync context allocated from a proxy.
+ *
+ * @param ctx                   The sync context.
+ */
+void hrpc_free_sync_ctx(struct hrpc_sync_ctx *ctx);
+
+/**
+ * A callback which synchronous RPCs can use.
+ */
+void hrpc_proxy_sync_cb(struct hrpc_response *resp, struct hadoop_err *err,
+                        void *cb_data);
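
+/*
+ * Synchronous call pattern sketch, as used by the generated stubs
+ * ("myMethod", packed_len, and pack_cb are placeholders):
+ *
+ *     struct hrpc_sync_ctx *ctx = hrpc_proxy_alloc_sync_ctx(proxy);
+ *
+ *     hrpc_proxy_start(proxy, "myMethod", req, packed_len, pack_cb,
+ *                      hrpc_proxy_sync_cb, ctx);
+ *     uv_sem_wait(&ctx->sem);
+ *     // ctx->err and ctx->resp are now safe to examine.
+ */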
+
+/**
+ * Start an outgoing RPC from the proxy.
+ *
+ * This method will return after queuing up the RPC to be sent.
+ *
+ * @param proxy                 The Hadoop proxy to use.  A single proxy can
+ *                                  only make one call at once.
+ * @param method                The method we're calling.
+ * @param payload               The protobuf message we're sending.
+ * @param payload_packed_len    Length of payload when serialized.
+ * @param payload_pack_cb       Function used to pack the payload.
+ * @param cb                    Callback invoked when the message is done.
+ * @param cb_data               Data provided along with cb.
+ */
+void hrpc_proxy_start(struct hrpc_proxy *proxy,
+        const char *method, const void *payload, int payload_packed_len,
+        hrpc_pack_cb_t payload_pack_cb,
+        hrpc_raw_cb_t cb, void *cb_data);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c Thu May  1 02:10:15 2014
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "common/net.h"
+#include "common/queue.h"
+#include "common/tree.h"
+#include "rpc/call.h"
+#include "rpc/messenger.h"
+#include "rpc/reactor.h"
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <uv.h>
+
+#define reactor_log_warn(reactor, fmt, ...) \
+    fprintf(stderr, "WARN: reactor %p: " fmt, reactor, __VA_ARGS__)
+#define reactor_log_info(reactor, fmt, ...) \
+    fprintf(stderr, "INFO: reactor %p: " fmt, reactor, __VA_ARGS__)
+#define reactor_log_debug(reactor, fmt, ...) \
+    fprintf(stderr, "DBUG: reactor %p: " fmt, reactor, __VA_ARGS__)
+
+RB_GENERATE(hrpc_conns, hrpc_conn, entry, hrpc_conn_compare);
+
+static void reactor_thread_run(void *arg)
+{
+    struct hrpc_reactor *reactor = arg;
+    struct hrpc_conn *conn, *conn_tmp;
+
+    reactor_log_debug(reactor, "%s", "reactor thread starting.\n");
+    uv_run(&reactor->loop, UV_RUN_DEFAULT);
+    reactor_log_debug(reactor, "%s", "reactor thread terminating.\n");
+    RB_FOREACH_SAFE(conn, hrpc_conns, &reactor->conns, conn_tmp) {
+        hrpc_conn_destroy(conn, hadoop_lerr_alloc(ESHUTDOWN,
+            "reactor_thread_run: the reactor is being shut down."));
+    }
+}
+
+/**
+ * Find an idle connection with a given address in the idle connection map.
+ *
+ * @param reactor       The reactor.
+ * @param remote        The remote address to find.
+ */
+static struct hrpc_conn *reuse_idle_conn(struct hrpc_reactor *reactor,
+            const struct sockaddr_in *remote, const struct hrpc_call *call)
+{
+    struct hrpc_conn *conn;
+    struct hrpc_conn exemplar;
+
+    memset(&exemplar, 0, sizeof(exemplar));
+    exemplar.remote = *remote;
+    exemplar.protocol = call->protocol;
+    conn = RB_NFIND(hrpc_conns, &reactor->conns, &exemplar);
+    if (!conn)
+        return NULL;
+    if (hrpc_conn_usable(conn, remote, call->protocol)) {
+        if (conn->writer.state == HRPC_CONN_WRITE_IDLE) {
+            RB_REMOVE(hrpc_conns, &reactor->conns, conn);
+            return conn;
+        }
+    }
+    return NULL;
+}
+
+static void reactor_begin_shutdown(struct hrpc_reactor *reactor,
+                             struct hrpc_calls *pending_calls)
+{
+    struct hrpc_call *call;
+
+    reactor_log_debug(reactor, "%s", "reactor_begin_shutdown\n");
+    STAILQ_FOREACH(call, pending_calls, entry) {
+        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN, 
+            "hrpc_reactor_start_outbound: the reactor is being shut down."));
+    }
+    // Note: other callbacks may still run after the libuv loop has been
+    // stopped.  But we won't block for I/O after this point.
+    uv_stop(&reactor->loop);
+}
+
+static void reactor_async_start_outbound(struct hrpc_reactor *reactor,
+                                         struct hrpc_call *call)
+{
+    char remote_str[64] = { 0 };
+    struct hrpc_conn *conn;
+    struct hadoop_err *err;
+
+    conn = reuse_idle_conn(reactor, &call->remote, call);
+    if (conn) {
+        reactor_log_debug(reactor, "start_outbound(remote=%s) assigning to "
+                       "connection %p\n",
+            net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)), conn);
+        hrpc_conn_start_outbound(conn, call);
+    } else {
+        err = hrpc_conn_create_outbound(reactor, call, &conn);
+        if (err) {
+            reactor_log_warn(reactor, "reactor_async_start_outbound("
+                "remote=%s) got error %s\n",
+                net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)),
+                hadoop_err_msg(err));
+            hrpc_call_deliver_err(call, err);
+            return;
+        }
+        reactor_log_debug(reactor, "start_outbound(remote=%s) created new "
+                       "connection %p\n",
+            net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)), conn);
+    }
+    // Add or re-add the connection to the reactor's tree.
+    RB_INSERT(hrpc_conns, &reactor->conns, conn);
+}
+
+static void reactor_async_cb(uv_async_t *handle)
+{
+    struct hrpc_reactor *reactor = handle->data;
+    int shutdown;
+    struct hrpc_calls pending_calls = STAILQ_HEAD_INITIALIZER(pending_calls);
+    struct hrpc_call *call;
+
+    uv_mutex_lock(&reactor->inbox.lock);
+    shutdown = reactor->inbox.shutdown;
+    STAILQ_SWAP(&reactor->inbox.pending_calls, &pending_calls,
+                hrpc_call);
+    uv_mutex_unlock(&reactor->inbox.lock);
+
+    if (shutdown) {
+        reactor_begin_shutdown(reactor, &pending_calls);
+        return;
+    }
+    STAILQ_FOREACH(call, &pending_calls, entry) {
+        reactor_async_start_outbound(reactor, call);
+    }
+}
+
+void reactor_remove_conn(struct hrpc_reactor *reactor, struct hrpc_conn *conn)
+{
+    struct hrpc_conn *removed;
+
+    removed = RB_REMOVE(hrpc_conns, &reactor->conns, conn);
+    if (!removed) {
+        reactor_log_warn(reactor, "reactor_remove_conn("
+            "conn=%p): no such connection found.\n", conn);
+    }
+}
+
+struct hadoop_err *hrpc_reactor_create(struct hrpc_reactor **out)
+{
+    struct hrpc_reactor *reactor = NULL;
+    struct hadoop_err *err = NULL;
+    int res;
+
+    reactor = calloc(1, sizeof(struct hrpc_reactor));
+    if (!reactor) {
+        err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: OOM while allocating "
+                                "reactor structure.");
+        goto error_free_reactor;
+    }
+    if (uv_mutex_init(&reactor->inbox.lock) < 0) {
+        err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: failed to init "
+                                "mutex.");
+        goto error_free_reactor;
+    }
+    RB_INIT(&reactor->conns);
+    STAILQ_INIT(&reactor->inbox.pending_calls);
+    if (uv_loop_init(&reactor->loop)) {
+        err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: uv_loop_init "
+                                "failed.");
+        goto error_free_mutex;
+    }
+    res = uv_async_init(&reactor->loop, &reactor->inbox.notifier,
+                      reactor_async_cb);
+    if (res) {
+        err = hadoop_uverr_alloc(res, "hrpc_reactor_create: "
+                                 "uv_async_init failed.");
+        goto error_close_loop;
+    }
+    reactor->inbox.notifier.data = reactor;
+    res = uv_thread_create(&reactor->thread, reactor_thread_run, reactor);
+    if (res) {
+        err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: "
+                                "uv_thread_create failed.");
+        goto error_free_async;
+    }
+    *out = reactor;
+    return NULL;
+
+error_free_async:
+    uv_close((uv_handle_t*)&reactor->inbox.notifier, NULL);
+error_close_loop:
+    uv_loop_close(&reactor->loop);
+error_free_mutex:
+    uv_mutex_destroy(&reactor->inbox.lock);
+error_free_reactor:
+    free(reactor);
+    return err;
+}
+
+void hrpc_reactor_shutdown(struct hrpc_reactor *reactor)
+{
+    reactor_log_debug(reactor, "%s", "hrpc_reactor_shutdown\n");
+    uv_mutex_lock(&reactor->inbox.lock);
+    reactor->inbox.shutdown = 1;
+    uv_mutex_unlock(&reactor->inbox.lock);
+    uv_async_send(&reactor->inbox.notifier);
+    uv_thread_join(&reactor->thread);
+}
+
+void hrpc_reactor_free(struct hrpc_reactor *reactor)
+{
+    reactor_log_debug(reactor, "%s", "hrpc_reactor_free\n");
+    uv_loop_close(&reactor->loop);
+    uv_mutex_destroy(&reactor->inbox.lock);
+    free(reactor);
+}
+
+void hrpc_reactor_start_outbound(struct hrpc_reactor *reactor,
+                                 struct hrpc_call *call)
+{
+    int shutdown = 0;
+
+    uv_mutex_lock(&reactor->inbox.lock);
+    shutdown = reactor->inbox.shutdown;
+    if (!shutdown) {
+        STAILQ_INSERT_TAIL(&reactor->inbox.pending_calls, call, entry);
+    }
+    uv_mutex_unlock(&reactor->inbox.lock);
+    if (shutdown) {
+        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN, 
+            "hrpc_reactor_start_outbound: can't start call because the "
+            "reactor has been shut down."));
+    } else {
+        uv_async_send(&reactor->inbox.notifier);
+    }
+}
+
+// vim: ts=4:sw=4:tw=79:et

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.h?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.h (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.h Thu May  1 02:10:15 2014
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_RPC_REACTOR_H
+#define HADOOP_CORE_RPC_REACTOR_H
+
+#include "common/tree.h"
+#include "rpc/call.h" // for hrpc_call
+#include "rpc/conn.h" // for hrpc_conn_compare
+
+#include <stdint.h>
+#include <uv.h>
+
+/**
+ * The Hadoop reactor thread header.
+ *
+ * Note: this is an internal header which users of the RPC layer don't need to
+ * include.
+ */
+
+RB_HEAD(hrpc_conns, hrpc_conn);
+RB_PROTOTYPE(hrpc_conns, hrpc_conn, entry, hrpc_conn_compare);
+
+struct hrpc_reactor_inbox {
+    /**
+     * Lock which protects the inbox.
+     */
+    uv_mutex_t lock;
+
+    /**
+     * Non-zero if the reactor should shut down.
+     */
+    int shutdown;
+
+    /**
+     * Calls which we have been asked to make.
+     */
+    struct hrpc_calls pending_calls;
+
+    /**
+     * Used to trigger the inbox callback on the reactor thread.
+     *
+     * You do not need the inbox lock to send an async signal.
+     */
+    uv_async_t notifier;
+};
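
+/*
+ * Producer-side sketch for the inbox (this is what
+ * hrpc_reactor_start_outbound does in rpc/reactor.c):
+ *
+ *     uv_mutex_lock(&reactor->inbox.lock);
+ *     STAILQ_INSERT_TAIL(&reactor->inbox.pending_calls, call, entry);
+ *     uv_mutex_unlock(&reactor->inbox.lock);
+ *     uv_async_send(&reactor->inbox.notifier);
+ */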
+
+/**
+ * A Hadoop RPC reactor thread.
+ *
+ * Each reactor thread uses libuv to send and receive on multiple TCP sockets
+ * asynchronously.
+ *
+ * With the exception of the inbox, everything in this structure must be
+ * accessed ONLY from the reactor thread.  Nothing is safe to read or write
+ * from another thread except the inbox.
+ */
+struct hrpc_reactor {
+    /**
+     * The inbox for incoming work for this reactor thread.
+     */
+    struct hrpc_reactor_inbox inbox;
+
+    /**
+     * A red-black tree of connections.  This makes it possible to find a
+     * connection to a given IP address quickly.
+     *
+     * We may have multiple connections for the same IP:port combination.
+     */
+    struct hrpc_conns conns;
+
+    /**
+     * The libuv loop.
+     */
+    uv_loop_t loop;
+
+    /**
+     * The libuv timer.  Used to expire connections after a timeout has
+     * elapsed.
+     */
+    uv_timer_t timer;
+
+    /**
+     * The reactor thread.  All reactor callbacks are made from this context.
+     */
+    uv_thread_t thread;
+};
+
+/**
+ * Remove a connection from the reactor.
+ *
+ * @param reactor       The reactor.
+ * @param conn          The connection.
+ */
+void reactor_remove_conn(struct hrpc_reactor *reactor, struct hrpc_conn *conn);
+
+/**
+ * Create the reactor thread.
+ */
+struct hadoop_err *hrpc_reactor_create(struct hrpc_reactor **out);
+
+/**
+ * Shut down the reactor thread and wait for it to terminate.
+ *
+ * All pending calls will get timeout errors.
+ */
+void hrpc_reactor_shutdown(struct hrpc_reactor *reactor);
+
+/**
+ * Free the reactor.
+ */
+void hrpc_reactor_free(struct hrpc_reactor *reactor);
+
+/**
+ * Start an outbound transfer.
+ *
+ * @param reactor       The reactor.
+ * @param call          The call to make.  If the reactor has already been
+ *                          shut down, the call's callback will be invoked
+ *                          with an error.
+ */
+void hrpc_reactor_start_outbound(struct hrpc_reactor *reactor,
+                                 struct hrpc_call *call);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/shorten.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/shorten.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/shorten.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/shorten.c Thu May  1 02:10:15 2014
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ctype.h>
+#include <errno.h>
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define MAX_LINE_LEN                    16384
+#define IFNDEF                          "#ifndef"
+#define IFNDEF_LEN                      (sizeof(IFNDEF) - 1)
+
+enum parse_state {
+    PARSE_IFNDEF = 0,
+    PARSE_STRUCTS_AND_ENUMS,
+    PARSE_MESSAGES,
+    PARSE_DONE,
+};
+
+#define PROTOBUF_C_END_DECLS_STR "PROTOBUF_C_END_DECLS"
+
+static const char *PARSE_STATE_TERMINATORS[] = {
+    "PROTOBUF_C_BEGIN_DECLS",
+    "/* --- messages --- */",
+    PROTOBUF_C_END_DECLS_STR
+};
+
+static const char *MESSAGE_SUFFIXES[] = {
+    "__INIT",
+    "__get_packed_size",
+    "__pack",
+    "__pack_to_buffer",
+    "__unpack",
+    "__free_unpacked",
+};
+
+#define NUM_MESSAGE_SUFFIXES \
+    (sizeof(MESSAGE_SUFFIXES) / sizeof(MESSAGE_SUFFIXES[0]))
+
+static void add_word(char ***words, size_t *num_words, const char *word)
+{
+    size_t new_num_words;
+    char *nword;
+    char **nwords;
+
+    new_num_words = *num_words + 1;
+    nword = strdup(word);
+    if (!nword) {
+        fprintf(stderr, "failed to allocate memory for %Zd words\n",
+                new_num_words);
+        exit(1);
+    }
+    nwords = realloc(*words, sizeof(char *) * new_num_words);
+    if (!nwords) {
+        fprintf(stderr, "failed to allocate memory for %zu words\n",
+                new_num_words);
+        free(nword);
+        exit(1);
+    }
+    nwords[new_num_words - 1] = nword;
+    *num_words = new_num_words;
+    *words = nwords;
+}
+
+static int has_suffix(const char *str, const char *suffix)
+{
+    int str_len = strlen(str);
+    int suffix_len = strlen(suffix);
+    if (str_len < suffix_len)
+        return 0;
+    return strcmp(str + str_len - suffix_len, suffix) == 0;
+}
+
+static int has_message_suffix(const char *word)
+{
+    size_t i = 0;
+
+    for (i = 0; i < NUM_MESSAGE_SUFFIXES; i++) {
+        if (has_suffix(word, MESSAGE_SUFFIXES[i]))
+            return 1;
+    }
+    return 0;
+}
+
+static void add_words(char ***words, size_t *num_words,
+                    char *line, enum parse_state state)
+{
+    char *word, *ptr = NULL;
+
+    for (word = strtok_r(line, " ", &ptr); word; 
+             word = strtok_r(NULL, " ", &ptr)) {
+        if (word[0] == '_')
+            continue;
+        if (!strstr(word, "__"))
+            continue;
+        if ((state == PARSE_MESSAGES) && (!has_message_suffix(word)))
+            continue;
+        add_word(words, num_words, word);
+    }
+}
+
+static int compare_strings(const void *a, const void *b)
+{
+    return strcmp(*(char * const*)a, *(char * const*)b);
+}
+
+static char *get_last_occurrence(char *haystack, const char *needle)
+{
+    char *val = NULL, *nval;
+    int needle_len = strlen(needle);
+
+    while (1) {
+        nval = strstr(haystack, needle);
+        if (!nval)
+            return val;
+        val = nval + needle_len;
+        haystack = nval + needle_len;
+    }
+}
+
+static char *get_second_last_occurrence(char *haystack, const char *needle)
+{
+    char *pval = NULL, *val = NULL, *nval;
+    int needle_len = strlen(needle);
+
+    while (1) {
+        nval = strstr(haystack, needle);
+        if (!nval)
+            return pval;
+        pval = val;
+        val = nval + needle_len;
+        haystack = nval + needle_len;
+    }
+}
+
+static int has_camel_case(const char *str)
+{
+    int i, prev_lower = 0;
+
+    for (i = 0; str[i]; i++) {
+        if (isupper(str[i])) {
+            if (prev_lower)
+                return 1;
+        } else if (islower(str[i])) {
+            prev_lower = 1;
+        }
+    }
+    return 0;
+}
+
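+/**
+ * Pick the shortened name to alias to a full protobuf-c identifier.
+ *
+ * Examples of the heuristic (sketch, not exhaustive):
+ *     Hadoop__Hdfs__SetReplicationRequestProto
+ *         -> SetReplicationRequestProto       (camel-case last segment)
+ *     hadoop__hdfs__set_replication_request_proto__pack
+ *         -> set_replication_request_proto__pack  (message suffix, so
+ *            the second-last segment is kept)
+ */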
+static char *get_shortened_occurrence(char *str)
+{
+    char *last, *slast;
+
+    last = get_last_occurrence(str, "__");
+    if (!last)
+        return NULL;
+    if ((!has_message_suffix(str)) && 
+            (strstr(last, "_") || has_camel_case(last))) { 
+        // Heuristic: if the last bit of the string after the double underscore
+        // has another underscore inside, or has mixed case, we assume it's
+        // complex enough to use on its own.
+        return last;
+    }
+    // Otherwise, we grab the part of the string after the second-last double
+    // underscore.
+    slast = get_second_last_occurrence(str, "__");
+    return slast ? slast : last;
+}
+
+static int output_shortening_macros(char **words, size_t num_words,
+                                    const char *out_path, FILE *out)
+{
+    size_t i;
+    const char *prev_word = "";
+    const char *shortened;
+
+    for (i = 0; i < num_words; i++) {
+        if (strcmp(prev_word, words[i]) == 0) {
+            // skip words we've already done
+            continue;
+        }
+        prev_word = words[i];
+        shortened = get_shortened_occurrence(words[i]);
+        if (shortened) {
+            if (fprintf(out, "#define %s %s\n", shortened, words[i]) < 0) {
+                fprintf(stderr, "error writing to %s\n", out_path);
+                return EIO;
+            }
+        }
+    }
+    return 0;
+}
+
+/**
+ * Remove newlines from a buffer.
+ *
+ * @param line          The buffer.
+ */
+static void chomp(char *line)
+{
+    while (1) {
+        int len = strlen(line);
+        if (len == 0) {
+            return;
+        }
+        if (line[len - 1] != '\n') {
+            return;
+        }
+        line[len - 1] = '\0';
+    }
+}
+
+/**
+ * Remove most non-alphanumeric characters from a buffer.
+ *
+ * @param line          The buffer.
+ */
+static void asciify(char *line)
+{
+    int i;
+
+    for (i = 0; line[i]; i++) {
+        if ((!isalnum(line[i])) && (line[i] != '_') && (line[i] != '#')) {
+            line[i] = ' ';
+        }
+    }
+}
+
+static const char *base_name(const char *path)
+{
+    const char *base;
+
+    base = strrchr(path, '/');
+    if (!base)
+        return NULL;
+    return base + 1;
+}
+
+static int process_file_lines(const char *in_path, const char *out_path,
+                    FILE *in, FILE *out, char ***words, size_t *num_words)
+{
+    int ret;
+    char header_guard[MAX_LINE_LEN] = { 0 };
+    char line[MAX_LINE_LEN] = { 0 };
+    const char *base = base_name(in_path);
+    enum parse_state state = PARSE_IFNDEF;
+
+    if (!base) {
+        fprintf(stderr, "failed to get basename of %s\n", in_path);
+        return EINVAL;
+    }
+    while (1) {
+        if (!fgets(line, MAX_LINE_LEN - 1, in)) {
+            if (ferror(in)) {
+                ret = errno;
+                fprintf(stderr, "error reading %s: %s (%d)\n",
+                        in_path, strerror(ret), ret);
+                return ret;
+            }
+            fprintf(stderr, "error reading %s: didn't find " 
+                    PROTOBUF_C_END_DECLS_STR, in_path);
+            return EINVAL;
+        }
+        if (strstr(line, PARSE_STATE_TERMINATORS[state])) {
+            state = state + 1;
+            if (state == PARSE_DONE) {
+                break;
+            }
+            continue;
+        }
+        chomp(line);
+        asciify(line);
+        switch (state) {
+        case PARSE_IFNDEF:
+            if (strncmp(line, IFNDEF, IFNDEF_LEN) == 0) {
+                strcpy(header_guard, line + IFNDEF_LEN + 1);
+            }
+            break;
+        default:
+            add_words(words, num_words, line, state);
+            break;
+        }
+    }
+    if (!header_guard[0]) {
+        fprintf(stderr, "failed to find header guard for %s\n", in_path);
+        return EINVAL;
+    }
+    qsort(*words, *num_words, sizeof(char*), compare_strings);
+    fprintf(out, "#ifndef %s_S\n", header_guard);
+    fprintf(out, "#define %s_S\n\n", header_guard);
+    fprintf(out, "#include \"%s\"\n\n", base);
+    ret = output_shortening_macros(*words, *num_words, out_path, out);
+    if (ret)
+        return ret;
+    fprintf(out, "\n#endif\n");
+    return 0;
+}
+
+static int process_file(const char *in_path)
+{
+    char out_path[PATH_MAX] = { 0 };
+    int res, ret = 0;
+    FILE *in = NULL, *out = NULL;
+    char **words = NULL;
+    size_t num_words = 0;
+    size_t i;
+
+    res = snprintf(out_path, PATH_MAX, "%s.s", in_path);
+    if ((res < 0) || (res >= PATH_MAX)) {
+        fprintf(stderr, "snprintf error for %s\n", in_path);
+        ret = EINVAL;
+        goto done;
+    }
+    in = fopen(in_path, "r");
+    if (!in) {
+        ret = errno;
+        fprintf(stderr, "failed to open %s for read: error %s (%d)\n",
+                in_path, strerror(ret), ret);
+        goto done;
+    }
+    out = fopen(out_path, "w");
+    if (!out) {
+        ret = errno;
+        fprintf(stderr, "failed to open %s for write: error %s (%d)\n",
+                out_path, strerror(ret), ret);
+        goto done;
+    }
+    ret = process_file_lines(in_path, out_path, in, out, &words, &num_words);
+    for (i = 0; i < num_words; i++) {
+        free(words[i]);
+    }
+    free(words);
+    if (ret) {
+        goto done;
+    }
+    if (fclose(out)) {
+        ret = errno;
+        perror("fclose error");
+    }
+    out = NULL;
+done:
+    if (in) {
+        fclose(in);
+    }
+    if (out) {
+        fclose(out);
+    }
+    return ret;
+}
+
+static void usage(void)
+{
+        fprintf(stderr,
+"shorten: creates header files with shorter definitions for protobuf-c\n"
+"definitions.  Output files will be written to the same paths as input\n"
+"files, but with a .s extension tacked on.\n"
+"\n"
+"usage: shorten [paths-to-headers]\n");
+}
+
+int main(int argc, char **argv)
+{
+    int i, ret, nproc = 0, rval = EXIT_SUCCESS;
+
+    if (argc < 2) {
+        usage();
+        exit(EXIT_SUCCESS);
+    }
+    for (i = 1; i < argc; i++) {
+        ret = process_file(argv[i]);
+        if (ret) {
+            fprintf(stderr, "error processing %s\n", argv[i]);
+            rval = EXIT_FAILURE;
+        } else {
+            nproc++;
+        }
+    }
+    //fprintf(stderr, "successfully processed %d files\n", nproc);
+    return rval;
+}
+
+// vim: ts=4:sw=4:tw=79:et
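
For illustration, here is a sketch of the file the tool emits, pieced
together from the fprintf calls above.  The input name, header guard, and
#define are hypothetical; the real macros come from
output_shortening_macros():

    /* foo.pb-c.h.s, generated from foo.pb-c.h (illustrative) */
    #ifndef PROTOBUF_C_foo_2eproto__INCLUDED_S
    #define PROTOBUF_C_foo_2eproto__INCLUDED_S

    #include "foo.pb-c.h"

    #define FooRequestProto Hadoop__Foo__FooRequestProto

    #endif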

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint-unit.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint-unit.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint-unit.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint-unit.c Thu May  1 02:10:15 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/test.h"
+#include "rpc/varint.h"
+
+#include <errno.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+static int varint32_round_trip_test(int val)
+{
+    int size;
+    int32_t oval = 0;
+    int32_t off = 0;
+    uint8_t buf[16] = { 0 };
+    const size_t buf_len = sizeof(buf)/sizeof(buf[0]);
+
+    size = varint32_size(val);
+    EXPECT_INT_ZERO(varint32_encode(val, buf, buf_len, &off));
+    EXPECT_INT_EQ(size, off);
+    off = 0;
+    EXPECT_INT_ZERO(varint32_decode(&oval, buf, buf_len, &off));
+    EXPECT_INT_EQ(size, off);
+    EXPECT_INT_EQ(val, oval);
+    return 0;
+}
+
+static int be32_round_trip_test(int val)
+{
+    int32_t oval;
+    uint8_t buf[sizeof(int32_t)] = { 0 };
+
+    be32_encode(val, buf);
+    oval = be32_decode(buf);
+    EXPECT_INT_EQ(val, oval);
+    return 0;
+}
+
+static int round_trip_test(int var)
+{
+    EXPECT_INT_ZERO(varint32_round_trip_test(var));
+    EXPECT_INT_ZERO(be32_round_trip_test(var));
+    return 0;
+}
+
+int main(void)
+{
+    int ret = 0;
+
+    /* collect the results so a failed round trip fails the test run */
+    ret |= round_trip_test(0);
+    ret |= round_trip_test(123);
+    ret |= round_trip_test(6578);
+    ret |= round_trip_test(0x7fffffff);
+    ret |= round_trip_test(-15);
+    ret |= round_trip_test(-1);
+    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
+}
+
+// vim: ts=4:sw=4:tw=79:et
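
The values above exercise a few interesting points; a natural extension is
to pin down the size boundaries of the encoding.  A sketch, assuming the
same EXPECT_* macros from common/test.h:

    static int boundary_test(void)
    {
        /* each pair straddles a point where the encoded size grows */
        EXPECT_INT_EQ(1, varint32_size(127));
        EXPECT_INT_EQ(2, varint32_size(128));
        EXPECT_INT_EQ(2, varint32_size(16383));
        EXPECT_INT_EQ(3, varint32_size(16384));
        /* negative values always take the full 5 bytes */
        EXPECT_INT_EQ(5, varint32_size(-1));
        EXPECT_INT_ZERO(round_trip_test(127));
        EXPECT_INT_ZERO(round_trip_test(128));
        return 0;
    }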

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.c Thu May  1 02:10:15 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "varint.h"
+
+#include <errno.h>
+#include <stdint.h>
+
+int varint32_size(int32_t val)
+{
+    if ((val & (0xffffffff <<  7)) == 0) return 1;
+    if ((val & (0xffffffff << 14)) == 0) return 2;
+    if ((val & (0xffffffff << 21)) == 0) return 3;
+    if ((val & (0xffffffff << 28)) == 0) return 4;
+    return 5;
+}
+
+int varint32_encode(int32_t val, uint8_t *buf, int32_t len, int32_t *off)
+{
+    int32_t o = *off;
+    uint32_t var = (uint32_t)val;
+
+    while (1) {
+        if (o == len)
+            return -ENOBUFS;
+        if (var <= 127) {
+            buf[o++] = var;
+            break;
+        }
+        buf[o++] = 0x80 | (var & 0x7f);
+        var >>= 7;
+    }
+    *off = o;
+    return 0;
+}
+
+int varint32_decode(int32_t *val, const uint8_t *buf, int32_t len,
+                    int32_t *off)
+{
+    uint32_t accum = 0;
+    int shift = 0, o = *off, idx = 0;
+    uint8_t b;
+
+    do {
+        if (o == len)
+            return -ENOBUFS;
+        if (idx++ >= 5) /* a 32-bit varint fits in at most 5 bytes */
+            return -EINVAL;
+        b = buf[o++];
+        accum += (uint32_t)(b & 0x7f) << shift;
+        shift += 7;
+    } while (b & 0x80);
+    *val = (int32_t)accum;
+    *off = o;
+    return 0;
+}
+
+void be32_encode(int32_t val, uint8_t *buf)
+{
+    buf[0] = (val >> 24) & 0xff;
+    buf[1] = (val >> 16) & 0xff;
+    buf[2] = (val >> 8) & 0xff;
+    buf[3] = (val >> 0) & 0xff;
+}
+
+int32_t be32_decode(const uint8_t *buf)
+{
+    /* accumulate as unsigned so buf[0] >= 0x80 can't shift into the
+     * sign bit of a signed int */
+    uint32_t v = 0;
+    v |= ((uint32_t)buf[0] << 24);
+    v |= ((uint32_t)buf[1] << 16);
+    v |= ((uint32_t)buf[2] << 8);
+    v |= ((uint32_t)buf[3] << 0);
+    return (int32_t)v;
+}
+
+// vim: ts=4:sw=4:tw=79:et
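
To make the encoding concrete: 300 is binary 100101100, which splits into
the 7-bit groups 0000010 and 0101100.  The low group goes out first with
its continuation bit set, so the wire bytes are 0xac 0x02.  A minimal
sketch checking that against the functions above:

    #include <assert.h>
    #include "rpc/varint.h"

    static void varint_example(void)
    {
        uint8_t buf[5] = { 0 };
        int32_t off = 0;

        assert(varint32_size(300) == 2);
        assert(varint32_encode(300, buf, sizeof(buf), &off) == 0);
        assert(off == 2);
        assert(buf[0] == 0xac);  /* 0x80 | (300 & 0x7f) */
        assert(buf[1] == 0x02);  /* 300 >> 7 */
    }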

Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.h?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.h (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.h Thu May  1 02:10:15 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_VARINT_H
+#define HADOOP_CORE_VARINT_H
+
+#include <stdint.h>
+
+/**
+ * Compute the size of a varint.
+ *
+ * @param val           The value to encode.
+ *
+ * @return              The number of bytes it will take to encode.
+ */
+int varint32_size(int32_t val);
+
+/**
+ * Encode a 32-bit varint.
+ *
+ * @param val           The value to encode.
+ * @param buf           The buffer to encode into.
+ * @param len           The length of the buffer.
+ * @param off           (inout) The offset to start writing at.  This will be
+ *                      updated with the new offset after we finish the
+ *                      encoding.
+ *
+ * @return              0 on success; -ENOBUFS if we ran out of space.
+ */
+int varint32_encode(int32_t val, uint8_t *buf, int32_t len, int32_t *off);
+
+/**
+ * Decode a 32-bit varint.
+ *
+ * @param val           (out param) The decoded value.
+ * @param buf           The buffer to decode from.
+ * @param len           The length of the buffer.
+ * @param off           (inout) The offset to start reading at.  This will be
+ *                      updated with the new offset after we finish the
+ *                      decoding.
+ *
+ * @return              0 on success; -ENOBUFS if we ran out of buffer;
+ *                      -EINVAL if the varint was malformed.
+ */
+int varint32_decode(int32_t *val, const uint8_t *buf, int32_t len,
+                    int32_t *off);
+
+/**
+ * Encode a fixed-length, big-endian int32_t.
+ *
+ * @param val           The value to encode.
+ * @param buf           The buffer to encode into.
+ */
+void be32_encode(int32_t val, uint8_t *buf);
+
+/**
+ * Decode a fixed-length, big-endian int32_t.
+ *
+ * @param buf           The buffer to decode from.  The buffer must be at
+ *                      least 4 bytes long.
+ *
+ * @return              The decoded integer.
+ */
+int32_t be32_decode(const uint8_t *buf);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et
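
A minimal usage sketch, combining the calls above to write a payload
preceded by its varint-encoded length, the framing that protobuf-based
messages typically use.  write_delimited is a hypothetical helper, not
part of this commit:

    #include <errno.h>
    #include <string.h>
    #include "rpc/varint.h"

    /* Returns 0 on success, or -ENOBUFS if buf is too small. */
    static int write_delimited(const uint8_t *payload, int32_t payload_len,
                               uint8_t *buf, int32_t buf_len, int32_t *off)
    {
        int ret = varint32_encode(payload_len, buf, buf_len, off);
        if (ret)
            return ret;
        if (buf_len - *off < payload_len)
            return -ENOBUFS;
        memcpy(buf + *off, payload, payload_len);
        *off += payload_len;
        return 0;
    }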