You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/05/01 04:10:16 UTC
svn commit: r1591534 [3/3] - in
/hadoop/common/branches/HADOOP-10388/hadoop-native-core: ./ common/ hdfs/
rpc/
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/protoc-gen-hrpc.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/protoc-gen-hrpc.cc?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/protoc-gen-hrpc.cc (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/protoc-gen-hrpc.cc Thu May 1 02:10:15 2014
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ctype.h>
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/stubs/common.h>
+#include <tr1/memory>
+
+#include <iostream>
+#include <map>
+#include <stdio.h>
+#include <string>
+
+#define PROTO_EXTENSION ".proto" // Suffix which set_path_substitutions() strips from the input file name.
+
+#define APACHE_HEADER \
+"/**\n" \
+" * Licensed to the Apache Software Foundation (ASF) under one\n" \
+" * or more contributor license agreements. See the NOTICE file\n" \
+" * distributed with this work for additional information\n" \
+" * regarding copyright ownership. The ASF licenses this file\n" \
+" * to you under the Apache License, Version 2.0 (the\n" \
+" * \"License\"); you may not use this file except in compliance\n" \
+" * with the License. You may obtain a copy of the License at\n" \
+" *\n" \
+" * http://www.apache.org/licenses/LICENSE-2.0\n" \
+" *\n" \
+" * Unless required by applicable law or agreed to in writing, software\n" \
+" * distributed under the License is distributed on an \"AS IS\" BASIS,\n" \
+" * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" \
+" * See the License for the specific language governing permissions and\n" \
+" * limitations under the License.\n" \
+" */\n"
+
+using google::protobuf::FileDescriptor;
+using google::protobuf::MethodDescriptor;
+using google::protobuf::ServiceDescriptor;
+using google::protobuf::compiler::GeneratorContext;
+using google::protobuf::io::Printer;
+using std::map;
+using std::string;
+using std::tr1::shared_ptr;
+
+typedef map<string, string> string_map_t; // Substitution variable name -> value, in the form handed to Printer::Print.
+
+/**
+ * Convert a CamelCase identifier into underscore-separated lowercase.
+ *
+ * An underscore is inserted in front of an uppercase letter only when the
+ * character before it was a lowercase letter or a digit, so runs of capitals
+ * and existing separators pass through unchanged.
+ * example: SetReplicationRequestProto -> set_replication_request_proto
+ *
+ * @param in        The identifier to convert.
+ *
+ * @return          The converted identifier.
+ */
+static string camel_case_to_uscore(const string &in)
+{
+    string out;
+    bool prev_lower = false;
+
+    for (size_t i = 0; i < in.size(); i++) {
+        // The <ctype.h> functions are only defined for EOF and for values
+        // representable as unsigned char; a plain char may be negative.
+        const unsigned char c = static_cast<unsigned char>(in[i]);
+        if (isupper(c)) {
+            if (prev_lower) {
+                out += '_';
+            }
+            prev_lower = false;
+        } else {
+            prev_lower = islower(c) || isdigit(c);
+        }
+        out += static_cast<char>(tolower(c));
+    }
+    return out;
+}
+
+/**
+ * Strip a suffix from a string, if the string ends with that suffix.
+ *
+ * @param str       The string to examine.
+ * @param suffix    The NUL-terminated suffix to remove.
+ * @param out       (out param) str without the suffix.  Only written when
+ *                      true is returned.
+ *
+ * @return          True if str ends with suffix; false otherwise.
+ */
+static bool try_strip_suffix(const string &str, const char *suffix,
+                             string *out)
+{
+    const string sfx(suffix);
+
+    if (str.size() < sfx.size()) {
+        return false;
+    }
+    const size_t stem_len = str.size() - sfx.size();
+    // Being long enough is not sufficient; the tail must actually match.
+    if (str.compare(stem_len, sfx.size(), sfx) != 0) {
+        return false;
+    }
+    *out = str.substr(0, stem_len);
+    return true;
+}
+
+/**
+ * Extract the last component of a slash-separated path.
+ *
+ * @param path      The path to examine.
+ * @param base      (out param) Everything after the final '/', or the whole
+ *                      path when it contains no '/'.
+ */
+static void get_base_name(const string &path, string *base)
+{
+    const size_t sep = path.find_last_of("/");
+    *base = (sep == string::npos) ? path : path.substr(sep + 1);
+}
+
+/**
+ * Fill in the substitutions which depend only on the .proto file path.
+ *
+ * Always sets "path".  When try_strip_suffix succeeds for PROTO_EXTENSION it
+ * also sets "path_without_extension", "path_base" and "function_prefix".
+ *
+ * @param file      The descriptor of the file being processed.
+ * @param map       (out param) The substitution map to add entries to.
+ *
+ * @return          The empty string on success; an error message otherwise.
+ */
+static string set_path_substitutions(const FileDescriptor *file,
+                                     string_map_t *map)
+{
+    string_map_t &subs = *map;
+    const string proto_path = file->name();
+    subs["path"] = proto_path;
+
+    // If path = /foo/bar/baz_stuff.proto, stem = /foo/bar/baz_stuff
+    string stem;
+    const bool stripped =
+        try_strip_suffix(proto_path, PROTO_EXTENSION, &stem);
+    if (!stripped) {
+        return string("file name " + proto_path + " did not end in " +
+                      PROTO_EXTENSION);
+    }
+    subs["path_without_extension"] = stem;
+
+    // If path = /foo/bar/baz_stuff.proto, leaf = baz_stuff
+    string leaf;
+    get_base_name(stem, &leaf);
+    subs["path_base"] = leaf;
+    subs["function_prefix"] = leaf;
+    return string();
+}
+
+/**
+ * Map a well-known service name onto its short function prefix.
+ *
+ * @param prefix    The service name.
+ *
+ * @return          The abbreviation for a known service; otherwise the
+ *                      name itself, unchanged.
+ */
+static string shorten_service_prefix(const string &prefix)
+{
+    static const char * const ABBREVIATIONS[][2] = {
+        { "ClientNamenodeProtocol", "cnn" },
+        { "ClientDatanodeProtocolService", "cdn" },
+        { "NamenodeProtocolService", "nnp" },
+        { "DatanodeProtocolService", "dn" },
+    };
+    const size_t count = sizeof(ABBREVIATIONS) / sizeof(ABBREVIATIONS[0]);
+
+    for (size_t i = 0; i < count; i++) {
+        if (prefix == ABBREVIATIONS[i][0]) {
+            return ABBREVIATIONS[i][1];
+        }
+    }
+    return prefix;
+}
+
+/**
+ * Fill in the substitutions which depend on the service.
+ *
+ * @param service   The descriptor of the service being processed.
+ * @param map       (out param) The substitution map; "service_prefix" is
+ *                      set to the shortened service name.
+ */
+static void set_service_substitutions(const ServiceDescriptor *service,
+                                      string_map_t *map)
+{
+    string_map_t &subs = *map;
+
+    // example: cnn
+    const string short_name = shorten_service_prefix(service->name());
+    subs["service_prefix"] = short_name;
+}
+
+/**
+ * Process a dot-separated type name into a protobuf-c type name.
+ *
+ * Each non-empty component has its first character uppercased, and the
+ * components are joined with "__".  Empty components (leading, trailing or
+ * doubled dots) are skipped.
+ * example: hadoop.hdfs.FooProto -> Hadoop__Hdfs__FooProto
+ *
+ * @param input     The input type name.
+ *
+ * @return          The protobuf-c type name.
+ */
+static string get_pbc_type_name(string input)
+{
+    string output;
+    size_t pos = 0;
+
+    // Split with std::string rather than strtok_r on a stack copy: that
+    // needed a variable-length array, which is not standard C++.
+    while (pos < input.size()) {
+        size_t dot = input.find('.', pos);
+        if (dot == string::npos) {
+            dot = input.size();
+        }
+        if (dot > pos) {
+            string word = input.substr(pos, dot - pos);
+            // <ctype.h> functions require an unsigned char value.
+            const unsigned char first = static_cast<unsigned char>(word[0]);
+            if (!isupper(first)) {
+                word[0] = static_cast<char>(toupper(first));
+            }
+            if (!output.empty()) {
+                output += "__";
+            }
+            output += word;
+        }
+        pos = dot + 1;
+    }
+    return output;
+}
+
+/**
+ * Substitute a string for every occurrence of a character.
+ *
+ * @param input         The string to scan.
+ * @param target        The character to look for.
+ * @param replacement   The NUL-terminated text which takes its place.
+ *
+ * @return              A copy of input with the substitutions made.
+ */
+static string replace(string input, char target,
+                      const char *replacement)
+{
+    string result;
+
+    for (string::const_iterator it = input.begin();
+         it != input.end(); ++it) {
+        if (*it != target) {
+            result += *it;
+        } else {
+            result += replacement;
+        }
+    }
+    return result;
+}
+
+/**
+ * Fill in the substitutions which depend on the RPC method.
+ *
+ * Reads "service_prefix" from the map, so set_service_substitutions must
+ * have been applied to it first.
+ *
+ * @param method    The descriptor of the method being processed.
+ * @param map       (out param) The substitution map to add entries to.
+ */
+static void set_method_substitutions(const MethodDescriptor *method,
+                                     string_map_t *map)
+{
+    string_map_t &subs = *map;
+
+    // Request type, in camelcase and in underscore-separated lowercase.
+    // example: Hadoop__Hdfs__SetReplicationRequestProto
+    // example: hadoop__hdfs__set_replication_request_proto
+    const string req_camel =
+        get_pbc_type_name(method->input_type()->full_name());
+    subs["req_ty_camel"] = req_camel;
+    subs["req_ty_uscore"] = camel_case_to_uscore(req_camel);
+
+    // Response type, in camelcase and in underscore-separated lowercase.
+    // example: Hadoop__Hdfs__SetReplicationResponseProto
+    // example: hadoop__hdfs__set_replication_response_proto
+    const string resp_camel =
+        get_pbc_type_name(method->output_type()->full_name());
+    subs["resp_ty_camel"] = resp_camel;
+    subs["resp_ty_uscore"] = camel_case_to_uscore(resp_camel);
+
+    // RPC name, in camelcase and in underscore-separated lowercase.
+    // example: setReplication
+    // example: set_replication
+    const string rpc_camel = method->name();
+    const string rpc_uscore = camel_case_to_uscore(rpc_camel);
+    subs["rpc_camel"] = rpc_camel;
+    subs["rpc_uscore"] = rpc_uscore;
+
+    const string service_prefix = subs["service_prefix"];
+
+    // sync stub function name.
+    // example: cnn_set_replication
+    subs["sync_call"] = service_prefix + "_" + rpc_uscore;
+
+    // async stub function name.
+    // example: cnn_async_set_replication
+    subs["async_call"] = service_prefix + "_async_" + rpc_uscore;
+
+    // async callback adaptor function name.
+    // example: cnn_async_adaptor_set_replication
+    subs["async_adaptor"] = service_prefix + "_async_adaptor_" + rpc_uscore;
+}
+
+/**
+ * protoc plugin which emits C call stubs for each RPC in a .proto file.
+ *
+ * For foo.proto this writes foo.call.h (prototypes) and foo.call.c
+ * (implementations).  Every RPC method gets a synchronous stub, an
+ * asynchronous stub, and an adaptor which unpacks the raw response before
+ * invoking the asynchronous callback.
+ */
+class HrpcCodeGenerator
+    : public ::google::protobuf::compiler::CodeGenerator
+{
+public:
+    HrpcCodeGenerator()
+    {
+    }
+
+    ~HrpcCodeGenerator()
+    {
+    }
+
+    /**
+     * Generate the .call.h and .call.c files for one .proto file.
+     *
+     * @param file          The file to generate stubs for.
+     * @param gen_context   Used to open the output files.
+     * @param error         (out param) The error message, on failure.
+     *
+     * @return              False if the file name could not be processed;
+     *                          true otherwise.
+     */
+    bool Generate(const google::protobuf::FileDescriptor *file,
+                  const string &, GeneratorContext *gen_context,
+                  string *error) const
+    {
+        string_map_t path_map;
+        string ret = set_path_substitutions(file, &path_map);
+        if (!ret.empty()) {
+            *error = ret;
+            return false;
+        }
+        generate_call_header(gen_context, &path_map, file);
+        generate_call_body(gen_context, &path_map, file);
+        return true;
+    }
+
+private:
+    /**
+     * Write the .call.h file: the include guard plus one sync and one async
+     * prototype per RPC method.
+     */
+    void generate_call_header(GeneratorContext *gen_context,
+                              string_map_t *path_map,
+                              const FileDescriptor *file) const
+    {
+        shared_ptr<google::protobuf::io::ZeroCopyOutputStream> output(
+            gen_context->Open((*path_map)["path_without_extension"] +
+                              ".call.h"));
+        Printer printer(output.get(), '$');
+        printer.Print(APACHE_HEADER);
+        printer.Print(*path_map,
+"\n"
+"// This header file was auto-generated from $path$\n"
+"\n"
+"#ifndef HADOOP_NATIVE_CORE_$path_base$_CALL_H\n"
+"#define HADOOP_NATIVE_CORE_$path_base$_CALL_H\n"
+"\n"
+"#include \"protobuf/$path_base$.pb-c.h\"\n"
+"#include \"protobuf/$path_base$.pb-c.h.s\"\n"
+"\n"
+"struct hadoop_err;\n"
+"struct hrpc_proxy;\n"
+"\n");
+        for (int service_idx = 0; service_idx < file->service_count();
+             ++service_idx) {
+            string_map_t service_map = *path_map;
+            const ServiceDescriptor *service = file->service(service_idx);
+            set_service_substitutions(service, &service_map);
+            for (int method_idx = 0; method_idx < service->method_count();
+                 ++method_idx) {
+                const MethodDescriptor *method = service->method(method_idx);
+                string_map_t map = service_map;
+                set_method_substitutions(method, &map);
+                printer.Print(map,
+"struct hadoop_err *$sync_call$(struct hrpc_proxy *proxy,\n"
+" const $req_ty_camel$ *req,\n"
+" $resp_ty_camel$ **resp);\n"
+"\n"
+"void $async_call$(struct hrpc_proxy *proxy,\n"
+" const $req_ty_camel$ *req,\n"
+" void (*cb)($resp_ty_camel$ *,\n"
+" struct hadoop_err *, void *cb_data),\n"
+" void *cb_data);\n"
+"\n");
+            }
+        }
+        printer.Print("#endif\n");
+    }
+
+    /**
+     * Write the .call.c file: for every RPC method, the sync stub, the
+     * async callback adaptor and the async stub.
+     */
+    void generate_call_body(GeneratorContext *gen_context,
+                            string_map_t *path_map,
+                            const FileDescriptor *file) const
+    {
+        shared_ptr<google::protobuf::io::ZeroCopyOutputStream> output(
+            gen_context->Open((*path_map)["path_without_extension"] +
+                              ".call.c"));
+        Printer printer(output.get(), '$');
+        printer.Print(APACHE_HEADER);
+        printer.Print(*path_map, "\n"
+"#include \"common/hadoop_err.h\"\n"
+"#include \"protobuf/$path_base$.call.h\"\n"
+"#include \"rpc/messenger.h\"\n"
+"#include \"rpc/proxy.h\"\n");
+        printer.Print("\n"
+"#include <errno.h>\n"
+"#include <netinet/in.h>\n"
+"#include <stdio.h>\n"
+"#include <stdlib.h>\n"
+"#include <string.h>\n"
+"#include <uv.h>\n"
+"\n");
+        for (int service_idx = 0; service_idx < file->service_count();
+             ++service_idx) {
+            string_map_t service_map = *path_map;
+            const ServiceDescriptor *service = file->service(service_idx);
+            set_service_substitutions(service, &service_map);
+            for (int method_idx = 0; method_idx < service->method_count();
+                 ++method_idx) {
+                const MethodDescriptor *method = service->method(method_idx);
+                string_map_t map = service_map;
+                set_method_substitutions(method, &map);
+                // Sync stub.  hrpc_proxy_start only queues the RPC, so the
+                // stub must block on the context's semaphore (posted by
+                // hrpc_proxy_sync_cb) before looking at the result.  The
+                // error is copied out before the context is released.
+                printer.Print(map,
+"struct hadoop_err *$sync_call$(struct hrpc_proxy *proxy,\n"
+" const $req_ty_camel$ *req,\n"
+" $resp_ty_camel$ **out)\n"
+"{\n"
+" struct hadoop_err *err;\n"
+" struct hrpc_sync_ctx *ctx;\n"
+" $resp_ty_camel$ *resp;\n"
+"\n"
+" err = hrpc_proxy_activate(proxy);\n"
+" if (err) {\n"
+" return err;\n"
+" }\n"
+" ctx = hrpc_proxy_alloc_sync_ctx(proxy);\n"
+" if (!ctx) {\n"
+" hrpc_proxy_deactivate(proxy);\n"
+" return hadoop_lerr_alloc(ENOMEM, \"$sync_call$: \"\n"
+" \"failed to allocate sync_ctx\");\n"
+" }\n"
+" hrpc_proxy_start(proxy, \"$rpc_camel$\", req,\n"
+" $req_ty_uscore$__get_packed_size(req),\n"
+" (hrpc_pack_cb_t)$req_ty_uscore$__pack,\n"
+" hrpc_proxy_sync_cb, ctx);\n"
+" uv_sem_wait(&ctx->sem);\n"
+" err = ctx->err;\n"
+" if (err) {\n"
+" hrpc_free_sync_ctx(ctx);\n"
+" return err;\n"
+" }\n"
+" resp = $resp_ty_uscore$__unpack(NULL, ctx->resp.pb_len,\n"
+" ctx->resp.pb_base);\n"
+" hrpc_free_sync_ctx(ctx);\n"
+" if (!resp) {\n"
+" return hadoop_lerr_alloc(EINVAL,\n"
+" \"$sync_call$: failed to parse response from server\");\n"
+" }\n"
+" *out = resp;\n"
+" return NULL;\n"
+"}\n");
+                // Async adaptor: unpacks the raw response and forwards the
+                // typed message (or the error) to the user's callback.
+                printer.Print(map,
+"struct $async_call$_cb_data {\n"
+" void (*cb)($resp_ty_camel$ *,\n"
+" struct hadoop_err *, void *);\n"
+" void *cb_data;\n"
+"};\n"
+"\n"
+"void $async_adaptor$(struct hrpc_response *resp,\n"
+" struct hadoop_err *err, void *cb_data)\n"
+"{\n"
+" struct $async_call$_cb_data *wrapped = cb_data;\n"
+" $resp_ty_camel$ *msg;\n"
+"\n"
+" if (err) {\n"
+" wrapped->cb(NULL, err, wrapped->cb_data);\n"
+" return;\n"
+" }\n"
+" msg = $resp_ty_uscore$__unpack(NULL, resp->pb_len,\n"
+" resp->pb_base);\n"
+" free(resp->base);\n"
+" if (!msg) {\n"
+" wrapped->cb(NULL, hadoop_lerr_alloc(EIO,\n"
+" \"$async_adaptor$: \"\n"
+" \"failed to parse response from server.\"), wrapped->cb_data);\n"
+" return;\n"
+" }\n"
+" wrapped->cb(msg, NULL, wrapped->cb_data);\n"
+"}\n");
+                // Async stub: stashes the user's callback in the proxy's
+                // userdata area and starts the call with the adaptor.
+                printer.Print(map,
+"void $async_call$(struct hrpc_proxy *proxy,\n"
+" const $req_ty_camel$ *req,\n"
+" void (*cb)($resp_ty_camel$ *,\n"
+" struct hadoop_err *, void *),\n"
+" void *cb_data)\n"
+"{\n"
+" struct $async_call$_cb_data *wrapped;\n"
+" struct hadoop_err *err;\n"
+"\n"
+" err = hrpc_proxy_activate(proxy);\n"
+" if (err) {\n"
+" cb(NULL, err, cb_data);\n"
+" return;\n"
+" }\n"
+" wrapped = hrpc_proxy_alloc_userdata(proxy, sizeof(*wrapped));\n"
+" if (!wrapped) {\n"
+" hrpc_proxy_deactivate(proxy);\n"
+" cb(NULL, hadoop_lerr_alloc(ENOMEM, \"$async_call$: failed \"\n"
+" \"to allocate sync_ctx\"), cb_data);\n"
+" return;\n"
+" }\n"
+" wrapped->cb = cb;\n"
+" wrapped->cb_data = cb_data;\n"
+" hrpc_proxy_start(proxy, \"$rpc_camel$\", req, \n"
+" $req_ty_uscore$__get_packed_size(req),\n"
+" (hrpc_pack_cb_t)$req_ty_uscore$__pack,\n"
+" $async_adaptor$, wrapped);\n"
+"}\n"
+"\n");
+            }
+        }
+        printer.Print("// vim: ts=4:sw=4:tw=79:et\n");
+    }
+};
+
+int main(int argc, char *argv[]) // Entry point of the protoc-gen-hrpc plugin executable.
+{
+ HrpcCodeGenerator generator;
+ return google::protobuf::compiler::PluginMain(argc, argv, &generator); // Hands the generator to protobuf's PluginMain and returns its result.
+}
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.c Thu May 1 02:10:15 2014
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "protobuf/ProtobufRpcEngine.pb-c.h.s"
+#include "rpc/call.h"
+#include "rpc/messenger.h"
+#include "rpc/proxy.h"
+#include "rpc/varint.h"
+
+#include <errno.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <uv.h>
+
+#define proxy_log_warn(proxy, fmt, ...) \
+ fprintf(stderr, "WARN: proxy 0x%p: " fmt, proxy, __VA_ARGS__) /* fmt must be a string literal, and at least one variadic argument is required. */
+#define proxy_log_info(proxy, fmt, ...) \
+ fprintf(stderr, "INFO: proxy 0x%p: " fmt, proxy, __VA_ARGS__) /* Same contract as proxy_log_warn. */
+#define proxy_log_debug(proxy, fmt, ...) \
+ fprintf(stderr, "DEBUG: proxy 0x%p: " fmt, proxy, __VA_ARGS__) /* Same contract as proxy_log_warn. NOTE(review): where %p already prints a 0x prefix this shows "0x0x..." - confirm intent. */
+
+/**
+ * The maximum length that we'll allocate to hold a request to the server.
+ * This number includes the RequestHeader, but not the RpcRequestHeader.
+ */
+#define MAX_SEND_LEN (63 * 1024 * 1024)
+
+struct hrpc_proxy { /* State for one RPC proxy; it embeds a single hrpc_call. */
+ /**
+ * The messenger that this proxy is associated with.
+ */
+ struct hrpc_messenger *msgr;
+
+ /**
+ * Dynamically allocated string describing the protocol this proxy speaks.
+ *
+ * While the proxy is still owned by a builder this may instead be NULL
+ * (not set yet) or the static OOM_ERROR sentinel (strdup failed in
+ * hrpc_proxy_builder_set_protocol); the sentinel must never be freed.
+ */
+ char *protocol;
+
+ /**
+ * The current call.
+ */
+ struct hrpc_call call;
+
+ /**
+ * A memory area which can be used by the current call.
+ *
+ * This will be null if userdata_len is 0.
+ * It is grown with realloc by hrpc_proxy_alloc_userdata and released in
+ * hrpc_proxy_free.
+ */
+ uint8_t *userdata;
+
+ /**
+ * Length of userdata.
+ */
+ size_t userdata_len;
+};
+
+struct hrpc_proxy_builder { /* Collects settings for a proxy until hrpc_proxy_create() is called. */
+ struct hrpc_proxy *proxy; /* The proxy being configured; allocated by hrpc_proxy_builder_alloc(). */
+};
+
+static const char OOM_ERROR[] = "OOM"; /* Sentinel stored in hrpc_proxy::protocol when strdup() fails; recognized by address and never freed. */
+
+/**
+ * Allocate a proxy builder along with the zero-filled proxy it configures.
+ *
+ * @param msgr      The messenger to associate with the proxy.
+ *
+ * @return          The new builder, or NULL if either allocation failed.
+ */
+struct hrpc_proxy_builder *hrpc_proxy_builder_alloc(
+        struct hrpc_messenger *msgr)
+{
+    struct hrpc_proxy_builder *builder;
+    struct hrpc_proxy *proxy;
+
+    builder = calloc(1, sizeof(struct hrpc_proxy_builder));
+    if (builder == NULL) {
+        return NULL;
+    }
+    proxy = calloc(1, sizeof(struct hrpc_proxy));
+    if (proxy == NULL) {
+        free(builder);
+        return NULL;
+    }
+    proxy->msgr = msgr;
+    // hrpc_proxy_create treats INADDR_ANY as "no remote was specified".
+    proxy->call.remote.sin_addr.s_addr = INADDR_ANY;
+    builder->proxy = proxy;
+    return builder;
+}
+
+/**
+ * Free a proxy builder which was never passed to hrpc_proxy_create.
+ *
+ * The proxy under construction is released through hrpc_proxy_free so that
+ * a protocol string set by hrpc_proxy_builder_set_protocol is not leaked
+ * (and the static OOM sentinel is not passed to free).
+ *
+ * @param bld       The builder to free.  May be NULL.
+ */
+void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld)
+{
+    if (!bld)
+        return;
+    hrpc_proxy_free(bld->proxy);
+    free(bld);
+}
+
+/**
+ * Set the protocol string of the proxy under construction.
+ *
+ * The string is copied.  If the copy cannot be allocated, the OOM_ERROR
+ * sentinel is stored instead so that hrpc_proxy_create can report it.
+ *
+ * @param bld       The builder.
+ * @param protocol  The protocol string to copy.
+ */
+void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld,
+                                     const char *protocol)
+{
+    struct hrpc_proxy *proxy = bld->proxy;
+    char *copy;
+
+    // Drop any earlier value; the sentinel is static storage, not heap.
+    if (proxy->protocol && proxy->protocol != OOM_ERROR) {
+        free(proxy->protocol);
+    }
+    copy = strdup(protocol);
+    proxy->protocol = copy ? copy : (char*)OOM_ERROR;
+}
+
+void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld, /* Record the address the proxy's call will target. */
+ const struct sockaddr_in *remote)
+{
+ bld->proxy->call.remote = *remote; /* Struct copy, so the caller's sockaddr_in is not retained. */
+}
+
+/**
+ * Turn a builder into a proxy.  The builder is always freed.
+ *
+ * @param bld       The builder to consume.
+ * @param out       (out param) The proxy.  Only written on success.
+ *
+ * @return          NULL on success.  EINVAL if no remote or no protocol
+ *                      was set; ENOMEM if copying the protocol had failed.
+ */
+struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld,
+                                     struct hrpc_proxy **out)
+{
+    struct hrpc_proxy *proxy = bld->proxy;
+    int remote_unset, protocol_unset, protocol_oom;
+
+    free(bld);
+    // Classify the proxy before it can be released below.
+    remote_unset = (proxy->call.remote.sin_addr.s_addr == INADDR_ANY);
+    protocol_unset = (proxy->protocol == NULL);
+    protocol_oom = (proxy->protocol == OOM_ERROR);
+
+    if (remote_unset || protocol_unset || protocol_oom) {
+        hrpc_proxy_free(proxy);
+        if (remote_unset) {
+            return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: you must specify "
+                    "a remote.");
+        }
+        if (protocol_unset) {
+            return hadoop_lerr_alloc(EINVAL, "hrpc_proxy_create: can't create "
+                    "a proxy without a protocol argument.");
+        }
+        // There was an OOM error during hrpc_proxy_builder_set_protocol.
+        return hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_create: OOM error.");
+    }
+    *out = proxy;
+    return NULL;
+}
+
+/**
+ * Free a proxy and the memory it owns.
+ *
+ * A proxy whose call is still active is not freed; a warning is logged
+ * instead.
+ *
+ * @param proxy     The proxy to free.  May be NULL.
+ */
+void hrpc_proxy_free(struct hrpc_proxy *proxy)
+{
+    if (proxy == NULL) {
+        return;
+    }
+    if (hrpc_call_is_active(&proxy->call)) {
+        proxy_log_warn(proxy, "%s", "hrpc_proxy_free: attempt to free a proxy "
+                "which is currently active!\n");
+        return;
+    }
+    free(proxy->userdata);
+    // The OOM sentinel is a static array and must not be passed to free.
+    if (proxy->protocol != OOM_ERROR) {
+        free(proxy->protocol);
+    }
+    free(proxy);
+}
+
+/**
+ * Mark the proxy as active, claiming it for a new call.
+ *
+ * @param proxy     The proxy.
+ *
+ * @return          NULL on success.  If the proxy's call could not be
+ *                      activated, an EINVAL error, which is also logged.
+ */
+struct hadoop_err *hrpc_proxy_activate(struct hrpc_proxy *proxy)
+{
+    struct hadoop_err *err;
+
+    if (hrpc_call_activate(&proxy->call)) {
+        return NULL;
+    }
+    err = hadoop_lerr_alloc(EINVAL, "tried to start a call on a "
+            "proxy which was still in use by another call.");
+    // Terminate the line, as the other log calls do, so that successive
+    // messages on stderr do not run together.
+    proxy_log_warn(proxy, "hrpc_proxy_activate: %s\n",
+            hadoop_err_msg(err));
+    return err;
+}
+
+void hrpc_proxy_deactivate(struct hrpc_proxy *proxy) /* Undo hrpc_proxy_activate for a call which was never started. */
+{
+ hrpc_call_deactivate(&proxy->call); /* Delegates to the call embedded in the proxy. */
+}
+
+/**
+ * Make sure the proxy's userdata area holds at least `size` bytes.
+ *
+ * The area only ever grows; it is reused as-is when already big enough.
+ *
+ * @param proxy     The proxy.
+ * @param size      The number of bytes required.
+ *
+ * @return          The userdata area, or NULL if it could not be grown.
+ */
+void *hrpc_proxy_alloc_userdata(struct hrpc_proxy *proxy, size_t size)
+{
+    uint8_t *grown;
+
+    if (size <= proxy->userdata_len) {
+        return proxy->userdata;
+    }
+    grown = realloc(proxy->userdata, size);
+    if (grown == NULL) {
+        // The old area is untouched and still owned by the proxy.
+        return NULL;
+    }
+    proxy->userdata = grown;
+    proxy->userdata_len = size;
+    return grown;
+}
+
+/**
+ * Allocate a sync context in the proxy's userdata area.
+ *
+ * The context is zero-filled and its semaphore is initialized to 0, so a
+ * waiter blocks until hrpc_proxy_sync_cb posts it.
+ *
+ * @param proxy     The proxy.
+ *
+ * @return          The sync context, or NULL if the memory could not be
+ *                      allocated or the semaphore could not be created.
+ */
+struct hrpc_sync_ctx *hrpc_proxy_alloc_sync_ctx(struct hrpc_proxy *proxy)
+{
+    struct hrpc_sync_ctx *ctx =
+        hrpc_proxy_alloc_userdata(proxy, sizeof(struct hrpc_sync_ctx));
+    if (!ctx) {
+        return NULL;
+    }
+    // Clear the context itself (not the local pointer), and do it before
+    // the semaphore is created so the semaphore is not wiped out again.
+    memset(ctx, 0, sizeof(*ctx));
+    if (uv_sem_init(&ctx->sem, 0)) {
+        return NULL;
+    }
+    return ctx;
+}
+
+void hrpc_free_sync_ctx(struct hrpc_sync_ctx *ctx) /* Release what the context holds; ctx itself is not freed here. */
+{
+ free(ctx->resp.base); /* Frees the response's `base` allocation. */
+ uv_sem_destroy(&ctx->sem);
+}
+
+/**
+ * Completion callback used by synchronous RPCs.
+ *
+ * Stores the outcome in the sync context and posts its semaphore.
+ *
+ * @param resp      The response.  A NULL response is stored as an
+ *                      all-zero response.
+ * @param err       The error, or NULL on success.
+ * @param cb_data   The struct hrpc_sync_ctx for this call.
+ */
+void hrpc_proxy_sync_cb(struct hrpc_response *resp, struct hadoop_err *err,
+                        void *cb_data)
+{
+    struct hrpc_sync_ctx *ctx = cb_data;
+
+    // NOTE(review): whether error deliveries pass a NULL response is not
+    // visible here; guard so that such a call cannot crash.
+    if (resp) {
+        ctx->resp = *resp;
+    } else {
+        memset(&ctx->resp, 0, sizeof(ctx->resp));
+    }
+    ctx->err = err;
+    uv_sem_post(&ctx->sem);
+}
+
+/**
+ * Start an outgoing RPC from the proxy.
+ *
+ * Builds one buffer holding the varint-prefixed RequestHeaderProto followed
+ * by the varint-prefixed payload, then hands the call to the messenger.
+ * Failures are reported through hrpc_call_deliver_err, not a return value.
+ *
+ * @param proxy                 The proxy to use.
+ * @param method                The method we're calling.
+ * @param payload               The protobuf message we're sending.
+ * @param payload_packed_len    Length of payload when serialized.  Must
+ *                                  not be negative.
+ * @param payload_pack_cb       Function used to pack the payload.
+ * @param cb                    Callback invoked when the call is done.
+ * @param cb_data               Data provided along with cb.
+ */
+void hrpc_proxy_start(struct hrpc_proxy *proxy,
+        const char *method, const void *payload, int payload_packed_len,
+        hrpc_pack_cb_t payload_pack_cb,
+        hrpc_raw_cb_t cb, void *cb_data)
+{
+    RequestHeaderProto req_header = REQUEST_HEADER_PROTO__INIT;
+    uint64_t buf_len;
+    int32_t req_header_len, off = 0;
+    uint8_t *buf;
+    struct hrpc_call *call = &proxy->call;
+
+    call->cb = cb;
+    call->cb_data = cb_data;
+    if (payload_packed_len < 0) {
+        // A negative length would shrink buf_len below what the pack
+        // callback writes, overflowing the buffer.
+        hrpc_call_deliver_err(call, hadoop_lerr_alloc(EINVAL,
+            "hrpc_proxy_start: invalid payload length %d.",
+            payload_packed_len));
+        return;
+    }
+    call->protocol = strdup(proxy->protocol);
+    if (!call->protocol) {
+        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ENOMEM,
+            "hrpc_proxy_start: out of memory"));
+        return;
+    }
+
+    req_header.methodname = (char*)method;
+    req_header.declaringclassprotocolname = proxy->protocol;
+    req_header.clientprotocolversion = 1;
+    req_header_len = request_header_proto__get_packed_size(&req_header);
+    buf_len = varint32_size(req_header_len);
+    buf_len += req_header_len;
+    buf_len += varint32_size(payload_packed_len);
+    buf_len += payload_packed_len;
+    // NOTE(review): the error paths below do not release call->protocol
+    // here; presumably the call machinery owns it - confirm.
+    if (buf_len >= MAX_SEND_LEN) {
+        hrpc_call_deliver_err(call,
+            hadoop_lerr_alloc(EINVAL, "hrpc_proxy_start: the "
+                "request length is too long at %"PRIu64 " bytes.  The "
+                "maximum we will send is %d bytes.", buf_len, MAX_SEND_LEN));
+        return;
+    }
+    buf = malloc((size_t)buf_len);
+    if (!buf) {
+        hrpc_call_deliver_err(call,
+            hadoop_lerr_alloc(ENOMEM, "hrpc_proxy_start: "
+                "failed to allocate a buffer of length %"PRIu64" bytes.",
+                buf_len));
+        return;
+    }
+    // Layout: varint(header len) | header | varint(payload len) | payload
+    varint32_encode(req_header_len, buf, buf_len, &off);
+    request_header_proto__pack(&req_header, buf + off);
+    off += req_header_len;
+    varint32_encode(payload_packed_len, buf, buf_len, &off);
+    payload_pack_cb(payload, buf + off);
+
+    call->payload = uv_buf_init((char*)buf, buf_len);
+    hrpc_messenger_start_outbound(proxy->msgr, &proxy->call);
+}
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/proxy.h Thu May 1 02:10:15 2014
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_RPC_PROXY_H
+#define HADOOP_CORE_RPC_PROXY_H
+
+#include <stdint.h> /* for uint8_t */
+#include <uv.h> /* for uv_buf_t */
+
+struct hadoop_err;
+struct hrpc_messenger;
+struct hrpc_proxy;
+struct hrpc_proxy_builder;
+
+struct hrpc_response { /* A raw RPC response handed to hrpc_raw_cb_t callbacks. */
+ uint8_t *pb_base; /* Start of the serialized protobuf message. */
+ int pb_len; /* Length of the serialized protobuf message, in bytes. */
+ void *base; /* The allocation to release with free() once the message has been unpacked. */
+};
+
+struct hrpc_sync_ctx { /* Where hrpc_proxy_sync_cb stores the outcome of a synchronous call. */
+ uv_sem_t sem; /* Posted by hrpc_proxy_sync_cb once err and resp have been stored. */
+ struct hadoop_err *err; /* The error passed to the callback, or NULL. */
+ struct hrpc_response resp; /* Copy of the response passed to the callback. */
+};
+
+typedef void (*hrpc_raw_cb_t)(struct hrpc_response *, /* Completion callback: (response, error, caller-supplied cb_data). */
+ struct hadoop_err *, void *);
+
+typedef size_t (*hrpc_pack_cb_t)(const void *, uint8_t *); /* Serializes the message (first argument) into the buffer (second argument). */
+
+/**
+ * Allocate a Hadoop proxy builder.
+ *
+ * @param msgr The Hadoop messenger that this proxy will be associated
+ * with.
+ * @return A Hadoop proxy builder, or NULL on OOM.
+ */
+struct hrpc_proxy_builder *hrpc_proxy_builder_alloc(
+ struct hrpc_messenger *msgr);
+
+/**
+ * Free a Hadoop proxy builder.
+ *
+ * @param bld The Hadoop proxy builder to free.
+ */
+void hrpc_proxy_builder_free(struct hrpc_proxy_builder *bld);
+
+/**
+ * Set the protocol used by a proxy.
+ *
+ * @param bld The Hadoop proxy builder.
+ * @param proto The protocol string to use. Will be deep-copied.
+ */
+void hrpc_proxy_builder_set_protocol(struct hrpc_proxy_builder *bld,
+ const char *proto);
+
+/**
+ * Set the remote that the proxy should connect to.
+ *
+ * @param bld The Hadoop proxy builder.
+ * @param remote The remote. Will be deep-copied.
+ */
+void hrpc_proxy_builder_set_remote(struct hrpc_proxy_builder *bld,
+ const struct sockaddr_in *remote);
+
+/**
+ * Create a Hadoop proxy
+ *
+ * @param bld The Hadoop proxy builder to use.
+ * The builder will be freed, even on failure.
+ * @param out (out param) On success, the Hadoop proxy.
+ *
+ * @return On success, NULL. On error, the error message.
+ */
+struct hadoop_err *hrpc_proxy_create(struct hrpc_proxy_builder *bld,
+ struct hrpc_proxy **out);
+
+/**
+ * Free a Hadoop proxy.
+ *
+ * @param proxy The Hadoop proxy to free. You must not attempt to free a
+ * proxy with a call in progress.
+ */
+void hrpc_proxy_free(struct hrpc_proxy *proxy);
+
+/**
+ * Mark the proxy as active.
+ *
+ * @param proxy The proxy
+ *
+ * @return NULL on success. If the proxy is already
+ * active, an error will be returned.
+ */
+struct hadoop_err *hrpc_proxy_activate(struct hrpc_proxy *proxy);
+
+/**
+ * Mark the proxy as inactive.
+ *
+ * This function should not be called after hrpc_proxy_start, since a proxy
+ * that has been started will mark itself as inactive when appropriate.
+ *
+ * @param proxy The proxy.
+ */
+void hrpc_proxy_deactivate(struct hrpc_proxy *proxy);
+
+/**
+ * Allocate some data in the proxy's userdata area.
+ *
+ * This will overwrite anything previously allocated in the proxy's userdata
+ * area. It is not necessary to free this memory later; it will be freed when
+ * the proxy is freed.
+ *
+ * @param proxy The proxy
+ * @param size The number of bytes needed.
+ *
+ * @return NULL on OOM; a pointer to the userdata
+ * otherwise.
+ */
+void *hrpc_proxy_alloc_userdata(struct hrpc_proxy *proxy, size_t size);
+
+/**
+ * Allocate a sync context from a proxy via hrpc_proxy_alloc_userdata.
+ *
+ * @param proxy The proxy
+ *
+ * @return NULL on OOM; the sync context otherwise.
+ */
+struct hrpc_sync_ctx *hrpc_proxy_alloc_sync_ctx(struct hrpc_proxy *proxy);
+
+/**
+ * Free a sync context allocated from a proxy.
+ *
+ * @param ctx The sync context.
+ */
+void hrpc_free_sync_ctx(struct hrpc_sync_ctx *ctx);
+
+/**
+ * A callback which synchronous RPCs can use.
+ */
+void hrpc_proxy_sync_cb(struct hrpc_response *resp, struct hadoop_err *err,
+ void *cb_data);
+
+/**
+ * Start an outgoing RPC from the proxy.
+ *
+ * This method will return after queuing up the RPC to be sent.
+ *
+ * @param proxy The Hadoop proxy to use. A single proxy can
+ * only make one call at once.
+ * @param method The method we're calling.
+ * @param payload The protobuf message we're sending.
+ * @param payload_packed_len Length of payload when serialized.
+ * @param payload_pack_cb Function used to pack the payload.
+ * @param cb Callback invoked when the message is done.
+ * @param cb_data Data provided along with cb.
+ */
+void hrpc_proxy_start(struct hrpc_proxy *proxy,
+ const char *method, const void *payload, int payload_packed_len,
+ hrpc_pack_cb_t payload_pack_cb,
+ hrpc_raw_cb_t cb, void *cb_data);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.c Thu May 1 02:10:15 2014
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "common/net.h"
+#include "common/queue.h"
+#include "common/tree.h"
+#include "rpc/call.h"
+#include "rpc/messenger.h"
+#include "rpc/reactor.h"
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <uv.h>
+
+/*
+ * Logging helpers.  Each one takes the reactor pointer, a printf-style format
+ * string literal, and at least one format argument (the expansion always
+ * places a comma before __VA_ARGS__).
+ *
+ * The first macro parameter is the pointer that gets printed; it does not
+ * have to be a variable named 'reactor'.
+ */
+#define reactor_log_warn(reactor, fmt, ...) \
+    fprintf(stderr, "WARN: reactor %p: " fmt, (void*)(reactor), __VA_ARGS__)
+#define reactor_log_info(reactor, fmt, ...) \
+    fprintf(stderr, "INFO: reactor %p: " fmt, (void*)(reactor), __VA_ARGS__)
+#define reactor_log_debug(reactor, fmt, ...) \
+    fprintf(stderr, "DBUG: reactor %p: " fmt, (void*)(reactor), __VA_ARGS__)
+
+/* Generate the functions for the hrpc_conns tree of struct hrpc_conn, linked
+ * through the 'entry' field and ordered by hrpc_conn_compare. */
+RB_GENERATE(hrpc_conns, hrpc_conn, entry, hrpc_conn_compare);
+
+/**
+ * Entry point of the reactor thread.
+ *
+ * Runs the libuv loop until uv_run returns, then destroys every connection
+ * still in the reactor's tree, handing each one an ESHUTDOWN error.
+ *
+ * @param arg       The reactor (struct hrpc_reactor*).
+ */
+static void reactor_thread_run(void *arg)
+{
+    struct hrpc_reactor *reactor = arg;
+    struct hrpc_conn *cur, *next;
+
+    reactor_log_debug(reactor, "%s", "reactor thread starting.\n");
+    uv_run(&reactor->loop, UV_RUN_DEFAULT);
+    reactor_log_debug(reactor, "%s", "reactor thread terminating.\n");
+    RB_FOREACH_SAFE(cur, hrpc_conns, &reactor->conns, next) {
+        struct hadoop_err *err = hadoop_lerr_alloc(ESHUTDOWN,
+            "hrpc_reactor_start_outbound: the reactor is being shut down.");
+
+        hrpc_conn_destroy(cur, err);
+    }
+}
+
+/**
+ * Find an idle connection with a given address in the reactor's connection
+ * tree, and remove it from the tree so that the caller can use it.
+ *
+ * The tree may hold several connections to the same remote.  We start at the
+ * first candidate returned by RB_NFIND and walk forward while the connections
+ * remain usable for this remote and protocol, so that a busy connection does
+ * not hide an idle one which sorts after it.
+ *
+ * @param reactor   The reactor.
+ * @param remote    The remote address to find.
+ * @param call      The call; its protocol is used to select the connection.
+ *
+ * @return          A usable connection whose writer is idle (already removed
+ *                  from the tree), or NULL if there is none.
+ */
+static struct hrpc_conn *reuse_idle_conn(struct hrpc_reactor *reactor,
+        const struct sockaddr_in *remote, const struct hrpc_call *call)
+{
+    struct hrpc_conn *conn;
+    struct hrpc_conn exemplar;
+
+    memset(&exemplar, 0, sizeof(exemplar));
+    exemplar.remote = *remote;
+    exemplar.protocol = call->protocol;
+    // NOTE(review): this assumes that connections matching the exemplar's
+    // remote and protocol are adjacent in hrpc_conn_compare order.
+    for (conn = RB_NFIND(hrpc_conns, &reactor->conns, &exemplar);
+         conn && hrpc_conn_usable(conn, remote, call->protocol);
+         conn = RB_NEXT(hrpc_conns, &reactor->conns, conn)) {
+        if (conn->writer.state == HRPC_CONN_WRITE_IDLE) {
+            RB_REMOVE(hrpc_conns, &reactor->conns, conn);
+            return conn;
+        }
+    }
+    return NULL;
+}
+
+/**
+ * Begin shutting down the reactor.
+ *
+ * Every call in pending_calls gets an ESHUTDOWN error, and then the libuv
+ * loop is asked to stop.
+ *
+ * @param reactor         The reactor.
+ * @param pending_calls   The calls which were queued but never started.
+ */
+static void reactor_begin_shutdown(struct hrpc_reactor *reactor,
+        struct hrpc_calls *pending_calls)
+{
+    struct hrpc_call *call, *next;
+
+    reactor_log_debug(reactor, "%s", "reactor_begin_shutdown\n");
+    // Fetch the successor before delivering the error.  We can't know what
+    // the completion callback does with the call (it may free or re-queue
+    // it), so call->entry must not be read afterwards.
+    for (call = STAILQ_FIRST(pending_calls); call; call = next) {
+        next = STAILQ_NEXT(call, entry);
+        hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN,
+            "hrpc_reactor_start_outbound: the reactor is being shut down."));
+    }
+    // Note: other callbacks may still run after the libuv loop has been
+    // stopped.  But we won't block for I/O after this point.
+    uv_stop(&reactor->loop);
+}
+
+/**
+ * Start one outbound call from the reactor thread.
+ *
+ * An idle connection to the call's remote is reused when one exists;
+ * otherwise a new outbound connection is created for the call.  If creation
+ * fails, the error is delivered to the call.  On success the connection is
+ * (re-)inserted into the reactor's tree.
+ *
+ * @param reactor   The reactor.
+ * @param call      The call to start.
+ */
+static void reactor_async_start_outbound(struct hrpc_reactor *reactor,
+        struct hrpc_call *call)
+{
+    char remote_str[64] = { 0 };
+    struct hrpc_conn *conn;
+    struct hadoop_err *err;
+
+    conn = reuse_idle_conn(reactor, &call->remote, call);
+    if (!conn) {
+        // Nothing to reuse: open a fresh connection for this call.
+        err = hrpc_conn_create_outbound(reactor, call, &conn);
+        if (err) {
+            reactor_log_warn(reactor, "reactor_async_start_outbound("
+                "remote=%s) got error %s\n",
+                net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)),
+                hadoop_err_msg(err));
+            hrpc_call_deliver_err(call, err);
+            return;
+        }
+        reactor_log_debug(reactor, "start_outbound(remote=%s) created new "
+            "connection %p\n",
+            net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)), conn);
+    } else {
+        reactor_log_debug(reactor, "start_outbound(remote=%s) assigning to "
+            "connection %p\n",
+            net_ipv4_name(&call->remote, remote_str, sizeof(remote_str)), conn);
+        hrpc_conn_start_outbound(conn, call);
+    }
+    // Add or re-add the connection to the reactor's tree.
+    RB_INSERT(hrpc_conns, &reactor->conns, conn);
+}
+
+/**
+ * The libuv async callback which drains the reactor's inbox.
+ *
+ * The shutdown flag and the pending calls are taken out of the inbox under
+ * the inbox lock.  If shutdown was requested, the calls are failed and the
+ * loop is stopped; otherwise each call is started.
+ *
+ * @param handle    The async handle; its data field is the reactor.
+ */
+static void reactor_async_cb(uv_async_t *handle)
+{
+    struct hrpc_reactor *reactor = handle->data;
+    int shutdown;
+    struct hrpc_calls pending_calls = STAILQ_HEAD_INITIALIZER(pending_calls);
+    struct hrpc_call *call, *next;
+
+    uv_mutex_lock(&reactor->inbox.lock);
+    shutdown = reactor->inbox.shutdown;
+    STAILQ_SWAP(&reactor->inbox.pending_calls, &pending_calls,
+                hrpc_call);
+    uv_mutex_unlock(&reactor->inbox.lock);
+
+    if (shutdown) {
+        reactor_begin_shutdown(reactor, &pending_calls);
+        return;
+    }
+    // Fetch the successor first: starting a call can deliver an error to it
+    // synchronously, after which call->entry must not be read any more.
+    for (call = STAILQ_FIRST(&pending_calls); call; call = next) {
+        next = STAILQ_NEXT(call, entry);
+        reactor_async_start_outbound(reactor, call);
+    }
+}
+
+/**
+ * Take a connection out of the reactor's tree, logging a warning when
+ * RB_REMOVE reports nothing removed.
+ *
+ * @param reactor   The reactor.
+ * @param conn      The connection.
+ */
+void reactor_remove_conn(struct hrpc_reactor *reactor, struct hrpc_conn *conn)
+{
+    if (RB_REMOVE(hrpc_conns, &reactor->conns, conn))
+        return;
+    reactor_log_warn(reactor, "reactor_remove_conn("
+        "conn=%p): no such connection found.\n", conn);
+}
+
+/**
+ * Allocate a reactor, set up its inbox, libuv loop and async notifier, and
+ * start the reactor thread.
+ *
+ * @param out       (out param) The new reactor.  Only set on success.
+ *
+ * @return          NULL on success; the error otherwise.
+ */
+struct hadoop_err *hrpc_reactor_create(struct hrpc_reactor **out)
+{
+    struct hrpc_reactor *reactor = NULL;
+    struct hadoop_err *err = NULL;
+    int res;
+
+    reactor = calloc(1, sizeof(struct hrpc_reactor));
+    if (!reactor) {
+        err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: OOM while allocating "
+                                "reactor structure.");
+        goto error_free_reactor;
+    }
+    if (uv_mutex_init(&reactor->inbox.lock) < 0) {
+        err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: failed to init "
+                                "mutex.");
+        goto error_free_reactor;
+    }
+    RB_INIT(&reactor->conns);
+    STAILQ_INIT(&reactor->inbox.pending_calls);
+    if (uv_loop_init(&reactor->loop)) {
+        err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: uv_loop_init "
+                                "failed.");
+        goto error_free_mutex;
+    }
+    res = uv_async_init(&reactor->loop, &reactor->inbox.notifier,
+                        reactor_async_cb);
+    if (res) {
+        err = hadoop_uverr_alloc(res, "hrpc_reactor_create: "
+                                 "uv_async_init failed.");
+        goto error_close_loop;
+    }
+    reactor->inbox.notifier.data = reactor;
+    res = uv_thread_create(&reactor->thread, reactor_thread_run, reactor);
+    if (res) {
+        err = hadoop_lerr_alloc(ENOMEM, "hrpc_reactor_create: "
+                                "uv_thread_create failed.");
+        goto error_free_async;
+    }
+    *out = reactor;
+    return NULL;
+
+error_free_async:
+    // uv_close only requests the close.  The loop has to run once more to
+    // complete it; otherwise uv_loop_close below would fail with UV_EBUSY
+    // and leave the loop's resources behind.  The notifier is the only
+    // handle on this loop, so uv_run returns as soon as it is closed.
+    uv_close((uv_handle_t*)&reactor->inbox.notifier, NULL);
+    uv_run(&reactor->loop, UV_RUN_DEFAULT);
+error_close_loop:
+    uv_loop_close(&reactor->loop);
+error_free_mutex:
+    uv_mutex_destroy(&reactor->inbox.lock);
+error_free_reactor:
+    free(reactor);
+    return err;
+}
+
+/**
+ * Shut down the reactor thread and wait for it to terminate.
+ *
+ * Sets the inbox shutdown flag under the inbox lock, signals the async
+ * notifier, and then joins the reactor thread.
+ *
+ * @param reactor The reactor.
+ */
+void hrpc_reactor_shutdown(struct hrpc_reactor *reactor)
+{
+ reactor_log_debug(reactor, "%s", "hrpc_reactor_shutdown\n");
+ uv_mutex_lock(&reactor->inbox.lock);
+ reactor->inbox.shutdown = 1;
+ uv_mutex_unlock(&reactor->inbox.lock);
+ uv_async_send(&reactor->inbox.notifier);
+ uv_thread_join(&reactor->thread);
+}
+
+/**
+ * Free the reactor: close the libuv loop, destroy the inbox lock, and release
+ * the structure itself.
+ *
+ * NOTE(review): nothing in this file closes inbox.notifier before
+ * uv_loop_close is called, and the return value of uv_loop_close is ignored.
+ * libuv may report UV_EBUSY here -- confirm.
+ *
+ * @param reactor The reactor. Presumably it must already have been
+ * shut down with hrpc_reactor_shutdown -- confirm.
+ */
+void hrpc_reactor_free(struct hrpc_reactor *reactor)
+{
+ reactor_log_debug(reactor, "%s", "hrpc_reactor_free\n");
+ uv_loop_close(&reactor->loop);
+ uv_mutex_destroy(&reactor->inbox.lock);
+ free(reactor);
+}
+
+/**
+ * Hand a call over to the reactor thread.
+ *
+ * The call is appended to the inbox under the inbox lock and the reactor is
+ * woken up through the async notifier.  If the shutdown flag is already set,
+ * the call is not queued; it gets an ESHUTDOWN error instead.
+ *
+ * @param reactor   The reactor.
+ * @param call      The call to start.
+ */
+void hrpc_reactor_start_outbound(struct hrpc_reactor *reactor,
+        struct hrpc_call *call)
+{
+    int is_shut_down;
+
+    uv_mutex_lock(&reactor->inbox.lock);
+    is_shut_down = reactor->inbox.shutdown;
+    if (!is_shut_down) {
+        STAILQ_INSERT_TAIL(&reactor->inbox.pending_calls, call, entry);
+    }
+    uv_mutex_unlock(&reactor->inbox.lock);
+
+    if (!is_shut_down) {
+        uv_async_send(&reactor->inbox.notifier);
+        return;
+    }
+    hrpc_call_deliver_err(call, hadoop_lerr_alloc(ESHUTDOWN,
+        "hrpc_reactor_start_outbound: can't start call because the "
+        "reactor has been shut down."));
+}
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.h?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.h (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/reactor.h Thu May 1 02:10:15 2014
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_RPC_REACTOR_H
+#define HADOOP_CORE_RPC_REACTOR_H
+
+#include "common/tree.h"
+#include "rpc/call.h" // for hrpc_call
+#include "rpc/conn.h" // for hrpc_conn_compare
+
+#include <stdint.h>
+#include <uv.h>
+
+/**
+ * The Hadoop reactor thread header.
+ *
+ * Note: this is an internal header which users of the RPC layer don't need to
+ * include.
+ *
+ * The declarations below introduce the hrpc_conns red-black tree of
+ * struct hrpc_conn, linked through the 'entry' field and ordered by
+ * hrpc_conn_compare.
+ */
+
+RB_HEAD(hrpc_conns, hrpc_conn);
+RB_PROTOTYPE(hrpc_conns, hrpc_conn, entry, hrpc_conn_compare);
+
+/**
+ * The inbox through which work is handed to a reactor thread.
+ */
+struct hrpc_reactor_inbox {
+ /**
+ * Lock which protects the inbox.
+ */
+ uv_mutex_t lock;
+
+ /**
+ * Non-zero if the reactor should shut down.
+ *
+ * Protected by the inbox lock.
+ */
+ int shutdown;
+
+ /**
+ * Calls which we have been asked to make.
+ *
+ * Protected by the inbox lock.
+ */
+ struct hrpc_calls pending_calls;
+
+ /**
+ * Used to trigger the inbox callback on the reactor thread.
+ *
+ * You do not need the inbox lock to send an async signal.
+ */
+ uv_async_t notifier;
+};
+
+/**
+ * A Hadoop RPC reactor thread.
+ *
+ * Each reactor thread uses libuv to send and receive on multiple TCP sockets
+ * asynchronously.
+ *
+ * With the exception of the inbox, everything in this structure must be
+ * accessed ONLY from the reactor thread. Nothing is safe to read or write
+ * from another thread except the inbox.
+ */
+struct hrpc_reactor {
+ /**
+ * The inbox for incoming work for this reactor thread.
+ */
+ struct hrpc_reactor_inbox inbox;
+
+ /**
+ * A red-black tree of connections. This makes it possible to find a
+ * connection to a given IP address quickly.
+ *
+ * We may have multiple connections for the same IP:port combination.
+ */
+ struct hrpc_conns conns;
+
+ /**
+ * The libuv loop.
+ */
+ uv_loop_t loop;
+
+ /**
+ * The libuv timer. Used to expire connections after a timeout has
+ * elapsed.
+ *
+ * NOTE(review): reactor.c does not appear to initialize or start this
+ * timer -- confirm whether it is used yet.
+ */
+ uv_timer_t timer;
+
+ /**
+ * The reactor thread. All reactor callbacks are made from this context.
+ */
+ uv_thread_t thread;
+};
+
+/**
+ * Remove a connection from the reactor.
+ *
+ * @param reactor The reactor.
+ * @param conn The connection.
+ */
+void reactor_remove_conn(struct hrpc_reactor *reactor, struct hrpc_conn *conn);
+
+/**
+ * Create the reactor and start the reactor thread.
+ *
+ * @param out (out param) The new reactor. Only set on success.
+ *
+ * @return NULL on success; the error otherwise.
+ */
+struct hadoop_err *hrpc_reactor_create(struct hrpc_reactor **out);
+
+/**
+ * Shut down the reactor thread and wait for it to terminate.
+ *
+ * Calls which are still pending in the inbox will get ESHUTDOWN errors.
+ *
+ * @param reactor The reactor.
+ */
+void hrpc_reactor_shutdown(struct hrpc_reactor *reactor);
+
+/**
+ * Free the reactor.
+ *
+ * @param reactor The reactor.
+ */
+void hrpc_reactor_free(struct hrpc_reactor *reactor);
+
+/**
+ * Start an outbound transfer.
+ *
+ * @param reactor The reactor.
+ * @param call The call to start. If the reactor has already been
+ * shut down, the call gets an ESHUTDOWN error instead
+ * of being queued.
+ */
+void hrpc_reactor_start_outbound(struct hrpc_reactor *reactor,
+ struct hrpc_call *call);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/shorten.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/shorten.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/shorten.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/shorten.c Thu May 1 02:10:15 2014
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ctype.h>
+#include <errno.h>
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/** Size of the line and header-guard buffers used while parsing. */
+#define MAX_LINE_LEN 16384
+/** Prefix of the line from which the header guard name is taken. */
+#define IFNDEF "#ifndef"
+#define IFNDEF_LEN (sizeof(IFNDEF) - 1)
+
+/**
+ * The section of the input header which is currently being parsed.
+ *
+ * The values are used as indices into PARSE_STATE_TERMINATORS, so the order
+ * here must match the order there.
+ */
+enum parse_state {
+ PARSE_IFNDEF = 0,
+ PARSE_STRUCTS_AND_ENUMS,
+ PARSE_MESSAGES,
+ PARSE_DONE,
+};
+
+#define PROTOBUF_C_END_DECLS_STR "PROTOBUF_C_END_DECLS"
+
+/**
+ * For each parse state before PARSE_DONE, the text whose appearance in a line
+ * ends that state.
+ */
+static const char *PARSE_STATE_TERMINATORS[] = {
+ "PROTOBUF_C_BEGIN_DECLS",
+ "/* --- messages --- */",
+ PROTOBUF_C_END_DECLS_STR
+};
+
+/**
+ * Identifier suffixes which has_message_suffix() looks for.
+ */
+static const char *MESSAGE_SUFFIXES[] = {
+ "__INIT",
+ "__get_packed_size",
+ "__pack",
+ "__pack_to_buffer",
+ "__unpack",
+ "__free_unpacked",
+};
+
+#define NUM_MESSAGE_SUFFIXES \
+ (sizeof(MESSAGE_SUFFIXES) / sizeof(MESSAGE_SUFFIXES[0]))
+
+/**
+ * Append a copy of a word to a dynamically grown array of words.
+ *
+ * The process exits with status 1 if memory cannot be allocated.
+ *
+ * @param words         (inout) The array of words.  May point to NULL when
+ *                      the array is still empty.
+ * @param num_words     (inout) The number of words in the array.
+ * @param word          The word to copy and append.
+ */
+static void add_word(char ***words, size_t *num_words, const char *word)
+{
+    size_t new_num_words;
+    char *nword;
+    char **nwords;
+
+    new_num_words = *num_words + 1;
+    nword = strdup(word);
+    if (!nword) {
+        fprintf(stderr, "failed to allocate memory for %zu words\n",
+                new_num_words);
+        exit(1);
+    }
+    // The array holds char* elements, so that is the element size to use.
+    nwords = (char **)realloc(*words, sizeof(char *) * new_num_words);
+    if (!nwords) {
+        fprintf(stderr, "failed to allocate memory for %zu words\n",
+                new_num_words);
+        free(nword);
+        exit(1);
+    }
+    nwords[new_num_words - 1] = nword;
+    *num_words = new_num_words;
+    *words = nwords;
+}
+
+/**
+ * Test whether a string ends with a given suffix.
+ *
+ * @param str       The string.
+ * @param suffix    The suffix to look for.
+ *
+ * @return          1 if str ends with suffix; 0 otherwise.
+ */
+static int has_suffix(const char *str, const char *suffix)
+{
+    int suffix_len = strlen(suffix);
+    int tail_off = (int)strlen(str) - suffix_len;
+
+    return (tail_off >= 0) && (strcmp(str + tail_off, suffix) == 0);
+}
+
+/**
+ * Test whether a word ends with one of the entries in MESSAGE_SUFFIXES.
+ *
+ * @param word      The word to test.
+ *
+ * @return          1 if the word has such a suffix; 0 otherwise.
+ */
+static int has_message_suffix(const char *word)
+{
+    const char **suffix = MESSAGE_SUFFIXES;
+    const char **end = MESSAGE_SUFFIXES + NUM_MESSAGE_SUFFIXES;
+
+    while (suffix != end) {
+        if (has_suffix(word, *suffix++))
+            return 1;
+    }
+    return 0;
+}
+
+/**
+ * Split a line on spaces and collect the words worth shortening.
+ *
+ * A word is collected when it does not start with an underscore and contains
+ * a double underscore.  In the PARSE_MESSAGES state it must additionally end
+ * with one of the message suffixes.
+ *
+ * @param words         (inout) The array of collected words.
+ * @param num_words     (inout) The number of collected words.
+ * @param line          The line to split.  It is modified by strtok_r.
+ * @param state         The current parse state.
+ */
+static void add_words(char ***words, size_t *num_words,
+        char *line, enum parse_state state)
+{
+    char *save = NULL;
+    char *tok = strtok_r(line, " ", &save);
+
+    while (tok) {
+        int wanted = (tok[0] != '_') && (strstr(tok, "__") != NULL) &&
+            ((state != PARSE_MESSAGES) || has_message_suffix(tok));
+
+        if (wanted)
+            add_word(words, num_words, tok);
+        tok = strtok_r(NULL, " ", &save);
+    }
+}
+
+/**
+ * qsort comparator for an array of C strings.
+ *
+ * @param a         Pointer to the first array element (a char*).
+ * @param b         Pointer to the second array element (a char*).
+ *
+ * @return          The strcmp ordering of the two strings.
+ */
+static int compare_strings(const void *a, const void *b)
+{
+    const char *lhs = *(char * const *)a;
+    const char *rhs = *(char * const *)b;
+
+    return strcmp(lhs, rhs);
+}
+
+/**
+ * Find the text which follows the last occurrence of needle in haystack.
+ *
+ * @param haystack  The string to search.
+ * @param needle    The string to search for.
+ *
+ * @return          A pointer just past the last occurrence of needle, or
+ *                  NULL if needle does not occur at all.
+ */
+static char *get_last_occurrence(char *haystack, const char *needle)
+{
+    int step = strlen(needle);
+    char *after = NULL;
+    char *hit;
+
+    for (hit = strstr(haystack, needle); hit; hit = strstr(after, needle)) {
+        after = hit + step;
+    }
+    return after;
+}
+
+/**
+ * Find the text which follows the second-last occurrence of needle in
+ * haystack.
+ *
+ * @param haystack  The string to search.
+ * @param needle    The string to search for.
+ *
+ * @return          A pointer just past the second-last occurrence of needle,
+ *                  or NULL if needle occurs fewer than two times.
+ */
+static char *get_second_last_occurrence(char *haystack, const char *needle)
+{
+    int step = strlen(needle);
+    char *prev = NULL, *cur = NULL;
+    char *hit;
+
+    for (hit = strstr(haystack, needle); hit; hit = strstr(cur, needle)) {
+        prev = cur;
+        cur = hit + step;
+    }
+    return prev;
+}
+
+/**
+ * Test whether a string contains an upper-case letter somewhere after a
+ * lower-case letter.
+ *
+ * @param str       The string to test.
+ *
+ * @return          1 if such a letter pair exists; 0 otherwise.
+ */
+static int has_camel_case(const char *str)
+{
+    int i, prev_lower = 0;
+
+    for (i = 0; str[i]; i++) {
+        // The <ctype.h> functions require a value representable as unsigned
+        // char; passing a negative plain char is undefined behavior.
+        unsigned char c = (unsigned char)str[i];
+
+        if (isupper(c)) {
+            if (prev_lower)
+                return 1;
+        } else if (islower(c)) {
+            prev_lower = 1;
+        }
+    }
+    return 0;
+}
+
+/**
+ * Pick the shortened name for a protobuf-c identifier.
+ *
+ * @param str       The full identifier.
+ *
+ * @return          A pointer into str where the shortened name starts, or
+ *                  NULL if str contains no double underscore.
+ */
+static char *get_shortened_occurrence(char *str)
+{
+    char *last, *slast;
+
+    last = get_last_occurrence(str, "__");
+    if (!last)
+        return NULL;
+    if ((!has_message_suffix(str)) &&
+            (strstr(last, "_") || has_camel_case(last))) {
+        // Heuristic: if the last bit of the string after the double underscore
+        // has another underscore inside, or has mixed case, we assume it's
+        // complex enough to use on its own.
+        return last;
+    }
+    // Otherwise, we grab the part of the string after the second-last double
+    // underscore.
+    slast = get_second_last_occurrence(str, "__");
+    return slast ? slast : last;
+}
+
+/**
+ * Write one "#define <short> <full>" line for every distinct word.
+ *
+ * Only a word equal to the one immediately before it is skipped, so the
+ * array has to be sorted for duplicates to be suppressed.
+ *
+ * @param words         The words.
+ * @param num_words     The number of words.
+ * @param out_path      The output path; used in error messages only.
+ * @param out           The output stream.
+ *
+ * @return              0 on success; EIO if a write failed.
+ */
+static int output_shortening_macros(char **words, size_t num_words,
+        const char *out_path, FILE *out)
+{
+    const char *last_seen = "";
+    size_t idx;
+
+    for (idx = 0; idx < num_words; idx++) {
+        char *word = words[idx];
+        const char *alias;
+
+        // skip words we've already done
+        if (strcmp(last_seen, word) == 0)
+            continue;
+        last_seen = word;
+        alias = get_shortened_occurrence(word);
+        if (!alias)
+            continue;
+        if (fprintf(out, "#define %s %s\n", alias, word) >= 0)
+            continue;
+        fprintf(stderr, "error writing to %s\n", out_path);
+        return EIO;
+    }
+    return 0;
+}
+
+/**
+ * Strip all trailing newline characters from a buffer.
+ *
+ * @param line          The buffer, modified in place.
+ */
+static void chomp(char *line)
+{
+    int len = strlen(line);
+
+    while ((len > 0) && (line[len - 1] == '\n')) {
+        line[--len] = '\0';
+    }
+}
+
+/**
+ * Remove most non-alphanumeric characters from a buffer.
+ *
+ * Every character which is not alphanumeric, '_' or '#' is replaced by a
+ * space.
+ *
+ * @param line          The buffer, modified in place.
+ */
+static void asciify(char *line)
+{
+    int i;
+
+    for (i = 0; line[i]; i++) {
+        // isalnum requires a value representable as unsigned char; passing
+        // a negative plain char (any byte >= 0x80) is undefined behavior.
+        unsigned char c = (unsigned char)line[i];
+
+        if ((!isalnum(c)) && (c != '_') && (c != '#')) {
+            line[i] = ' ';
+        }
+    }
+}
+
+/**
+ * Get the final component of a path.
+ *
+ * @param path      The path.
+ *
+ * @return          A pointer into path, just past the last '/'.  A path
+ *                  without any '/' is its own base name, so path itself is
+ *                  returned in that case.
+ */
+static const char *base_name(const char *path)
+{
+    const char *slash = strrchr(path, '/');
+
+    return slash ? slash + 1 : path;
+}
+
+/**
+ * Read a protobuf-c header line by line, collect the identifiers to shorten,
+ * and write the header with the shortening macros.
+ *
+ * Parsing moves through the parse states in order; each state ends at the
+ * line containing its entry in PARSE_STATE_TERMINATORS.  Lines seen in the
+ * first state only supply the header guard name; lines in later states
+ * supply words.
+ *
+ * @param in_path       The input path; used for messages and for the
+ *                      #include line of the output.
+ * @param out_path      The output path; used for messages only.
+ * @param in            The input stream.
+ * @param out           The output stream.
+ * @param words         (inout) The array of collected words.
+ * @param num_words     (inout) The number of collected words.
+ *
+ * @return              0 on success; an errno-style code otherwise.
+ */
+static int process_file_lines(const char *in_path, const char *out_path,
+        FILE *in, FILE *out, char ***words, size_t *num_words)
+{
+    int ret;
+    char header_guard[MAX_LINE_LEN] = { 0 };
+    char line[MAX_LINE_LEN] = { 0 };
+    const char *base = base_name(in_path);
+    enum parse_state state = PARSE_IFNDEF;
+
+    if (!base) {
+        fprintf(stderr, "failed to get basename of %s\n", in_path);
+        return EINVAL;
+    }
+    while (1) {
+        if (!fgets(line, MAX_LINE_LEN - 1, in)) {
+            if (ferror(in)) {
+                ret = errno;
+                fprintf(stderr, "error reading %s: %s (%d)\n",
+                        in_path, strerror(ret), ret);
+                return ret;
+            }
+            fprintf(stderr, "error reading %s: didn't find "
+                    PROTOBUF_C_END_DECLS_STR "\n", in_path);
+            return EINVAL;
+        }
+        if (strstr(line, PARSE_STATE_TERMINATORS[state])) {
+            state = state + 1;
+            if (state == PARSE_DONE) {
+                break;
+            }
+            continue;
+        }
+        chomp(line);
+        asciify(line);
+        switch (state) {
+        case PARSE_IFNDEF:
+            // Only copy when something follows "#ifndef".  Otherwise
+            // line + IFNDEF_LEN + 1 would point past the terminator.
+            if ((strncmp(line, IFNDEF, IFNDEF_LEN) == 0) &&
+                    (line[IFNDEF_LEN] != '\0')) {
+                strcpy(header_guard, line + IFNDEF_LEN + 1);
+            }
+            break;
+        default:
+            add_words(words, num_words, line, state);
+            break;
+        }
+    }
+    if (!header_guard[0]) {
+        fprintf(stderr, "failed to find header guard for %s\n", in_path);
+        return EINVAL;
+    }
+    // Sorting puts equal words next to each other, which is what
+    // output_shortening_macros relies on to skip duplicates.
+    qsort(*words, *num_words, sizeof(char*), compare_strings);
+    fprintf(out, "#ifndef %s_S\n", header_guard);
+    fprintf(out, "#define %s_S\n\n", header_guard);
+    fprintf(out, "#include \"%s\"\n\n", base);
+    ret = output_shortening_macros(*words, *num_words, out_path, out);
+    if (ret)
+        return ret;
+    fprintf(out, "\n#endif\n");
+    return 0;
+}
+
+/**
+ * Process one input header.
+ *
+ * The output path is the input path with ".s" appended. Both files are
+ * opened, the lines are processed by process_file_lines, and the collected
+ * words are freed whether or not processing succeeded.
+ *
+ * @param in_path The path of the header to read.
+ *
+ * @return 0 on success; an errno-style code otherwise.
+ */
+static int process_file(const char *in_path)
+{
+ char out_path[PATH_MAX] = { 0 };
+ int res, ret = 0;
+ FILE *in = NULL, *out = NULL;
+ char **words = NULL;
+ size_t num_words = 0;
+ size_t i;
+
+ res = snprintf(out_path, PATH_MAX, "%s.s", in_path);
+ if ((res < 0) || (res >= PATH_MAX)) {
+ fprintf(stderr, "snprintf error for %s\n", in_path);
+ ret = EINVAL;
+ goto done;
+ }
+ in = fopen(in_path, "r");
+ if (!in) {
+ ret = errno;
+ fprintf(stderr, "failed to open %s for read: error %s (%d)\n",
+ in_path, strerror(ret), ret);
+ goto done;
+ }
+ out = fopen(out_path, "w");
+ if (!out) {
+ ret = errno;
+ fprintf(stderr, "failed to open %s for write: error %s (%d)\n",
+ out_path, strerror(ret), ret);
+ goto done;
+ }
+ ret = process_file_lines(in_path, out_path, in, out, &words, &num_words);
+ for (i = 0; i < num_words; i++) {
+ free(words[i]);
+ }
+ free(words);
+ if (ret) {
+ goto done;
+ }
+ // Close the output explicitly on the success path so that a failure to
+ // flush it is reported through the return value.
+ if (fclose(out)) {
+ ret = errno;
+ perror("fclose error");
+ }
+ out = NULL;
+done:
+ if (in) {
+ fclose(in);
+ }
+ if (out) {
+ fclose(out);
+ }
+ return ret;
+}
+
+/**
+ * Print the usage message to stderr.
+ */
+static void usage(void)
+{
+    static const char text[] =
+"shorten: creates header files with shorter definitions for protobuf-c\n"
+"definitions. Output files will be written to the same paths as input\n"
+"files, but with a .s extension tacked on.\n"
+"\n"
+"usage: shorten [paths-to-headers]\n";
+
+    fputs(text, stderr);
+}
+
+/**
+ * Process every header named on the command line.
+ *
+ * @return EXIT_SUCCESS if all files were processed (or none were given, in
+ *         which case the usage message is printed); EXIT_FAILURE if any
+ *         file failed.
+ */
+int main(int argc, char **argv)
+{
+    int i, rval = EXIT_SUCCESS;
+
+    if (argc < 2) {
+        usage();
+        exit(EXIT_SUCCESS);
+    }
+    // Keep going after a failure so that every bad file gets reported.
+    for (i = 1; i < argc; i++) {
+        if (process_file(argv[i])) {
+            fprintf(stderr, "error processing %s\n", argv[i]);
+            rval = EXIT_FAILURE;
+        }
+    }
+    return rval;
+}
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint-unit.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint-unit.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint-unit.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint-unit.c Thu May 1 02:10:15 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/test.h"
+#include "rpc/varint.h"
+
+#include <errno.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+/**
+ * Encode a value as a varint32, decode it again, and check that the decoded
+ * value and both offsets match what varint32_size predicted.
+ *
+ * @param val The value to round-trip.
+ *
+ * @return 0 if the end of the function is reached. The EXPECT_*
+ * macros come from common/test.h; presumably they return
+ * early with a non-zero value on failure -- TODO confirm.
+ */
+static int varint32_round_trip_test(int val)
+{
+ int size;
+ int32_t oval = 0;
+ int32_t off = 0;
+ uint8_t buf[16] = { 0 };
+ const size_t buf_len = sizeof(buf)/sizeof(buf[0]);
+
+ size = varint32_size(val);
+ EXPECT_INT_ZERO(varint32_encode(val, buf, buf_len, &off));
+ EXPECT_INT_EQ(size, off);
+ off = 0;
+ EXPECT_INT_ZERO(varint32_decode(&oval, buf, buf_len, &off));
+ EXPECT_INT_EQ(size, off);
+ EXPECT_INT_EQ(val, oval);
+ return 0;
+}
+
+/**
+ * Encode a value as a big-endian int32, decode it again, and check that the
+ * decoded value equals the original.
+ *
+ * @param val The value to round-trip.
+ *
+ * @return 0 if the end of the function is reached.
+ */
+static int be32_round_trip_test(int val)
+{
+ int32_t oval;
+ uint8_t buf[sizeof(int32_t)] = { 0 };
+
+ be32_encode(val, buf);
+ oval = be32_decode(buf);
+ EXPECT_INT_EQ(val, oval);
+ return 0;
+}
+
+/**
+ * Run both the varint32 and the big-endian round-trip tests on one value.
+ *
+ * @param var The value to round-trip.
+ *
+ * @return 0 if the end of the function is reached.
+ */
+static int round_trip_test(int var)
+{
+ EXPECT_INT_ZERO(varint32_round_trip_test(var));
+ EXPECT_INT_ZERO(be32_round_trip_test(var));
+ return 0;
+}
+
+/**
+ * Run the round-trip tests over a set of representative values.
+ *
+ * Each result is checked with EXPECT_INT_ZERO, in the same way the helper
+ * functions check their callees, so that a failing round trip is no longer
+ * silently ignored.  (Presumably the macro returns the non-zero value from
+ * main, which makes the test binary exit with a failure status -- confirm
+ * against common/test.h.)
+ */
+int main(void)
+{
+    EXPECT_INT_ZERO(round_trip_test(0));
+    EXPECT_INT_ZERO(round_trip_test(123));
+    EXPECT_INT_ZERO(round_trip_test(6578));
+    EXPECT_INT_ZERO(round_trip_test(0x7fffffff));
+    EXPECT_INT_ZERO(round_trip_test(-15));
+    EXPECT_INT_ZERO(round_trip_test(-1));
+    return EXIT_SUCCESS;
+}
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.c?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.c (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.c Thu May 1 02:10:15 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "varint.h"
+
+#include <errno.h>
+#include <stdint.h>
+
+/*
+ * Count the 7-bit groups needed for the value's unsigned 32-bit
+ * representation; the result is between 1 and 5.
+ */
+int varint32_size(int32_t val)
+{
+    uint32_t rest = (uint32_t)val >> 7;
+    int size = 1;
+
+    while (rest != 0) {
+        rest >>= 7;
+        size++;
+    }
+    return size;
+}
+
+/*
+ * Encode val as a varint, seven bits per byte, least significant group
+ * first; every byte except the last has its high bit set.
+ *
+ * Returns 0 on success.  Returns -ENOBUFS, leaving *off unchanged, if the
+ * encoding does not fit between *off and len.  This includes a starting
+ * offset which is negative or already at or past len.
+ */
+int varint32_encode(int32_t val, uint8_t *buf, int32_t len, int32_t *off)
+{
+    int32_t o = *off;
+    uint32_t var = (uint32_t)val;
+
+    if (o < 0)
+        return -ENOBUFS;
+    while (1) {
+        // '>=' rather than '==', so that an offset which starts past the
+        // end of the buffer cannot slip through and write out of bounds.
+        if (o >= len)
+            return -ENOBUFS;
+        if (var <= 127) {
+            buf[o++] = (uint8_t)var;
+            break;
+        }
+        buf[o++] = (uint8_t)(0x80 | (var & 0x7f));
+        var >>= 7;
+    }
+    *off = o;
+    return 0;
+}
+
+/*
+ * Decode a varint of at most five bytes (the most a 32-bit value needs).
+ *
+ * Returns 0 on success and advances *off past the varint.  On failure *val
+ * and *off are left unchanged: -EINVAL if the varint continues past five
+ * bytes, -ENOBUFS if the buffer ends before the varint does (or if the
+ * starting offset is outside the buffer).
+ */
+int varint32_decode(int32_t *val, const uint8_t *buf, int32_t len,
+                    int32_t *off)
+{
+    uint32_t accum = 0;
+    int shift = 0, idx = 0;
+    int32_t o = *off;
+    uint8_t b;
+
+    if (o < 0)
+        return -ENOBUFS;
+    do {
+        // A sixth byte would need a shift of 35, which is undefined for a
+        // 32-bit value, so reject it before touching the buffer.
+        if (idx++ >= 5)
+            return -EINVAL;
+        if (o >= len)
+            return -ENOBUFS;
+        b = buf[o++];
+        // Shift as unsigned: 0x7f << 28 overflows a signed int.
+        accum |= ((uint32_t)(b & 0x7f)) << shift;
+        shift += 7;
+    } while (b & 0x80);
+    *val = (int32_t)accum;
+    *off = o;
+    return 0;
+}
+
+/*
+ * Store val into buf[0..3], most significant byte first.
+ */
+void be32_encode(int32_t val, uint8_t *buf)
+{
+    int i;
+
+    for (i = 0; i < 4; i++) {
+        buf[i] = (val >> (24 - 8 * i)) & 0xff;
+    }
+}
+
+/*
+ * Assemble a big-endian 32-bit integer from buf[0..3].
+ *
+ * The bytes are combined as unsigned values: shifting a promoted byte of
+ * 0x80 or more left by 24 would overflow a signed int.
+ */
+int32_t be32_decode(const uint8_t *buf)
+{
+    uint32_t v = ((uint32_t)buf[0] << 24) |
+                 ((uint32_t)buf[1] << 16) |
+                 ((uint32_t)buf[2] << 8) |
+                 ((uint32_t)buf[3] << 0);
+
+    return (int32_t)v;
+}
+
+// vim: ts=4:sw=4:tw=79:et
Added: hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.h?rev=1591534&view=auto
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.h (added)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/rpc/varint.h Thu May 1 02:10:15 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_VARINT_H
+#define HADOOP_CORE_VARINT_H
+
+#include <stdint.h>
+
+/**
+ * Compute the size of a varint.
+ *
+ * @param val The value to encode.
+ *
+ * @return The number of bytes it will take to encode.
+ */
+int varint32_size(int32_t val);
+
+/**
+ * Encode a 32-bit varint.
+ *
+ * @param val The value to encode.
+ * @param buf The buffer to encode into.
+ * @param max The length of the buffer.
+ * @param off (inout) The offset to start writing at. This will be
+ * updated with the new offset after we finish the
+ * encoding.
+ *
+ * @return 0 on success; -ENOBUFS if we ran out of space.
+ */
+int varint32_encode(int32_t val, uint8_t *buf, int32_t max, int32_t *off);
+
+/**
+ * Decode a 32-bit varint.
+ *
+ * @param val (out param) The decoded value.
+ * @param buf The buffer to decode from.
+ * @param max The length of the buffer.
+ * @param off (inout) The offset to start reading at. This will be
+ * updated with the new offset after we finish the
+ * decoding.
+ *
+ * @return 0 on success; -ENOBUFS if the buffer ended before
+ * the varint did; -EINVAL if the varint is too long.
+ */
+int varint32_decode(int32_t *val, const uint8_t *buf, int32_t max,
+ int32_t *off);
+
+/**
+ * Encode a fixed-len, big-endian int32_t.
+ *
+ * @param val The value to encode.
+ * @param buf The buffer to encode into. The buffer must be at
+ * least 4 bytes long.
+ */
+void be32_encode(int32_t val, uint8_t *buf);
+
+/**
+ * Decode a fixed-len, big-endian int32_t.
+ *
+ * @param buf The buffer to decode from. The buffer must be at
+ * least 4 bytes long.
+ *
+ * @return The decoded integer.
+ */
+int32_t be32_decode(const uint8_t *buf);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et