You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/10/03 20:28:51 UTC

[3/5] HDFS-7012. Add hdfs native client RPC functionality (Zhanwei Wang via Colin P. McCabe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.cc
new file mode 100644
index 0000000..6057a01
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.cc
@@ -0,0 +1,876 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "IpcConnectionContext.pb.h"
+#include "Logger.h"
+#include "RpcChannel.h"
+#include "RpcClient.h"
+#include "RpcContentWrapper.h"
+#include "RpcHeader.pb.h"
+#include "RpcHeader.pb.h"
+#include "Thread.h"
+#include "WriteBuffer.h"
+#include "server/RpcHelper.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+#define RPC_HEADER_MAGIC "hrpc"
+#define RPC_HEADER_VERSION 9
+#define SERIALIZATION_TYPE_PROTOBUF 0
+#define CONNECTION_CONTEXT_CALL_ID -3
+
+using namespace ::google::protobuf;
+using namespace google::protobuf::io;
+using namespace hadoop::common;
+using namespace hadoop::hdfs;
+
+namespace hdfs {
+namespace internal {
+
+RpcChannelImpl::RpcChannelImpl(const RpcChannelKey &k, RpcClient &c) :
+    refs(0), available(false), key(k), client(c) {
+    sock = shared_ptr<Socket>(new TcpSocketImpl);
+    sock->setLingerTimeout(k.getConf().getLingerTimeout());
+    in = shared_ptr<BufferedSocketReader>(
+             new BufferedSocketReaderImpl(
+                 *static_cast<TcpSocketImpl *>(sock.get())));
+    lastActivity = lastIdle = steady_clock::now();
+}
+
+RpcChannelImpl::RpcChannelImpl(const RpcChannelKey &k, Socket *s,
+                               BufferedSocketReader *in, RpcClient &c) :
+    refs(0), available(false), key(k), client(c) {
+    sock = shared_ptr<Socket>(s);
+    this->in = shared_ptr<BufferedSocketReader>(in);
+    lastActivity = lastIdle = steady_clock::now();
+}
+
+RpcChannelImpl::~RpcChannelImpl() {
+    assert(pendingCalls.empty());
+    assert(refs == 0);
+
+    if (available) {
+        sock->close();
+    }
+}
+
+void RpcChannelImpl::close(bool immediate) {
+    lock_guard<mutex> lock(writeMut);
+    --refs;
+    assert(refs >= 0);
+
+    if (immediate && !refs) {
+        assert(pendingCalls.empty());
+        available = false;
+        sock->close();
+    }
+}
+
+void RpcChannelImpl::sendSaslMessage(RpcSaslProto *msg, Message *resp) {
+    int totalLen;
+    WriteBuffer buffer;
+    RpcRequestHeaderProto rpcHeader;
+    rpcHeader.set_callid(AuthProtocol::SASL);
+    rpcHeader.set_clientid(client.getClientId());
+    rpcHeader.set_retrycount(INVALID_RETRY_COUNT);
+    rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
+    rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+    RpcContentWrapper wrapper(&rpcHeader, msg);
+    totalLen = wrapper.getLength();
+    buffer.writeBigEndian(totalLen);
+    wrapper.writeTo(buffer);
+    sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
+                     key.getConf().getWriteTimeout());
+    RpcRemoteCallPtr call(
+        new RpcRemoteCall(RpcCall(false, "sasl message", NULL, resp),
+                          AuthProtocol::SASL, client.getClientId()));
+    pendingCalls[AuthProtocol::SASL] = call;
+}
+
+const RpcSaslProto_SaslAuth * RpcChannelImpl::createSaslClient(
+    const RepeatedPtrField<RpcSaslProto_SaslAuth> *auths) {
+    const RpcSaslProto_SaslAuth *auth = NULL;
+    Token token;
+
+    for (int i = 0; i < auths->size(); ++i) {
+        auth = &auths->Get(i);
+        RpcAuth method(RpcAuth::ParseMethod(auth->method()));
+
+        if (method.getMethod() == AuthMethod::TOKEN && key.hasToken()) {
+            token = key.getToken();
+            break;
+        } else if (method.getMethod() == AuthMethod::KERBEROS) {
+            break;
+        } else if (method.getMethod() == AuthMethod::SIMPLE) {
+            return auth;
+        } else if (method.getMethod() == AuthMethod::UNKNOWN) {
+            return auth;
+        } else {
+            auth = NULL;
+        }
+    }
+
+    if (!auth) {
+        std::stringstream ss;
+        ss << "Client cannot authenticate via: ";
+
+        for (int i = 0; i < auths->size(); ++i) {
+            auth = &auths->Get(i);
+            ss << auth->mechanism() << ", ";
+        }
+
+        THROW(AccessControlException, "%s", ss.str().c_str());
+    }
+
+    saslClient = shared_ptr<SaslClient>(
+                     new SaslClient(*auth, token,
+                        key.getAuth().getUser().getPrincipal()));
+    return auth;
+}
+
+std::string RpcChannelImpl::saslEvaluateToken(RpcSaslProto &response,
+                                              bool serverIsDone) {
+    std::string token;
+
+    if (response.has_token()) {
+        token = saslClient->evaluateChallenge(response.token());
+    } else if (!serverIsDone) {
+        THROW(AccessControlException, "Server challenge contains no token");
+    }
+
+    if (serverIsDone) {
+        if (!saslClient->isComplete()) {
+            THROW(AccessControlException, "Client is out of sync with server");
+        }
+
+        if (!token.empty()) {
+            THROW(AccessControlException, "Client generated spurious "
+                  "response");
+        }
+    }
+
+    return token;
+}
+
+RpcAuth RpcChannelImpl::setupSaslConnection() {
+    RpcAuth retval;
+    RpcSaslProto negotiateRequest, response, msg;
+    negotiateRequest.set_state(RpcSaslProto_SaslState_NEGOTIATE);
+    sendSaslMessage(&negotiateRequest, &response);
+    bool done = false;
+
+    do {
+        readOneResponse(false);
+        msg.Clear();
+
+        switch (response.state()) {
+        case RpcSaslProto_SaslState_NEGOTIATE: {
+            const RpcSaslProto_SaslAuth *auth = createSaslClient(
+                    &response.auths());
+            retval = RpcAuth(RpcAuth::ParseMethod(auth->method()));
+
+            if (retval.getMethod() == AuthMethod::SIMPLE) {
+                done = true;
+            } else if (retval.getMethod() == AuthMethod::UNKNOWN) {
+                THROW(AccessControlException, "Unknown auth mechanism");
+            } else {
+                std::string respToken;
+                RpcSaslProto_SaslAuth *respAuth = msg.add_auths();
+                respAuth->CopyFrom(*auth);
+                std::string chanllege;
+
+                if (auth->has_challenge()) {
+                    chanllege = auth->challenge();
+                    respAuth->clear_challenge();
+                }
+
+                respToken = saslClient->evaluateChallenge(chanllege);
+
+                if (!respToken.empty()) {
+                    msg.set_token(respToken);
+                }
+
+                msg.set_state(RpcSaslProto_SaslState_INITIATE);
+            }
+
+            break;
+        }
+
+        case RpcSaslProto_SaslState_CHALLENGE: {
+            if (!saslClient) {
+                THROW(AccessControlException, "Server sent unsolicited challenge");
+            }
+
+            std::string token = saslEvaluateToken(response, false);
+            msg.set_token(token);
+            msg.set_state(RpcSaslProto_SaslState_RESPONSE);
+            break;
+        }
+
+        case RpcSaslProto_SaslState_SUCCESS:
+            if (!saslClient) {
+                retval = RpcAuth(AuthMethod::SIMPLE);
+            } else {
+                saslEvaluateToken(response, true);
+            }
+
+            done = true;
+            break;
+
+        default:
+            break;
+        }
+
+        if (!done) {
+            response.Clear();
+            sendSaslMessage(&msg, &response);
+        }
+    } while (!done);
+
+    return retval;
+}
+
+void RpcChannelImpl::connect() {
+    int sleep = 1;
+    exception_ptr lastError;
+    const RpcConfig & conf = key.getConf();
+    const RpcServerInfo & server = key.getServer();
+
+    for (int i = 0; i < conf.getMaxRetryOnConnect(); ++i) {
+        RpcAuth auth = key.getAuth();
+
+        try {
+            while (true) {
+                sock->connect(server.getHost().c_str(),
+                              server.getPort().c_str(),
+                              conf.getConnectTimeout());
+                sock->setNoDelay(conf.isTcpNoDelay());
+                sendConnectionHeader();
+
+                if (auth.getProtocol() == AuthProtocol::SASL) {
+                    auth = setupSaslConnection();
+                    if (auth.getProtocol() == AuthProtocol::SASL) {
+                        //success
+                        break;
+                    }
+
+                    /*
+                     * switch to other auth protocol
+                     */
+                    sock->close();
+                    CheckOperationCanceled();
+                } else {
+                    break;
+                }
+            }
+
+            auth.setUser(key.getAuth().getUser());
+            sendConnectionContent(auth);
+            available = true;
+            lastActivity = lastIdle = steady_clock::now();
+            return;
+        } catch (const SaslException & e) {
+            /*
+             * Namenode may treat this connect as replay, retry later
+             */
+            sleep = (rand() % 5) + 1;
+            lastError = current_exception();
+            LOG(LOG_ERROR,
+                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
+                server.getHost().c_str(), server.getPort().c_str(),
+                GetExceptionDetail(e));
+        } catch (const HdfsNetworkException & e) {
+            sleep = 1;
+            lastError = current_exception();
+            LOG(LOG_ERROR,
+                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
+                server.getHost().c_str(), server.getPort().c_str(),
+                GetExceptionDetail(e));
+        } catch (const HdfsTimeoutException & e) {
+            sleep = 1;
+            lastError = current_exception();
+            LOG(LOG_ERROR,
+                "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
+                server.getHost().c_str(), server.getPort().c_str(),
+                GetExceptionDetail(e));
+        }
+
+        if (i + 1 < conf.getMaxRetryOnConnect()) {
+            LOG(INFO,
+                "Retrying connect to server: \"%s:%s\". "
+                "Already tried %d time(s)", server.getHost().c_str(),
+                server.getPort().c_str(), i + 1);
+        }
+
+        sock->close();
+        CheckOperationCanceled();
+        sleep_for(seconds(sleep));
+    }
+
+    rethrow_exception(lastError);
+}
+
+exception_ptr RpcChannelImpl::invokeInternal(RpcRemoteCallPtr remote) {
+    const RpcCall & call = remote->getCall();
+    exception_ptr lastError;
+
+    try {
+        if (client.isRunning()) {
+            lock_guard<mutex> lock(writeMut);
+
+            if (!available) {
+                connect();
+            }
+
+            sendRequest(remote);
+        }
+
+        /*
+         * We use one call thread to check response,
+         * other thread will wait on RPC call complete.
+         */
+        while (client.isRunning()) {
+            if (remote->finished()) {
+                /*
+                 * Current RPC call has finished.
+                 * Wake up another thread to check response.
+                 */
+                wakeupOneCaller(remote->getIdentity());
+                break;
+            }
+
+            unique_lock<mutex> lock(readMut, defer_lock_t());
+
+            if (lock.try_lock()) {
+                /*
+                 * Current thread will check response.
+                 */
+                checkOneResponse();
+            } else {
+                /*
+                 * Another thread checks response, just wait.
+                 */
+                remote->wait();
+            }
+        }
+    } catch (const HdfsNetworkConnectException & e) {
+        try {
+            NESTED_THROW(HdfsFailoverException,
+                       "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+                       call.getName(), key.getServer().getHost().c_str(),
+                       key.getServer().getPort().c_str());
+        } catch (const HdfsFailoverException & e) {
+            lastError = current_exception();
+        }
+    } catch (const HdfsNetworkException & e) {
+        try {
+            NESTED_THROW(HdfsRpcException,
+                       "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+                       call.getName(), key.getServer().getHost().c_str(),
+                       key.getServer().getPort().c_str());
+        } catch (const HdfsRpcException & e) {
+            lastError = current_exception();
+        }
+    } catch (const HdfsTimeoutException & e) {
+        try {
+            NESTED_THROW(HdfsFailoverException,
+                       "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+                       call.getName(), key.getServer().getHost().c_str(),
+                       key.getServer().getPort().c_str());
+        } catch (const HdfsFailoverException & e) {
+            lastError = current_exception();
+        }
+    } catch (const HdfsRpcException & e) {
+        lastError = current_exception();
+    } catch (const HdfsIOException & e) {
+        try {
+            NESTED_THROW(HdfsRpcException,
+                       "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+                       call.getName(), key.getServer().getHost().c_str(),
+                       key.getServer().getPort().c_str());
+        } catch (const HdfsRpcException & e) {
+            lastError = current_exception();
+        }
+    }
+
+    return lastError;
+}
+
+void RpcChannelImpl::invoke(const RpcCall & call) {
+    assert(refs > 0);
+    RpcRemoteCallPtr remote;
+    exception_ptr lastError;
+
+    try {
+        bool retry = false;
+
+        do {
+            int32_t id = client.getCallId();
+            remote = RpcRemoteCallPtr(
+                  new RpcRemoteCall(call, id, client.getClientId()));
+            lastError = exception_ptr();
+            lastError = invokeInternal(remote);
+
+            if (lastError) {
+                lock_guard<mutex> lock(writeMut);
+                shutdown(lastError);
+
+                if (!retry && call.isIdempotent()) {
+                    retry = true;
+                    LOG(LOG_ERROR,
+                        "Failed to invoke RPC call \"%s\" on "
+                        "server \"%s:%s\": \n%s",
+                        call.getName(), key.getServer().getHost().c_str(),
+                        key.getServer().getPort().c_str(),
+                        GetExceptionDetail(lastError));
+                    LOG(INFO,
+                        "Retry idempotent RPC call \"%s\" on "
+                        "server \"%s:%s\"",
+                        call.getName(), key.getServer().getHost().c_str(),
+                        key.getServer().getPort().c_str());
+                } else {
+                    rethrow_exception(lastError);
+                }
+            } else {
+                break;
+            }
+        } while (retry);
+    } catch (const HdfsRpcServerException & e) {
+        if (!remote->finished()) {
+            /*
+             * a fatal error happened, the caller will unwrap it.
+             */
+            lock_guard<mutex> lock(writeMut);
+            lastError = current_exception();
+            shutdown(lastError);
+        }
+
+        /*
+         * else not a fatal error, check again at the end of this function.
+         */
+    } catch (const HdfsException & e) {
+        lock_guard<mutex> lock(writeMut);
+        lastError = current_exception();
+        shutdown(lastError);
+    }
+
+    /*
+     * if the call is not finished, either failed to setup connection,
+     * or client is closing.
+     */
+    if (!remote->finished() || !client.isRunning()) {
+        lock_guard<mutex> lock(writeMut);
+
+        if (lastError == exception_ptr()) {
+            try {
+                THROW(hdfs::HdfsRpcException,
+                      "Failed to invoke RPC call \"%s\", RPC channel "
+                      "to \"%s:%s\" is to be closed since RpcClient is "
+                      "closing", call.getName(),
+                      key.getServer().getHost().c_str(),
+                      key.getServer().getPort().c_str());
+            } catch (...) {
+                lastError = current_exception();
+            }
+        }
+
+        /*
+         * wake up all.
+         */
+        shutdown(lastError);
+        rethrow_exception(lastError);
+    }
+
+    remote->check();
+}
+
+void RpcChannelImpl::shutdown(exception_ptr reason) {
+    assert(reason != exception_ptr());
+    available = false;
+    cleanupPendingCalls(reason);
+    sock->close();
+}
+
+void RpcChannelImpl::wakeupOneCaller(int32_t id) {
+    lock_guard<mutex> lock(writeMut);
+    unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e;
+    e = pendingCalls.end();
+
+    for (s = pendingCalls.begin(); s != e; ++s) {
+        if (s->first != id) {
+            s->second->wakeup();
+            return;
+        }
+    }
+}
+
+void RpcChannelImpl::sendRequest(RpcRemoteCallPtr remote) {
+    WriteBuffer buffer;
+    assert(true == available);
+    remote->serialize(key.getProtocol(), buffer);
+    sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
+                     key.getConf().getWriteTimeout());
+    uint32_t id = remote->getIdentity();
+    pendingCalls[id] = remote;
+    lastActivity = lastIdle = steady_clock::now();
+}
+
+void RpcChannelImpl::cleanupPendingCalls(exception_ptr reason) {
+    assert(!writeMut.try_lock());
+    unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e;
+    e = pendingCalls.end();
+
+    for (s = pendingCalls.begin(); s != e; ++s) {
+        s->second->cancel(reason);
+    }
+
+    pendingCalls.clear();
+}
+
+void RpcChannelImpl::checkOneResponse() {
+    int ping = key.getConf().getPingTimeout();
+    int timeout = key.getConf().getRpcTimeout();
+    steady_clock::time_point start = steady_clock::now();
+
+    while (client.isRunning()) {
+        if (getResponse()) {
+            readOneResponse(true);
+            return;
+        } else {
+            if (ping > 0 && ToMilliSeconds(lastActivity,
+                    steady_clock::now()) >= ping) {
+                lock_guard<mutex> lock(writeMut);
+                sendPing();
+            }
+        }
+
+        if (timeout > 0 && ToMilliSeconds(start,
+                    steady_clock::now()) >= timeout) {
+            try {
+                THROW(hdfs::HdfsTimeoutException,
+                  "Timeout when wait for response from RPC channel \"%s:%s\"",
+                  key.getServer().getHost().c_str(),
+                  key.getServer().getPort().c_str());
+            } catch (...) {
+                NESTED_THROW(hdfs::HdfsRpcException,
+                  "Timeout when wait for response from RPC channel \"%s:%s\"",
+                  key.getServer().getHost().c_str(),
+                  key.getServer().getPort().c_str());
+            }
+        }
+    }
+}
+
+void RpcChannelImpl::sendPing() {
+    static const std::vector<char> pingRequest =
+      RpcRemoteCall::GetPingRequest(client.getClientId());
+
+    if (available) {
+        LOG(INFO,
+            "RPC channel to \"%s:%s\" got no response or idle for %d "
+            "milliseconds, sending ping.",
+            key.getServer().getHost().c_str(),
+            key.getServer().getPort().c_str(), key.getConf().getPingTimeout());
+        sock->writeFully(&pingRequest[0], pingRequest.size(),
+                         key.getConf().getWriteTimeout());
+        lastActivity = steady_clock::now();
+    }
+}
+
+bool RpcChannelImpl::checkIdle() {
+    unique_lock<mutex> lock(writeMut, defer_lock_t());
+
+    if (lock.try_lock()) {
+        if (!pendingCalls.empty() || refs > 0) {
+            lastIdle = steady_clock::now();
+            return false;
+        }
+
+        int idle = key.getConf().getMaxIdleTime();
+        int ping = key.getConf().getPingTimeout();
+
+        try {
+            //close the connection if idle timeout
+            if (ToMilliSeconds(lastIdle, steady_clock::now()) >= idle) {
+                sock->close();
+                return true;
+            }
+
+            //send ping
+            if (ping > 0 && ToMilliSeconds(lastActivity,
+                                           steady_clock::now()) >= ping) {
+                sendPing();
+            }
+        } catch (...) {
+            LOG(LOG_ERROR,
+                "Failed to send ping via idle RPC channel "
+                "to server \"%s:%s\": \n%s",
+                key.getServer().getHost().c_str(),
+                key.getServer().getPort().c_str(),
+                GetExceptionDetail(current_exception()));
+            sock->close();
+            return true;
+        }
+    }
+
+    return false;
+}
+
+void RpcChannelImpl::waitForExit() {
+    assert(!client.isRunning());
+
+    while (refs != 0) {
+        sleep_for(milliseconds(100));
+    }
+
+    assert(pendingCalls.empty());
+}
+
+/**
+ * Write the connection header - this is sent when connection is established
+ * +----------------------------------+
+ * |  "hrpc" 4 bytes                  |
+ * +----------------------------------+
+ * |  Version (1 byte)                |
+ * +----------------------------------+
+ * |  Service Class (1 byte)          |
+ * +----------------------------------+
+ * |  AuthProtocol (1 byte)           |
+ * +----------------------------------+
+ */
+void RpcChannelImpl::sendConnectionHeader() {
+    WriteBuffer buffer;
+    buffer.write(RPC_HEADER_MAGIC, strlen(RPC_HEADER_MAGIC));
+    buffer.write(static_cast<char>(RPC_HEADER_VERSION));
+    buffer.write(static_cast<char>(0));  //for future feature
+    buffer.write(static_cast<char>(key.getAuth().getProtocol()));
+    sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
+                     key.getConf().getWriteTimeout());
+}
+
+void RpcChannelImpl::buildConnectionContext(
+    IpcConnectionContextProto & connectionContext, const RpcAuth & auth) {
+    connectionContext.set_protocol(key.getProtocol().getProtocol());
+    std::string euser = key.getAuth().getUser().getPrincipal();
+    std::string ruser = key.getAuth().getUser().getRealUser();
+
+    if (auth.getMethod() != AuthMethod::TOKEN) {
+        UserInformationProto * user = connectionContext.mutable_userinfo();
+        user->set_effectiveuser(euser);
+
+        if (auth.getMethod() != AuthMethod::SIMPLE) {
+            if (!ruser.empty() && ruser != euser) {
+                user->set_realuser(ruser);
+            }
+        }
+    }
+}
+
+void RpcChannelImpl::sendConnectionContent(const RpcAuth & auth) {
+    WriteBuffer buffer;
+    IpcConnectionContextProto connectionContext;
+    RpcRequestHeaderProto rpcHeader;
+    buildConnectionContext(connectionContext, auth);
+    rpcHeader.set_callid(CONNECTION_CONTEXT_CALL_ID);
+    rpcHeader.set_clientid(client.getClientId());
+    rpcHeader.set_retrycount(INVALID_RETRY_COUNT);
+    rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
+    rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+    RpcContentWrapper wrapper(&rpcHeader, &connectionContext);
+    int size = wrapper.getLength();
+    buffer.writeBigEndian(size);
+    wrapper.writeTo(buffer);
+    sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
+                     key.getConf().getWriteTimeout());
+    lastActivity = lastIdle = steady_clock::now();
+}
+
+RpcRemoteCallPtr RpcChannelImpl::getPendingCall(int32_t id) {
+    unordered_map<int32_t, RpcRemoteCallPtr>::iterator it;
+    it = pendingCalls.find(id);
+
+    if (it == pendingCalls.end()) {
+        THROW(HdfsRpcException,
+              "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel "
+              "cannot find pending call: id = %d.",
+              key.getServer().getHost().c_str(),
+              key.getServer().getPort().c_str(), static_cast<int>(id));
+    }
+
+    RpcRemoteCallPtr rc = it->second;
+    pendingCalls.erase(it);
+    return rc;
+}
+
+bool RpcChannelImpl::getResponse() {
+    int idleTimeout = key.getConf().getMaxIdleTime();
+    int pingTimeout = key.getConf().getPingTimeout();
+    int timeout = key.getConf().getRpcTimeout();
+    int interval = pingTimeout < idleTimeout ? pingTimeout : idleTimeout;
+    interval /= 2;
+    interval = interval < timeout ? interval : timeout;
+    steady_clock::time_point s = steady_clock::now();
+
+    while (client.isRunning()) {
+        if (in->poll(500)) {
+            return true;
+        }
+
+        if (ToMilliSeconds(s, steady_clock::now()) >= interval) {
+            return false;
+        }
+    }
+
+    return false;
+}
+
+static exception_ptr HandlerRpcResponseException(exception_ptr e) {
+    exception_ptr retval = e;
+
+    try {
+        rethrow_exception(e);
+    } catch (const HdfsRpcServerException & e) {
+        UnWrapper < NameNodeStandbyException, RpcNoSuchMethodException,
+                  UnsupportedOperationException, AccessControlException,
+                  SafeModeException, SaslException > unwrapper(e);
+
+        try {
+            unwrapper.unwrap(__FILE__, __LINE__);
+        } catch (const NameNodeStandbyException & e) {
+            retval = current_exception();
+        } catch (const UnsupportedOperationException & e) {
+            retval = current_exception();
+        } catch (const AccessControlException & e) {
+            retval = current_exception();
+        } catch (const SafeModeException & e) {
+            retval = current_exception();
+        } catch (const SaslException & e) {
+            retval = current_exception();
+        } catch (const RpcNoSuchMethodException & e) {
+            retval = current_exception();
+        } catch (const HdfsIOException & e) {
+        }
+    }
+
+    return retval;
+}
+
+void RpcChannelImpl::readOneResponse(bool writeLock) {
+    int readTimeout = key.getConf().getReadTimeout();
+    std::vector<char> buffer(128);
+    RpcResponseHeaderProto curRespHeader;
+    RpcResponseHeaderProto::RpcStatusProto status;
+    uint32_t totalen, headerSize = 0, bodySize = 0;
+    totalen = in->readBigEndianInt32(readTimeout);
+    /*
+     * read response header
+     */
+    headerSize = in->readVarint32(readTimeout);
+    buffer.resize(headerSize);
+    in->readFully(&buffer[0], headerSize, readTimeout);
+
+    if (!curRespHeader.ParseFromArray(&buffer[0], headerSize)) {
+        THROW(HdfsRpcException,
+              "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel "
+              "cannot parse response header.",
+              key.getServer().getHost().c_str(),
+              key.getServer().getPort().c_str())
+    }
+
+    lastActivity = steady_clock::now();
+    status = curRespHeader.status();
+
+    if (RpcResponseHeaderProto_RpcStatusProto_SUCCESS == status) {
+        /*
+         * on success, read response body
+         */
+        RpcRemoteCallPtr rc;
+
+        if (writeLock) {
+            lock_guard<mutex> lock(writeMut);
+            rc = getPendingCall(curRespHeader.callid());
+        } else {
+            rc = getPendingCall(curRespHeader.callid());
+        }
+
+        bodySize = in->readVarint32(readTimeout);
+        buffer.resize(bodySize);
+
+        if (bodySize > 0) {
+            in->readFully(&buffer[0], bodySize, readTimeout);
+        }
+
+        Message *response = rc->getCall().getResponse();
+
+        if (!response->ParseFromArray(&buffer[0], bodySize)) {
+            THROW(HdfsRpcException,
+                  "RPC channel to \"%s:%s\" got protocol mismatch: rpc "
+                  "channel cannot parse response.",
+                  key.getServer().getHost().c_str(),
+                  key.getServer().getPort().c_str())
+        }
+
+        rc->done();
+    } else {
+        /*
+         * on error, read error class and message
+         */
+        std::string errClass, errMessage;
+        errClass = curRespHeader.exceptionclassname();
+        errMessage = curRespHeader.errormsg();
+
+        if (RpcResponseHeaderProto_RpcStatusProto_ERROR == status) {
+            RpcRemoteCallPtr rc;
+            {
+                lock_guard<mutex> lock(writeMut);
+                rc = getPendingCall(curRespHeader.callid());
+            }
+
+            try {
+                THROW(HdfsRpcServerException, "%s: %s",
+                      errClass.c_str(), errMessage.c_str());
+            } catch (HdfsRpcServerException & e) {
+                e.setErrClass(errClass);
+                e.setErrMsg(errMessage);
+                rc->cancel(HandlerRpcResponseException(current_exception()));
+            }
+        } else { /*fatal*/
+            assert(RpcResponseHeaderProto_RpcStatusProto_FATAL == status);
+
+            if (errClass.empty()) {
+                THROW(HdfsRpcException, "%s: %s",
+                      errClass.c_str(), errMessage.c_str());
+            }
+
+            try {
+                THROW(HdfsRpcServerException, "%s: %s", errClass.c_str(),
+                      errMessage.c_str());
+            } catch (HdfsRpcServerException & e) {
+                e.setErrClass(errClass);
+                e.setErrMsg(errMessage);
+                rethrow_exception(
+                      HandlerRpcResponseException(current_exception()));
+            }
+        }
+    }
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.h
new file mode 100644
index 0000000..f80019f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.h
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_
+
+#include "Atomic.h"
+#include "DateTime.h"
+#include "ExceptionInternal.h"
+#include "IpcConnectionContext.pb.h"
+#include "RpcCall.h"
+#include "RpcChannelKey.h"
+#include "RpcHeader.pb.h"
+#include "RpcRemoteCall.h"
+#include "SaslClient.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+#include "UnorderedMap.h"
+#include "network/BufferedSocketReader.h"
+#include "network/TcpSocket.h"
+
+#include <google/protobuf/message.h>
+
+namespace hdfs {
+namespace internal {
+
+class RpcClient;
+
+using hadoop::common::RpcSaslProto;
+using hadoop::common::RpcSaslProto_SaslAuth;
+
+class RpcChannel {
+public:
+    /**
+     * Destroy a channel
+     */
+    virtual ~RpcChannel() {
+    }
+
+    /**
+     * The caller finished the rpc call,
+     * this channel may be reused later if immediate is false.
+     * @param immediate Do not reuse the channel any more if immediate is true.
+     */
+    virtual void close(bool immediate) = 0;
+
+    /**
+     * Invoke a rpc call.
+     * @param call The call is to be invoked.
+     * @return The remote call object.
+     */
+    virtual void invoke(const RpcCall &call) = 0;
+
+    /**
+     * Close the channel if it idle expired.
+     * @return true if the channel idle expired.
+     */
+    virtual bool checkIdle() = 0;
+
+    /**
+     * Wait for all reference exiting.
+     * The channel cannot be reused any more.
+     * @pre RpcClient is not running.
+     */
+    virtual void waitForExit() = 0;
+
+    /**
+     * Add reference count to this channel.
+     */
+    virtual void addRef() = 0;
+};
+
+/**
+ * RpcChannel represent a rpc connect to the server.
+ */
+class RpcChannelImpl: public RpcChannel {
+public:
+    /**
+     * Construct a RpcChannelImpl instance.
+     * @param k The key of this channel.
+     */
+    RpcChannelImpl(const RpcChannelKey &k, RpcClient &c);
+
+    /**
+     * Destroy a RpcChannelImpl instance.
+     */
+    ~RpcChannelImpl();
+
+    /**
+     * The caller finished the rpc call,
+     * this channel may be reused later if immediate is false.
+     * @param immediate Do not reuse the channel any more if immediate is true.
+     */
+    void close(bool immediate);
+
+    /**
+     * Invoke a rpc call.
+     * @param call The call is to be invoked.
+     * @return The remote call object.
+     */
+    void invoke(const RpcCall &call);
+
+    /**
+     * Close the channel if it idle expired.
+     * @return true if the channel idle expired.
+     */
+    bool checkIdle();
+
+    /**
+     * Wait for all reference exiting.
+     * The channel cannot be reused any more.
+     * @pre RpcClient is not running.
+     */
+    void waitForExit();
+
+    /**
+     * Add reference count to this channel.
+     */
+    void addRef() {
+        ++refs;
+    }
+
+private:
+    /**
+     * Setup the RPC connection.
+     * @pre Already hold write lock.
+     */
+    void connect();
+
+    /**
+     * Cleanup all pending calls.
+     * @param reason The reason to cancel the call.
+     * @pre Already hold write lock.
+     */
+    void cleanupPendingCalls(exception_ptr reason);
+
+    /**
+     * Send rpc connect protocol header.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeout
+     */
+    void sendConnectionHeader();
+
+    /**
+     * Send rpc connection protocol content.
+     */
+    void sendConnectionContent(const RpcAuth &auth);
+
+    /**
+     * Build rpc connect context.
+     */
+    void buildConnectionContext(
+          hadoop::common::IpcConnectionContextProto &connectionContext,
+          const RpcAuth &auth);
+
+    /**
+     * Send ping packet to server.
+     * @throw HdfsNetworkException
+     * @throw HdfsTimeout
+     * @pre Caller should hold the write lock.
+     */
+    void sendPing();
+
+    /**
+     * Send the call message to rpc server.
+     * @param remote The remote call.
+     * @pre Already hold write lock.
+     */
+    void sendRequest(RpcRemoteCallPtr remote);
+
+    /**
+     * Issue a rpc call and check response.
+     * Catch all recoverable error in this function
+     *
+     * @param remote The remote call
+     */
+    exception_ptr invokeInternal(RpcRemoteCallPtr remote);
+
+    /**
+     * Check response, block until get one response.
+     * @pre Channel already hold read lock.
+     */
+    void checkOneResponse();
+
+    /**
+     * read and handle one response.
+     * @pre Channel already hold read lock.
+     */
+    void readOneResponse(bool writeLock);
+
+    /**
+     * Get the call object with given id, and then remove it from pending call list.
+     * @param id The id of the call object to be returned.
+     * @return The call object with given id.
+     * @throw HdfsIOException
+     * @pre Channel already locked.
+     */
+    RpcRemoteCallPtr getPendingCall(int32_t id);
+
+    /**
+     * Check if there is data available for reading on socket.
+     * @return true if response is available.
+     */
+    bool getResponse();
+
+    /**
+     * wake up one caller to check response.
+     * @param id The call id which current caller handled.
+     */
+    void wakeupOneCaller(int32_t id);
+
+    /**
+     * shutdown the RPC connection since error.
+     * @param reason The reason to cancel the call
+     * @pre Already hold write lock.
+     */
+    void shutdown(exception_ptr reason);
+
+    const RpcSaslProto_SaslAuth *createSaslClient(
+        const ::google::protobuf::RepeatedPtrField<RpcSaslProto_SaslAuth> *auths);
+
+    void sendSaslMessage(RpcSaslProto *msg, ::google::protobuf::Message *resp);
+
+    std::string saslEvaluateToken(RpcSaslProto &response, bool serverIsDone);
+
+    RpcAuth setupSaslConnection();
+
+private:
+    /**
+     * Construct a RpcChannelImpl instance for test.
+     * @param key The key of this channel.
+     * @param sock The socket instance.
+     * @param in The BufferedSocketReader instance build on sock.
+     * @param client The RpcClient instance.
+     */
+    RpcChannelImpl(const RpcChannelKey &key, Socket *sock,
+                   BufferedSocketReader *in, RpcClient &client);
+
+private:
+    atomic<int> refs;
+    bool available;
+    mutex readMut;
+    mutex writeMut;
+    RpcChannelKey key;
+    RpcClient &client;
+    shared_ptr<BufferedSocketReader> in;
+    shared_ptr<SaslClient> saslClient;
+    shared_ptr<Socket> sock;
+    steady_clock::time_point lastActivity; // ping is a kind of activity, lastActivity will be updated after ping
+    steady_clock::time_point lastIdle; // ping cannot change idle state. If there is still pending calls, lastIdle is always "NOW".
+    unordered_map<int32_t, RpcRemoteCallPtr> pendingCalls;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.cc
new file mode 100644
index 0000000..b3600cb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcChannelKey.h"
+
+#include <vector>
+
+namespace hdfs {
+namespace internal {
+
+RpcChannelKey::RpcChannelKey(const RpcAuth &a, const RpcProtocolInfo &p,
+                             const RpcServerInfo &s, const RpcConfig &c) :
+    auth(a), conf(c), protocol(p), server(s) {
+    const Token *temp = auth.getUser().selectToken(protocol.getTokenKind(),
+                         server.getTokenService());
+
+    if (temp) {
+        token = shared_ptr<Token> (new Token(*temp));
+    }
+}
+
+size_t RpcChannelKey::hash_value() const {
+    size_t tokenHash = token ? token->hash_value() : 0;
+    size_t values[] = { auth.hash_value(), protocol.hash_value(),
+                        server.hash_value(), conf.hash_value(), tokenHash
+                      };
+    return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.h
new file mode 100644
index 0000000..72ffcae
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.h
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCHANNELKEY_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCHANNELKEY_H_
+
+#include "Hash.h"
+#include "RpcAuth.h"
+#include "RpcConfig.h"
+#include "RpcProtocolInfo.h"
+#include "RpcServerInfo.h"
+#include "SharedPtr.h"
+#include "client/Token.h"
+
+namespace hdfs {
+namespace internal {
+
+class RpcChannelKey {
+public:
+    RpcChannelKey(const RpcAuth &a, const RpcProtocolInfo &p,
+                  const RpcServerInfo &s, const RpcConfig &c);
+
+public:
+    size_t hash_value() const;
+
+    const RpcAuth &getAuth() const {
+        return auth;
+    }
+
+    const RpcConfig &getConf() const {
+        return conf;
+    }
+
+    const RpcProtocolInfo &getProtocol() const {
+        return protocol;
+    }
+
+    const RpcServerInfo &getServer() const {
+        return server;
+    }
+
+    bool operator ==(const RpcChannelKey &other) const {
+        return this->auth == other.auth && this->protocol == other.protocol
+               && this->server == other.server && this->conf == other.conf
+               && ((token == NULL && other.token == NULL)
+                   || (token && other.token && *token == *other.token));
+    }
+
+    const Token &getToken() const {
+        assert(token != NULL);
+        return *token;
+    }
+
+    bool hasToken() {
+        return token != NULL;
+    }
+
+private:
+    const RpcAuth auth;
+    const RpcConfig conf;
+    const RpcProtocolInfo protocol;
+    const RpcServerInfo server;
+    hdfs::internal::shared_ptr<Token> token;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::RpcChannelKey);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCHANNELKEY_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.cc
new file mode 100644
index 0000000..b0d1caf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.cc
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Logger.h"
+#include "SharedPtr.h"
+#include "RpcClient.h"
+#include "Thread.h"
+
+#include <uuid/uuid.h>
+
+namespace hdfs {
+namespace internal {
+
+once_flag RpcClient::once;
+shared_ptr<RpcClient> RpcClient::client;
+
+void RpcClient::createSinglten() {
+    client = shared_ptr < RpcClient > (new RpcClientImpl());
+}
+
+RpcClient & RpcClient::getClient() {
+    call_once(once, &RpcClientImpl::createSinglten);
+    assert(client);
+    return *client;
+}
+
+RpcClientImpl::RpcClientImpl() :
+    cleaning(false), running(true), count(0) {
+    uuid_t id;
+    uuid_generate(id);
+    clientId.resize(sizeof(uuid_t));
+    memcpy(&clientId[0], id, sizeof(uuid_t));
+#ifdef MOCK
+    stub = NULL;
+#endif
+}
+
+RpcClientImpl::~RpcClientImpl() {
+    running = false;
+    cond.notify_all();
+
+    if (cleaner.joinable()) {
+        cleaner.join();
+    }
+
+    close();
+}
+
+void RpcClientImpl::clean() {
+    assert(cleaning);
+
+    try {
+        while (running) {
+            try {
+                unique_lock<mutex> lock(mut);
+                cond.wait_for(lock, seconds(1));
+
+                if (!running || allChannels.empty()) {
+                    break;
+                }
+
+                unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator s, e;
+                e = allChannels.end();
+
+                for (s = allChannels.begin(); s != e;) {
+                    if (s->second->checkIdle()) {
+                        s->second.reset();
+                        s = allChannels.erase(s);
+                    } else {
+                        ++s;
+                    }
+                }
+            } catch (const HdfsCanceled & e) {
+                /*
+                 * ignore cancel signal here.
+                 */
+            }
+        }
+    } catch (const hdfs::HdfsException & e) {
+        LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s",
+            GetExceptionDetail(e));
+    } catch (const std::exception & e) {
+        LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s", e.what());
+    }
+
+    cleaning = false;
+}
+
+void RpcClientImpl::close() {
+    lock_guard<mutex> lock(mut);
+    running = false;
+    unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator s, e;
+    e = allChannels.end();
+
+    for (s = allChannels.begin(); s != e; ++s) {
+        s->second->waitForExit();
+    }
+
+    allChannels.clear();
+}
+
+bool RpcClientImpl::isRunning() {
+    return running;
+}
+
+RpcChannel & RpcClientImpl::getChannel(const RpcAuth &auth,
+            const RpcProtocolInfo &protocol, const RpcServerInfo &server,
+            const RpcConfig &conf) {
+    shared_ptr<RpcChannel> rc;
+    RpcChannelKey key(auth, protocol, server, conf);
+
+    try {
+        lock_guard<mutex> lock(mut);
+
+        if (!running) {
+            THROW(hdfs::HdfsRpcException,
+                  "Cannot Setup RPC channel to \"%s:%s\" since RpcClient is closing",
+                  key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
+        }
+
+        unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator it;
+        it = allChannels.find(key);
+
+        if (it != allChannels.end()) {
+            rc = it->second;
+        } else {
+            rc = createChannelInternal(key);
+            allChannels[key] = rc;
+        }
+
+        rc->addRef();
+
+        if (!cleaning) {
+            cleaning = true;
+
+            if (cleaner.joinable()) {
+                cleaner.join();
+            }
+
+            CREATE_THREAD(cleaner, bind(&RpcClientImpl::clean, this));
+        }
+    } catch (const HdfsRpcException & e) {
+        throw;
+    } catch (...) {
+        NESTED_THROW(HdfsRpcException,
+                     "RpcClient failed to create a channel to \"%s:%s\"",
+                     server.getHost().c_str(), server.getPort().c_str());
+    }
+
+    return *rc;
+}
+
+shared_ptr<RpcChannel> RpcClientImpl::createChannelInternal(
+            const RpcChannelKey & key) {
+    shared_ptr<RpcChannel> channel;
+#ifdef MOCK
+
+    if (stub) {
+        channel = shared_ptr < RpcChannel > (stub->getChannel(key, *this));
+    } else {
+        channel = shared_ptr < RpcChannel > (new RpcChannelImpl(key, *this));
+    }
+
+#else
+    channel = shared_ptr<RpcChannel>(new RpcChannelImpl(key, *this));
+#endif
+    return channel;
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.h
new file mode 100644
index 0000000..7a7a65a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.h
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCLIENT_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCLIENT_H_
+
+#include "RpcAuth.h"
+#include "RpcCall.h"
+#include "RpcChannel.h"
+#include "RpcChannelKey.h"
+#include "RpcConfig.h"
+#include "RpcProtocolInfo.h"
+#include "RpcServerInfo.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+#include "UnorderedMap.h"
+
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+#ifdef MOCK
+#include "TestRpcChannelStub.h"
+#endif
+
+namespace hdfs {
+namespace internal {
+
+class RpcClient {
+public:
+    /**
+     * Destroy an RpcClient instance.
+     */
+    virtual ~RpcClient() {
+    }
+
+    /**
+     * Get a RPC channel, create a new one if necessary.
+     * @param auth Authentication information used to setup RPC connection.
+     * @param protocol The RPC protocol used in this call.
+     * @param server Remote server information.
+     * @param conf RPC connection configuration.
+     * @param once If true, the RPC channel will not be reused.
+     */
+    virtual RpcChannel &getChannel(const RpcAuth &auth,
+              const RpcProtocolInfo &protocol,
+              const RpcServerInfo &server, const RpcConfig &conf) = 0;
+
+    /**
+     * Check the RpcClient is still running.
+     * @return true if the RpcClient is still running.
+     */
+    virtual bool isRunning() = 0;
+
+    virtual std::string getClientId() const = 0;
+
+    virtual int32_t getCallId() = 0;
+
+public:
+    static RpcClient &getClient();
+    static void createSinglten();
+
+private:
+    static once_flag once;
+    static shared_ptr<RpcClient> client;
+};
+
+class RpcClientImpl: public RpcClient {
+public:
+    /**
+     * Construct a RpcClient.
+     */
+    RpcClientImpl();
+
+    /**
+     * Destroy an RpcClient instance.
+     */
+    ~RpcClientImpl();
+
+    /**
+     * Get a RPC channel, create a new one if necessary.
+     * @param auth Authentication information used to setup RPC connection.
+     * @param protocol The RPC protocol used in this call.
+     * @param server Remote server information.
+     * @param conf RPC connection configuration.
+     * @param once If true, the RPC channel will not be reused.
+     */
+    RpcChannel &getChannel(const RpcAuth &auth,
+          const RpcProtocolInfo &protocol, const RpcServerInfo &server,
+          const RpcConfig &conf);
+
+    /**
+     * Close the RPC channel.
+     */
+    void close();
+
+    /**
+     * Check the RpcClient is still running.
+     * @return true if the RpcClient is still running.
+     */
+    bool isRunning();
+
+    std::string getClientId() const {
+        return clientId;
+    }
+
+    int32_t getCallId() {
+        static mutex mutid;
+        lock_guard<mutex> lock(mutid);
+        ++count;
+        count = count < std::numeric_limits<int32_t>::max() ? count : 0;
+        return count;
+    }
+
+private:
+    shared_ptr<RpcChannel> createChannelInternal(
+        const RpcChannelKey &key);
+
+    void clean();
+
+private:
+    atomic<bool> cleaning;
+    atomic<bool> running;
+    condition_variable cond;
+    int64_t count;
+    mutex mut;
+    std::string clientId;
+    thread cleaner;
+    unordered_map<RpcChannelKey, shared_ptr<RpcChannel> > allChannels;
+
+#ifdef MOCK
+private:
+    hdfs::mock::TestRpcChannelStub *stub;
+#endif
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCLIENT_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.cc
new file mode 100644
index 0000000..84b5648
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.cc
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcConfig.h"
+
+#include <vector>
+
+namespace hdfs {
+namespace internal {
+
+size_t RpcConfig::hash_value() const {
+    size_t values[] = { Int32Hasher(maxIdleTime), Int32Hasher(pingTimeout),
+                        Int32Hasher(connectTimeout), Int32Hasher(readTimeout), Int32Hasher(
+                            writeTimeout), Int32Hasher(maxRetryOnConnect), Int32Hasher(
+                            lingerTimeout), Int32Hasher(rpcTimeout), BoolHasher(tcpNoDelay)
+                      };
+    return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.h
new file mode 100644
index 0000000..4b32611
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.h
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCONFIG_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCONFIG_H_
+
+#include "Hash.h"
+#include "SessionConfig.h"
+
+namespace hdfs {
+namespace internal {
+
+class RpcConfig {
+public:
+
+    RpcConfig(const SessionConfig &conf) {
+        connectTimeout = conf.getRpcConnectTimeout();
+        maxIdleTime = conf.getRpcMaxIdleTime();
+        maxRetryOnConnect = conf.getRpcMaxRetryOnConnect();
+        pingTimeout = conf.getRpcPingTimeout();
+        readTimeout = conf.getRpcReadTimeout();
+        writeTimeout = conf.getRpcWriteTimeout();
+        tcpNoDelay = conf.isRpcTcpNoDelay();
+        lingerTimeout = conf.getRpcSocketLingerTimeout();
+        rpcTimeout = conf.getRpcTimeout();
+    }
+
+    size_t hash_value() const;
+
+    int getConnectTimeout() const {
+        return connectTimeout;
+    }
+
+    void setConnectTimeout(int connectTimeout) {
+        this->connectTimeout = connectTimeout;
+    }
+
+    int getMaxIdleTime() const {
+        return maxIdleTime;
+    }
+
+    void setMaxIdleTime(int maxIdleTime) {
+        this->maxIdleTime = maxIdleTime;
+    }
+
+    int getMaxRetryOnConnect() const {
+        return maxRetryOnConnect;
+    }
+
+    void setMaxRetryOnConnect(int maxRetryOnConnect) {
+        this->maxRetryOnConnect = maxRetryOnConnect;
+    }
+
+    int getReadTimeout() const {
+        return readTimeout;
+    }
+
+    void setReadTimeout(int readTimeout) {
+        this->readTimeout = readTimeout;
+    }
+
+    bool isTcpNoDelay() const {
+        return tcpNoDelay;
+    }
+
+    void setTcpNoDelay(bool tcpNoDelay) {
+        this->tcpNoDelay = tcpNoDelay;
+    }
+
+    int getWriteTimeout() const {
+        return writeTimeout;
+    }
+
+    void setWriteTimeout(int writeTimeout) {
+        this->writeTimeout = writeTimeout;
+    }
+
+    int getPingTimeout() const {
+        return pingTimeout;
+    }
+
+    void setPingTimeout(int maxPingTimeout) {
+        this->pingTimeout = maxPingTimeout;
+    }
+
+    int getLingerTimeout() const {
+        return lingerTimeout;
+    }
+
+    void setLingerTimeout(int lingerTimeout) {
+        this->lingerTimeout = lingerTimeout;
+    }
+
+    int getRpcTimeout() const {
+        return rpcTimeout;
+    }
+
+    void setRpcTimeout(int rpcTimeout) {
+        this->rpcTimeout = rpcTimeout;
+    }
+
+    bool operator ==(const RpcConfig &other) const {
+        return this->maxIdleTime == other.maxIdleTime
+               && this->pingTimeout == other.pingTimeout
+               && this->connectTimeout == other.connectTimeout
+               && this->readTimeout == other.readTimeout
+               && this->writeTimeout == other.writeTimeout
+               && this->maxRetryOnConnect == other.maxRetryOnConnect
+               && this->tcpNoDelay == other.tcpNoDelay
+               && this->lingerTimeout == other.lingerTimeout
+               && this->rpcTimeout == other.rpcTimeout;
+    }
+
+private:
+    int maxIdleTime;
+    int pingTimeout;
+    int connectTimeout;
+    int readTimeout;
+    int writeTimeout;
+    int maxRetryOnConnect;
+    int lingerTimeout;
+    int rpcTimeout;
+    bool tcpNoDelay;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::RpcConfig);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCONFIG_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.cc
new file mode 100644
index 0000000..c9ea2b8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.cc
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <google/protobuf/io/coded_stream.h>
+
+#include "RpcContentWrapper.h"
+
+using namespace ::google::protobuf;
+using namespace ::google::protobuf::io;
+
+namespace hdfs {
+namespace internal {
+
+RpcContentWrapper::RpcContentWrapper(Message * header, Message * msg) :
+    header(header), msg(msg) {
+}
+
+int RpcContentWrapper::getLength() {
+    int headerLen, msgLen = 0;
+    headerLen = header->ByteSize();
+    msgLen = msg == NULL ? 0 : msg->ByteSize();
+    return headerLen + CodedOutputStream::VarintSize32(headerLen)
+           + (msg == NULL ?
+              0 : msgLen + CodedOutputStream::VarintSize32(msgLen));
+}
+
+void RpcContentWrapper::writeTo(WriteBuffer & buffer) {
+    int size = header->ByteSize();
+    buffer.writeVarint32(size);
+    header->SerializeToArray(buffer.alloc(size), size);
+
+    if (msg != NULL) {
+        size = msg->ByteSize();
+        buffer.writeVarint32(size);
+        msg->SerializeToArray(buffer.alloc(size), size);
+    }
+}
+
+}
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.h
new file mode 100644
index 0000000..bad8736
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.h
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCONTENTWRAPPER_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCONTENTWRAPPER_H_
+
+#include <google/protobuf/message.h>
+
+#include "WriteBuffer.h"
+
+namespace hdfs {
+namespace internal {
+
+class RpcContentWrapper {
+public:
+    RpcContentWrapper(::google::protobuf::Message *header,
+                      ::google::protobuf::Message *msg);
+
+    int getLength();
+    void writeTo(WriteBuffer &buffer);
+
+public:
+    ::google::protobuf::Message *header;
+    ::google::protobuf::Message *msg;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCONTENTWRAPPER_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.cc
new file mode 100644
index 0000000..89ca9bb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.cc
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcProtocolInfo.h"
+
+namespace hdfs {
+namespace internal {
+
+size_t RpcProtocolInfo::hash_value() const {
+    size_t values[] = { Int32Hasher(version), StringHasher(protocol), StringHasher(tokenKind) };
+    return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.h
new file mode 100644
index 0000000..032a58c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.h
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCPROTOCOLINFO_H_
+#define _HDFS_LIBHDFS3_RPC_RPCPROTOCOLINFO_H_
+
+#include "Hash.h"
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+class RpcProtocolInfo {
+public:
+    RpcProtocolInfo(int v, const std::string &p,
+            const std::string &tokenKind) :
+        version(v), protocol(p), tokenKind(tokenKind) {
+    }
+
+    size_t hash_value() const;
+
+    bool operator ==(const RpcProtocolInfo &other) const {
+        return version == other.version && protocol == other.protocol &&
+          tokenKind == other.tokenKind;
+    }
+
+    const std::string &getProtocol() const {
+        return protocol;
+    }
+
+    void setProtocol(const std::string &protocol) {
+        this->protocol = protocol;
+    }
+
+    int getVersion() const {
+        return version;
+    }
+
+    void setVersion(int version) {
+        this->version = version;
+    }
+
+    const std::string getTokenKind() const {
+        return tokenKind;
+    }
+
+    void setTokenKind(const std::string &tokenKind) {
+        this->tokenKind = tokenKind;
+    }
+
+private:
+    int version;
+    std::string protocol;
+    std::string tokenKind;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::RpcProtocolInfo);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCPROTOCOLINFO_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.cc
new file mode 100644
index 0000000..e53e88a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.cc
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ProtobufRpcEngine.pb.h"
+#include "RpcCall.h"
+#include "RpcContentWrapper.h"
+#include "RpcHeader.pb.h"
+#include "RpcRemoteCall.h"
+#include "SharedPtr.h"
+#include "WriteBuffer.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+#define PING_CALL_ID -4
+
+using namespace google::protobuf::io;
+using namespace ::hadoop::common;
+
+namespace hdfs {
+namespace internal {
+
+void RpcRemoteCall::serialize(const RpcProtocolInfo & protocol,
+                              WriteBuffer & buffer) {
+    RpcRequestHeaderProto rpcHeader;
+    rpcHeader.set_callid(identity);
+    rpcHeader.set_clientid(clientId);
+    rpcHeader.set_retrycount(-1);
+    rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
+    rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+    RequestHeaderProto requestHeader;
+    requestHeader.set_methodname(call.getName());
+    requestHeader.set_declaringclassprotocolname(protocol.getProtocol());
+    requestHeader.set_clientprotocolversion(protocol.getVersion());
+    RpcContentWrapper wrapper(&requestHeader, call.getRequest());
+    int rpcHeaderLen = rpcHeader.ByteSize();
+    int size = CodedOutputStream::VarintSize32(rpcHeaderLen) + rpcHeaderLen + wrapper.getLength();
+    buffer.writeBigEndian(size);
+    buffer.writeVarint32(rpcHeaderLen);
+    rpcHeader.SerializeToArray(buffer.alloc(rpcHeaderLen), rpcHeaderLen);
+    wrapper.writeTo(buffer);
+}
+
+std::vector<char> RpcRemoteCall::GetPingRequest(const std::string & clientid) {
+    WriteBuffer buffer;
+    std::vector<char> retval;
+    RpcRequestHeaderProto pingHeader;
+    pingHeader.set_callid(PING_CALL_ID);
+    pingHeader.set_clientid(clientid);
+    pingHeader.set_retrycount(INVALID_RETRY_COUNT);
+    pingHeader.set_rpckind(RpcKindProto::RPC_PROTOCOL_BUFFER);
+    pingHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+    int rpcHeaderLen = pingHeader.ByteSize();
+    int size = CodedOutputStream::VarintSize32(rpcHeaderLen) + rpcHeaderLen;
+    buffer.writeBigEndian(size);
+    buffer.writeVarint32(rpcHeaderLen);
+    pingHeader.SerializeWithCachedSizesToArray(reinterpret_cast<unsigned char *>(buffer.alloc(pingHeader.ByteSize())));
+    retval.resize(buffer.getDataSize(0));
+    memcpy(&retval[0], buffer.getBuffer(0), retval.size());
+    return retval;
+}
+
+}
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.h
new file mode 100644
index 0000000..d5177b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.h
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCREMOTECALL_
+#define _HDFS_LIBHDFS3_RPC_RPCREMOTECALL_
+
+#include "DateTime.h"
+#include "ExceptionInternal.h"
+#include "RpcCall.h"
+#include "RpcProtocolInfo.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+#include "WriteBuffer.h"
+
+#include <stdint.h>
+#include <string>
+
+#define INVALID_RETRY_COUNT -1
+
+namespace hdfs {
+namespace internal {
+
+class RpcRemoteCall;
+typedef shared_ptr<RpcRemoteCall> RpcRemoteCallPtr;
+
+class RpcRemoteCall {
+public:
+    RpcRemoteCall(const RpcCall &c, int32_t id, const std::string &clientId) :
+        complete(false), identity(id), call(c), clientId(clientId) {
+    }
+
+    virtual ~RpcRemoteCall() {
+    }
+
+    virtual void cancel(exception_ptr reason) {
+        unique_lock<mutex> lock(mut);
+        complete = true;
+        error = reason;
+        cond.notify_all();
+    }
+
+    virtual void serialize(const RpcProtocolInfo &protocol,
+                           WriteBuffer &buffer);
+
+    const int32_t getIdentity() const {
+        return identity;
+    }
+
+    void wait() {
+        unique_lock<mutex> lock(mut);
+
+        if (!complete) {
+            cond.wait_for(lock, milliseconds(500));
+        }
+    }
+
+    void check() {
+        if (error != exception_ptr()) {
+            rethrow_exception(error);
+        }
+    }
+
+    RpcCall &getCall() {
+        return call;
+    }
+
+    void done() {
+        unique_lock<mutex> lock(mut);
+        complete = true;
+        cond.notify_all();
+    }
+
+    void wakeup() {
+        cond.notify_all();
+    }
+
+    bool finished() {
+        unique_lock<mutex> lock(mut);
+        return complete;
+    }
+
+public:
+    static std::vector<char> GetPingRequest(const std::string &clientid);
+
+private:
+    bool complete;
+    condition_variable cond;
+    const int32_t identity;
+    exception_ptr error;
+    mutex mut;
+    RpcCall call;
+    std::string clientId;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCREMOTECALL_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.cc
new file mode 100644
index 0000000..5a4c1a1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.cc
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcServerInfo.h"
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+size_t RpcServerInfo::hash_value() const {
+    size_t values[] = { StringHasher(host), StringHasher(port), StringHasher(tokenService) };
+    return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.h
new file mode 100644
index 0000000..fe437b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.h
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCSERVERINFO_H_
+#define _HDFS_LIBHDFS3_RPC_RPCSERVERINFO_H_
+
+#include "Hash.h"
+
+#include <string>
+#include <sstream>
+
+namespace hdfs {
+namespace internal {
+
+class RpcServerInfo {
+public:
+    RpcServerInfo(const std::string &tokenService, const std::string &h,
+                  const std::string &p) :
+        host(h), port(p), tokenService(tokenService) {
+    }
+
+    RpcServerInfo(const std::string &h, uint32_t p) :
+        host(h) {
+        std::stringstream ss;
+        ss << p;
+        port = ss.str();
+    }
+
+    size_t hash_value() const;
+
+    bool operator ==(const RpcServerInfo &other) const {
+        return this->host == other.host && this->port == other.port &&
+            tokenService == other.tokenService;
+    }
+
+    const std::string &getTokenService() const {
+        return tokenService;
+    }
+
+    const std::string &getHost() const {
+        return host;
+    }
+
+    const std::string &getPort() const {
+        return port;
+    }
+
+    void setTokenService(const std::string &tokenService) {
+        this->tokenService = tokenService;
+    }
+
+private:
+    std::string host;
+    std::string port;
+    std::string tokenService;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::RpcServerInfo);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCSERVERINFO_H_ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.cc
new file mode 100644
index 0000000..bfe6868
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.cc
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <cctype>
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "SaslClient.h"
+
+#define SASL_SUCCESS 0
+
+namespace hdfs {
+namespace internal {
+
+SaslClient::SaslClient(const RpcSaslProto_SaslAuth & auth, const Token & token,
+                       const std::string & principal) :
+    complete(false) {
+    int rc;
+    ctx = NULL;
+    RpcAuth method = RpcAuth(RpcAuth::ParseMethod(auth.method()));
+    rc = gsasl_init(&ctx);
+
+    if (rc != GSASL_OK) {
+        THROW(HdfsIOException, "cannot initialize libgsasl");
+    }
+
+    switch (method.getMethod()) {
+    case AuthMethod::KERBEROS:
+        initKerberos(auth, principal);
+        break;
+
+    case AuthMethod::TOKEN:
+        initDigestMd5(auth, token);
+        break;
+
+    default:
+        THROW(HdfsIOException, "unknown auth method.");
+        break;
+    }
+}
+
+SaslClient::~SaslClient() {
+    if (session != NULL) {
+        gsasl_finish(session);
+    }
+
+    if (ctx != NULL) {
+        gsasl_done(ctx);
+    }
+}
+
+void SaslClient::initKerberos(const RpcSaslProto_SaslAuth & auth,
+                              const std::string & principal) {
+    int rc;
+
+    /* Create new authentication session. */
+    if ((rc = gsasl_client_start(ctx, auth.mechanism().c_str(), &session)) != GSASL_OK) {
+        THROW(HdfsIOException, "Cannot initialize client (%d): %s", rc,
+              gsasl_strerror(rc));
+    }
+
+    gsasl_property_set(session, GSASL_SERVICE, auth.protocol().c_str());
+    gsasl_property_set(session, GSASL_AUTHID, principal.c_str());
+    gsasl_property_set(session, GSASL_HOSTNAME, auth.serverid().c_str());
+}
+
+std::string Base64Encode(const std::string & in) {
+    char * temp;
+    size_t len;
+    std::string retval;
+    int rc = gsasl_base64_to(in.c_str(), in.size(), &temp, &len);
+
+    if (rc != GSASL_OK) {
+        throw std::bad_alloc();
+    }
+
+    if (temp) {
+        retval = temp;
+        free(temp);
+    }
+
+    if (!temp || retval.length() != len) {
+        THROW(HdfsIOException, "SaslClient: Failed to encode string to base64");
+    }
+
+    return retval;
+}
+
+void SaslClient::initDigestMd5(const RpcSaslProto_SaslAuth & auth,
+                               const Token & token) {
+    int rc;
+
+    if ((rc = gsasl_client_start(ctx, auth.mechanism().c_str(), &session)) != GSASL_OK) {
+        THROW(HdfsIOException, "Cannot initialize client (%d): %s", rc, gsasl_strerror(rc));
+    }
+
+    std::string password = Base64Encode(token.getPassword());
+    std::string identifier = Base64Encode(token.getIdentifier());
+    gsasl_property_set(session, GSASL_PASSWORD, password.c_str());
+    gsasl_property_set(session, GSASL_AUTHID, identifier.c_str());
+    gsasl_property_set(session, GSASL_HOSTNAME, auth.serverid().c_str());
+    gsasl_property_set(session, GSASL_SERVICE, auth.protocol().c_str());
+}
+
+std::string SaslClient::evaluateChallenge(const std::string & challenge) {
+    int rc;
+    char * output = NULL;
+    size_t outputSize;
+    std::string retval;
+    rc = gsasl_step(session, &challenge[0], challenge.size(), &output,
+                    &outputSize);
+
+    if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) {
+        retval.resize(outputSize);
+        memcpy(&retval[0], output, outputSize);
+
+        if (output) {
+            free(output);
+        }
+    } else {
+        if (output) {
+            free(output);
+        }
+
+        THROW(AccessControlException, "Failed to evaluate challenge: %s", gsasl_strerror(rc));
+    }
+
+    if (rc == GSASL_OK) {
+        complete = true;
+    }
+
+    return retval;
+}
+
+bool SaslClient::isComplete() {
+    return complete;
+}
+
+}
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.h
new file mode 100644
index 0000000..5e95ebd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.h
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_SASLCLIENT_H_
+#define _HDFS_LIBHDFS3_RPC_SASLCLIENT_H_
+
+#include "RpcAuth.h"
+#include "RpcHeader.pb.h"
+#include "client/Token.h"
+#include "network/Socket.h"
+
+#include <gsasl.h>
+#include <string>
+
+#define SWITCH_TO_SIMPLE_AUTH -88
+
+namespace hdfs {
+namespace internal {
+
+using hadoop::common::RpcSaslProto_SaslAuth;
+
+class SaslClient {
+public:
+    SaslClient(const RpcSaslProto_SaslAuth &auth, const Token &token,
+               const std::string &principal);
+
+    ~SaslClient();
+
+    std::string evaluateChallenge(const std::string &challenge);
+
+    bool isComplete();
+
+private:
+    void initKerberos(const RpcSaslProto_SaslAuth &auth,
+                      const std::string &principal);
+    void initDigestMd5(const RpcSaslProto_SaslAuth &auth, const Token &token);
+
+private:
+    Gsasl *ctx;
+    Gsasl_session *session;
+    bool complete;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_SASLCLIENT_H_ */