You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/10/03 20:28:51 UTC
[3/5] HDFS-7012. Add hdfs native client RPC functionality (Zhanwei
Wang via Colin P. McCabe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.cc
new file mode 100644
index 0000000..6057a01
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.cc
@@ -0,0 +1,876 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "IpcConnectionContext.pb.h"
+#include "Logger.h"
+#include "RpcChannel.h"
+#include "RpcClient.h"
+#include "RpcContentWrapper.h"
+#include "RpcHeader.pb.h"
+#include "RpcHeader.pb.h"
+#include "Thread.h"
+#include "WriteBuffer.h"
+#include "server/RpcHelper.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+#define RPC_HEADER_MAGIC "hrpc"
+#define RPC_HEADER_VERSION 9
+#define SERIALIZATION_TYPE_PROTOBUF 0
+#define CONNECTION_CONTEXT_CALL_ID -3
+
+using namespace ::google::protobuf;
+using namespace google::protobuf::io;
+using namespace hadoop::common;
+using namespace hadoop::hdfs;
+
+namespace hdfs {
+namespace internal {
+
+RpcChannelImpl::RpcChannelImpl(const RpcChannelKey &k, RpcClient &c) :
+ refs(0), available(false), key(k), client(c) {
+ sock = shared_ptr<Socket>(new TcpSocketImpl);
+ sock->setLingerTimeout(k.getConf().getLingerTimeout());
+ in = shared_ptr<BufferedSocketReader>(
+ new BufferedSocketReaderImpl(
+ *static_cast<TcpSocketImpl *>(sock.get())));
+ lastActivity = lastIdle = steady_clock::now();
+}
+
+RpcChannelImpl::RpcChannelImpl(const RpcChannelKey &k, Socket *s,
+ BufferedSocketReader *in, RpcClient &c) :
+ refs(0), available(false), key(k), client(c) {
+ sock = shared_ptr<Socket>(s);
+ this->in = shared_ptr<BufferedSocketReader>(in);
+ lastActivity = lastIdle = steady_clock::now();
+}
+
+RpcChannelImpl::~RpcChannelImpl() {
+ assert(pendingCalls.empty());
+ assert(refs == 0);
+
+ if (available) {
+ sock->close();
+ }
+}
+
+void RpcChannelImpl::close(bool immediate) {
+ lock_guard<mutex> lock(writeMut);
+ --refs;
+ assert(refs >= 0);
+
+ if (immediate && !refs) {
+ assert(pendingCalls.empty());
+ available = false;
+ sock->close();
+ }
+}
+
+void RpcChannelImpl::sendSaslMessage(RpcSaslProto *msg, Message *resp) {
+ int totalLen;
+ WriteBuffer buffer;
+ RpcRequestHeaderProto rpcHeader;
+ rpcHeader.set_callid(AuthProtocol::SASL);
+ rpcHeader.set_clientid(client.getClientId());
+ rpcHeader.set_retrycount(INVALID_RETRY_COUNT);
+ rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
+ rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+ RpcContentWrapper wrapper(&rpcHeader, msg);
+ totalLen = wrapper.getLength();
+ buffer.writeBigEndian(totalLen);
+ wrapper.writeTo(buffer);
+ sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
+ key.getConf().getWriteTimeout());
+ RpcRemoteCallPtr call(
+ new RpcRemoteCall(RpcCall(false, "sasl message", NULL, resp),
+ AuthProtocol::SASL, client.getClientId()));
+ pendingCalls[AuthProtocol::SASL] = call;
+}
+
+const RpcSaslProto_SaslAuth * RpcChannelImpl::createSaslClient(
+ const RepeatedPtrField<RpcSaslProto_SaslAuth> *auths) {
+ const RpcSaslProto_SaslAuth *auth = NULL;
+ Token token;
+
+ for (int i = 0; i < auths->size(); ++i) {
+ auth = &auths->Get(i);
+ RpcAuth method(RpcAuth::ParseMethod(auth->method()));
+
+ if (method.getMethod() == AuthMethod::TOKEN && key.hasToken()) {
+ token = key.getToken();
+ break;
+ } else if (method.getMethod() == AuthMethod::KERBEROS) {
+ break;
+ } else if (method.getMethod() == AuthMethod::SIMPLE) {
+ return auth;
+ } else if (method.getMethod() == AuthMethod::UNKNOWN) {
+ return auth;
+ } else {
+ auth = NULL;
+ }
+ }
+
+ if (!auth) {
+ std::stringstream ss;
+ ss << "Client cannot authenticate via: ";
+
+ for (int i = 0; i < auths->size(); ++i) {
+ auth = &auths->Get(i);
+ ss << auth->mechanism() << ", ";
+ }
+
+ THROW(AccessControlException, "%s", ss.str().c_str());
+ }
+
+ saslClient = shared_ptr<SaslClient>(
+ new SaslClient(*auth, token,
+ key.getAuth().getUser().getPrincipal()));
+ return auth;
+}
+
+std::string RpcChannelImpl::saslEvaluateToken(RpcSaslProto &response,
+ bool serverIsDone) {
+ std::string token;
+
+ if (response.has_token()) {
+ token = saslClient->evaluateChallenge(response.token());
+ } else if (!serverIsDone) {
+ THROW(AccessControlException, "Server challenge contains no token");
+ }
+
+ if (serverIsDone) {
+ if (!saslClient->isComplete()) {
+ THROW(AccessControlException, "Client is out of sync with server");
+ }
+
+ if (!token.empty()) {
+ THROW(AccessControlException, "Client generated spurious "
+ "response");
+ }
+ }
+
+ return token;
+}
+
+RpcAuth RpcChannelImpl::setupSaslConnection() {
+ RpcAuth retval;
+ RpcSaslProto negotiateRequest, response, msg;
+ negotiateRequest.set_state(RpcSaslProto_SaslState_NEGOTIATE);
+ sendSaslMessage(&negotiateRequest, &response);
+ bool done = false;
+
+ do {
+ readOneResponse(false);
+ msg.Clear();
+
+ switch (response.state()) {
+ case RpcSaslProto_SaslState_NEGOTIATE: {
+ const RpcSaslProto_SaslAuth *auth = createSaslClient(
+ &response.auths());
+ retval = RpcAuth(RpcAuth::ParseMethod(auth->method()));
+
+ if (retval.getMethod() == AuthMethod::SIMPLE) {
+ done = true;
+ } else if (retval.getMethod() == AuthMethod::UNKNOWN) {
+ THROW(AccessControlException, "Unknown auth mechanism");
+ } else {
+ std::string respToken;
+ RpcSaslProto_SaslAuth *respAuth = msg.add_auths();
+ respAuth->CopyFrom(*auth);
+ std::string chanllege;
+
+ if (auth->has_challenge()) {
+ chanllege = auth->challenge();
+ respAuth->clear_challenge();
+ }
+
+ respToken = saslClient->evaluateChallenge(chanllege);
+
+ if (!respToken.empty()) {
+ msg.set_token(respToken);
+ }
+
+ msg.set_state(RpcSaslProto_SaslState_INITIATE);
+ }
+
+ break;
+ }
+
+ case RpcSaslProto_SaslState_CHALLENGE: {
+ if (!saslClient) {
+ THROW(AccessControlException, "Server sent unsolicited challenge");
+ }
+
+ std::string token = saslEvaluateToken(response, false);
+ msg.set_token(token);
+ msg.set_state(RpcSaslProto_SaslState_RESPONSE);
+ break;
+ }
+
+ case RpcSaslProto_SaslState_SUCCESS:
+ if (!saslClient) {
+ retval = RpcAuth(AuthMethod::SIMPLE);
+ } else {
+ saslEvaluateToken(response, true);
+ }
+
+ done = true;
+ break;
+
+ default:
+ break;
+ }
+
+ if (!done) {
+ response.Clear();
+ sendSaslMessage(&msg, &response);
+ }
+ } while (!done);
+
+ return retval;
+}
+
+void RpcChannelImpl::connect() {
+ int sleep = 1;
+ exception_ptr lastError;
+ const RpcConfig & conf = key.getConf();
+ const RpcServerInfo & server = key.getServer();
+
+ for (int i = 0; i < conf.getMaxRetryOnConnect(); ++i) {
+ RpcAuth auth = key.getAuth();
+
+ try {
+ while (true) {
+ sock->connect(server.getHost().c_str(),
+ server.getPort().c_str(),
+ conf.getConnectTimeout());
+ sock->setNoDelay(conf.isTcpNoDelay());
+ sendConnectionHeader();
+
+ if (auth.getProtocol() == AuthProtocol::SASL) {
+ auth = setupSaslConnection();
+ if (auth.getProtocol() == AuthProtocol::SASL) {
+ //success
+ break;
+ }
+
+ /*
+ * switch to other auth protocol
+ */
+ sock->close();
+ CheckOperationCanceled();
+ } else {
+ break;
+ }
+ }
+
+ auth.setUser(key.getAuth().getUser());
+ sendConnectionContent(auth);
+ available = true;
+ lastActivity = lastIdle = steady_clock::now();
+ return;
+ } catch (const SaslException & e) {
+ /*
+ * Namenode may treat this connect as replay, retry later
+ */
+ sleep = (rand() % 5) + 1;
+ lastError = current_exception();
+ LOG(LOG_ERROR,
+ "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
+ server.getHost().c_str(), server.getPort().c_str(),
+ GetExceptionDetail(e));
+ } catch (const HdfsNetworkException & e) {
+ sleep = 1;
+ lastError = current_exception();
+ LOG(LOG_ERROR,
+ "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
+ server.getHost().c_str(), server.getPort().c_str(),
+ GetExceptionDetail(e));
+ } catch (const HdfsTimeoutException & e) {
+ sleep = 1;
+ lastError = current_exception();
+ LOG(LOG_ERROR,
+ "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s",
+ server.getHost().c_str(), server.getPort().c_str(),
+ GetExceptionDetail(e));
+ }
+
+ if (i + 1 < conf.getMaxRetryOnConnect()) {
+ LOG(INFO,
+ "Retrying connect to server: \"%s:%s\". "
+ "Already tried %d time(s)", server.getHost().c_str(),
+ server.getPort().c_str(), i + 1);
+ }
+
+ sock->close();
+ CheckOperationCanceled();
+ sleep_for(seconds(sleep));
+ }
+
+ rethrow_exception(lastError);
+}
+
+exception_ptr RpcChannelImpl::invokeInternal(RpcRemoteCallPtr remote) {
+ const RpcCall & call = remote->getCall();
+ exception_ptr lastError;
+
+ try {
+ if (client.isRunning()) {
+ lock_guard<mutex> lock(writeMut);
+
+ if (!available) {
+ connect();
+ }
+
+ sendRequest(remote);
+ }
+
+ /*
+ * We use one call thread to check response,
+ * other thread will wait on RPC call complete.
+ */
+ while (client.isRunning()) {
+ if (remote->finished()) {
+ /*
+ * Current RPC call has finished.
+ * Wake up another thread to check response.
+ */
+ wakeupOneCaller(remote->getIdentity());
+ break;
+ }
+
+ unique_lock<mutex> lock(readMut, defer_lock_t());
+
+ if (lock.try_lock()) {
+ /*
+ * Current thread will check response.
+ */
+ checkOneResponse();
+ } else {
+ /*
+ * Another thread checks response, just wait.
+ */
+ remote->wait();
+ }
+ }
+ } catch (const HdfsNetworkConnectException & e) {
+ try {
+ NESTED_THROW(HdfsFailoverException,
+ "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+ call.getName(), key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str());
+ } catch (const HdfsFailoverException & e) {
+ lastError = current_exception();
+ }
+ } catch (const HdfsNetworkException & e) {
+ try {
+ NESTED_THROW(HdfsRpcException,
+ "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+ call.getName(), key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str());
+ } catch (const HdfsRpcException & e) {
+ lastError = current_exception();
+ }
+ } catch (const HdfsTimeoutException & e) {
+ try {
+ NESTED_THROW(HdfsFailoverException,
+ "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+ call.getName(), key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str());
+ } catch (const HdfsFailoverException & e) {
+ lastError = current_exception();
+ }
+ } catch (const HdfsRpcException & e) {
+ lastError = current_exception();
+ } catch (const HdfsIOException & e) {
+ try {
+ NESTED_THROW(HdfsRpcException,
+ "Failed to invoke RPC call \"%s\" on server \"%s:%s\"",
+ call.getName(), key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str());
+ } catch (const HdfsRpcException & e) {
+ lastError = current_exception();
+ }
+ }
+
+ return lastError;
+}
+
+void RpcChannelImpl::invoke(const RpcCall & call) {
+ assert(refs > 0);
+ RpcRemoteCallPtr remote;
+ exception_ptr lastError;
+
+ try {
+ bool retry = false;
+
+ do {
+ int32_t id = client.getCallId();
+ remote = RpcRemoteCallPtr(
+ new RpcRemoteCall(call, id, client.getClientId()));
+ lastError = exception_ptr();
+ lastError = invokeInternal(remote);
+
+ if (lastError) {
+ lock_guard<mutex> lock(writeMut);
+ shutdown(lastError);
+
+ if (!retry && call.isIdempotent()) {
+ retry = true;
+ LOG(LOG_ERROR,
+ "Failed to invoke RPC call \"%s\" on "
+ "server \"%s:%s\": \n%s",
+ call.getName(), key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str(),
+ GetExceptionDetail(lastError));
+ LOG(INFO,
+ "Retry idempotent RPC call \"%s\" on "
+ "server \"%s:%s\"",
+ call.getName(), key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str());
+ } else {
+ rethrow_exception(lastError);
+ }
+ } else {
+ break;
+ }
+ } while (retry);
+ } catch (const HdfsRpcServerException & e) {
+ if (!remote->finished()) {
+ /*
+ * a fatal error happened, the caller will unwrap it.
+ */
+ lock_guard<mutex> lock(writeMut);
+ lastError = current_exception();
+ shutdown(lastError);
+ }
+
+ /*
+ * else not a fatal error, check again at the end of this function.
+ */
+ } catch (const HdfsException & e) {
+ lock_guard<mutex> lock(writeMut);
+ lastError = current_exception();
+ shutdown(lastError);
+ }
+
+ /*
+ * if the call is not finished, either failed to setup connection,
+ * or client is closing.
+ */
+ if (!remote->finished() || !client.isRunning()) {
+ lock_guard<mutex> lock(writeMut);
+
+ if (lastError == exception_ptr()) {
+ try {
+ THROW(hdfs::HdfsRpcException,
+ "Failed to invoke RPC call \"%s\", RPC channel "
+ "to \"%s:%s\" is to be closed since RpcClient is "
+ "closing", call.getName(),
+ key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str());
+ } catch (...) {
+ lastError = current_exception();
+ }
+ }
+
+ /*
+ * wake up all.
+ */
+ shutdown(lastError);
+ rethrow_exception(lastError);
+ }
+
+ remote->check();
+}
+
+void RpcChannelImpl::shutdown(exception_ptr reason) {
+ assert(reason != exception_ptr());
+ available = false;
+ cleanupPendingCalls(reason);
+ sock->close();
+}
+
+void RpcChannelImpl::wakeupOneCaller(int32_t id) {
+ lock_guard<mutex> lock(writeMut);
+ unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e;
+ e = pendingCalls.end();
+
+ for (s = pendingCalls.begin(); s != e; ++s) {
+ if (s->first != id) {
+ s->second->wakeup();
+ return;
+ }
+ }
+}
+
+void RpcChannelImpl::sendRequest(RpcRemoteCallPtr remote) {
+ WriteBuffer buffer;
+ assert(true == available);
+ remote->serialize(key.getProtocol(), buffer);
+ sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
+ key.getConf().getWriteTimeout());
+ uint32_t id = remote->getIdentity();
+ pendingCalls[id] = remote;
+ lastActivity = lastIdle = steady_clock::now();
+}
+
+void RpcChannelImpl::cleanupPendingCalls(exception_ptr reason) {
+ assert(!writeMut.try_lock());
+ unordered_map<int32_t, RpcRemoteCallPtr>::iterator s, e;
+ e = pendingCalls.end();
+
+ for (s = pendingCalls.begin(); s != e; ++s) {
+ s->second->cancel(reason);
+ }
+
+ pendingCalls.clear();
+}
+
+void RpcChannelImpl::checkOneResponse() {
+ int ping = key.getConf().getPingTimeout();
+ int timeout = key.getConf().getRpcTimeout();
+ steady_clock::time_point start = steady_clock::now();
+
+ while (client.isRunning()) {
+ if (getResponse()) {
+ readOneResponse(true);
+ return;
+ } else {
+ if (ping > 0 && ToMilliSeconds(lastActivity,
+ steady_clock::now()) >= ping) {
+ lock_guard<mutex> lock(writeMut);
+ sendPing();
+ }
+ }
+
+ if (timeout > 0 && ToMilliSeconds(start,
+ steady_clock::now()) >= timeout) {
+ try {
+ THROW(hdfs::HdfsTimeoutException,
+ "Timeout when wait for response from RPC channel \"%s:%s\"",
+ key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str());
+ } catch (...) {
+ NESTED_THROW(hdfs::HdfsRpcException,
+ "Timeout when wait for response from RPC channel \"%s:%s\"",
+ key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str());
+ }
+ }
+ }
+}
+
+void RpcChannelImpl::sendPing() {
+ static const std::vector<char> pingRequest =
+ RpcRemoteCall::GetPingRequest(client.getClientId());
+
+ if (available) {
+ LOG(INFO,
+ "RPC channel to \"%s:%s\" got no response or idle for %d "
+ "milliseconds, sending ping.",
+ key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str(), key.getConf().getPingTimeout());
+ sock->writeFully(&pingRequest[0], pingRequest.size(),
+ key.getConf().getWriteTimeout());
+ lastActivity = steady_clock::now();
+ }
+}
+
+bool RpcChannelImpl::checkIdle() {
+ unique_lock<mutex> lock(writeMut, defer_lock_t());
+
+ if (lock.try_lock()) {
+ if (!pendingCalls.empty() || refs > 0) {
+ lastIdle = steady_clock::now();
+ return false;
+ }
+
+ int idle = key.getConf().getMaxIdleTime();
+ int ping = key.getConf().getPingTimeout();
+
+ try {
+ //close the connection if idle timeout
+ if (ToMilliSeconds(lastIdle, steady_clock::now()) >= idle) {
+ sock->close();
+ return true;
+ }
+
+ //send ping
+ if (ping > 0 && ToMilliSeconds(lastActivity,
+ steady_clock::now()) >= ping) {
+ sendPing();
+ }
+ } catch (...) {
+ LOG(LOG_ERROR,
+ "Failed to send ping via idle RPC channel "
+ "to server \"%s:%s\": \n%s",
+ key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str(),
+ GetExceptionDetail(current_exception()));
+ sock->close();
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void RpcChannelImpl::waitForExit() {
+ assert(!client.isRunning());
+
+ while (refs != 0) {
+ sleep_for(milliseconds(100));
+ }
+
+ assert(pendingCalls.empty());
+}
+
+/**
+ * Write the connection header - this is sent when connection is established
+ * +----------------------------------+
+ * | "hrpc" 4 bytes |
+ * +----------------------------------+
+ * | Version (1 byte) |
+ * +----------------------------------+
+ * | Service Class (1 byte) |
+ * +----------------------------------+
+ * | AuthProtocol (1 byte) |
+ * +----------------------------------+
+ */
+void RpcChannelImpl::sendConnectionHeader() {
+ WriteBuffer buffer;
+ buffer.write(RPC_HEADER_MAGIC, strlen(RPC_HEADER_MAGIC));
+ buffer.write(static_cast<char>(RPC_HEADER_VERSION));
+ buffer.write(static_cast<char>(0)); //for future feature
+ buffer.write(static_cast<char>(key.getAuth().getProtocol()));
+ sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
+ key.getConf().getWriteTimeout());
+}
+
+void RpcChannelImpl::buildConnectionContext(
+ IpcConnectionContextProto & connectionContext, const RpcAuth & auth) {
+ connectionContext.set_protocol(key.getProtocol().getProtocol());
+ std::string euser = key.getAuth().getUser().getPrincipal();
+ std::string ruser = key.getAuth().getUser().getRealUser();
+
+ if (auth.getMethod() != AuthMethod::TOKEN) {
+ UserInformationProto * user = connectionContext.mutable_userinfo();
+ user->set_effectiveuser(euser);
+
+ if (auth.getMethod() != AuthMethod::SIMPLE) {
+ if (!ruser.empty() && ruser != euser) {
+ user->set_realuser(ruser);
+ }
+ }
+ }
+}
+
+void RpcChannelImpl::sendConnectionContent(const RpcAuth & auth) {
+ WriteBuffer buffer;
+ IpcConnectionContextProto connectionContext;
+ RpcRequestHeaderProto rpcHeader;
+ buildConnectionContext(connectionContext, auth);
+ rpcHeader.set_callid(CONNECTION_CONTEXT_CALL_ID);
+ rpcHeader.set_clientid(client.getClientId());
+ rpcHeader.set_retrycount(INVALID_RETRY_COUNT);
+ rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
+ rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+ RpcContentWrapper wrapper(&rpcHeader, &connectionContext);
+ int size = wrapper.getLength();
+ buffer.writeBigEndian(size);
+ wrapper.writeTo(buffer);
+ sock->writeFully(buffer.getBuffer(0), buffer.getDataSize(0),
+ key.getConf().getWriteTimeout());
+ lastActivity = lastIdle = steady_clock::now();
+}
+
+RpcRemoteCallPtr RpcChannelImpl::getPendingCall(int32_t id) {
+ unordered_map<int32_t, RpcRemoteCallPtr>::iterator it;
+ it = pendingCalls.find(id);
+
+ if (it == pendingCalls.end()) {
+ THROW(HdfsRpcException,
+ "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel "
+ "cannot find pending call: id = %d.",
+ key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str(), static_cast<int>(id));
+ }
+
+ RpcRemoteCallPtr rc = it->second;
+ pendingCalls.erase(it);
+ return rc;
+}
+
+bool RpcChannelImpl::getResponse() {
+ int idleTimeout = key.getConf().getMaxIdleTime();
+ int pingTimeout = key.getConf().getPingTimeout();
+ int timeout = key.getConf().getRpcTimeout();
+ int interval = pingTimeout < idleTimeout ? pingTimeout : idleTimeout;
+ interval /= 2;
+ interval = interval < timeout ? interval : timeout;
+ steady_clock::time_point s = steady_clock::now();
+
+ while (client.isRunning()) {
+ if (in->poll(500)) {
+ return true;
+ }
+
+ if (ToMilliSeconds(s, steady_clock::now()) >= interval) {
+ return false;
+ }
+ }
+
+ return false;
+}
+
+static exception_ptr HandlerRpcResponseException(exception_ptr e) {
+ exception_ptr retval = e;
+
+ try {
+ rethrow_exception(e);
+ } catch (const HdfsRpcServerException & e) {
+ UnWrapper < NameNodeStandbyException, RpcNoSuchMethodException,
+ UnsupportedOperationException, AccessControlException,
+ SafeModeException, SaslException > unwrapper(e);
+
+ try {
+ unwrapper.unwrap(__FILE__, __LINE__);
+ } catch (const NameNodeStandbyException & e) {
+ retval = current_exception();
+ } catch (const UnsupportedOperationException & e) {
+ retval = current_exception();
+ } catch (const AccessControlException & e) {
+ retval = current_exception();
+ } catch (const SafeModeException & e) {
+ retval = current_exception();
+ } catch (const SaslException & e) {
+ retval = current_exception();
+ } catch (const RpcNoSuchMethodException & e) {
+ retval = current_exception();
+ } catch (const HdfsIOException & e) {
+ }
+ }
+
+ return retval;
+}
+
+void RpcChannelImpl::readOneResponse(bool writeLock) {
+ int readTimeout = key.getConf().getReadTimeout();
+ std::vector<char> buffer(128);
+ RpcResponseHeaderProto curRespHeader;
+ RpcResponseHeaderProto::RpcStatusProto status;
+ uint32_t totalen, headerSize = 0, bodySize = 0;
+ totalen = in->readBigEndianInt32(readTimeout);
+ /*
+ * read response header
+ */
+ headerSize = in->readVarint32(readTimeout);
+ buffer.resize(headerSize);
+ in->readFully(&buffer[0], headerSize, readTimeout);
+
+ if (!curRespHeader.ParseFromArray(&buffer[0], headerSize)) {
+ THROW(HdfsRpcException,
+ "RPC channel to \"%s:%s\" got protocol mismatch: RPC channel "
+ "cannot parse response header.",
+ key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str())
+ }
+
+ lastActivity = steady_clock::now();
+ status = curRespHeader.status();
+
+ if (RpcResponseHeaderProto_RpcStatusProto_SUCCESS == status) {
+ /*
+ * on success, read response body
+ */
+ RpcRemoteCallPtr rc;
+
+ if (writeLock) {
+ lock_guard<mutex> lock(writeMut);
+ rc = getPendingCall(curRespHeader.callid());
+ } else {
+ rc = getPendingCall(curRespHeader.callid());
+ }
+
+ bodySize = in->readVarint32(readTimeout);
+ buffer.resize(bodySize);
+
+ if (bodySize > 0) {
+ in->readFully(&buffer[0], bodySize, readTimeout);
+ }
+
+ Message *response = rc->getCall().getResponse();
+
+ if (!response->ParseFromArray(&buffer[0], bodySize)) {
+ THROW(HdfsRpcException,
+ "RPC channel to \"%s:%s\" got protocol mismatch: rpc "
+ "channel cannot parse response.",
+ key.getServer().getHost().c_str(),
+ key.getServer().getPort().c_str())
+ }
+
+ rc->done();
+ } else {
+ /*
+ * on error, read error class and message
+ */
+ std::string errClass, errMessage;
+ errClass = curRespHeader.exceptionclassname();
+ errMessage = curRespHeader.errormsg();
+
+ if (RpcResponseHeaderProto_RpcStatusProto_ERROR == status) {
+ RpcRemoteCallPtr rc;
+ {
+ lock_guard<mutex> lock(writeMut);
+ rc = getPendingCall(curRespHeader.callid());
+ }
+
+ try {
+ THROW(HdfsRpcServerException, "%s: %s",
+ errClass.c_str(), errMessage.c_str());
+ } catch (HdfsRpcServerException & e) {
+ e.setErrClass(errClass);
+ e.setErrMsg(errMessage);
+ rc->cancel(HandlerRpcResponseException(current_exception()));
+ }
+ } else { /*fatal*/
+ assert(RpcResponseHeaderProto_RpcStatusProto_FATAL == status);
+
+ if (errClass.empty()) {
+ THROW(HdfsRpcException, "%s: %s",
+ errClass.c_str(), errMessage.c_str());
+ }
+
+ try {
+ THROW(HdfsRpcServerException, "%s: %s", errClass.c_str(),
+ errMessage.c_str());
+ } catch (HdfsRpcServerException & e) {
+ e.setErrClass(errClass);
+ e.setErrMsg(errMessage);
+ rethrow_exception(
+ HandlerRpcResponseException(current_exception()));
+ }
+ }
+ }
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.h
new file mode 100644
index 0000000..f80019f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannel.h
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_
+
+#include "Atomic.h"
+#include "DateTime.h"
+#include "ExceptionInternal.h"
+#include "IpcConnectionContext.pb.h"
+#include "RpcCall.h"
+#include "RpcChannelKey.h"
+#include "RpcHeader.pb.h"
+#include "RpcRemoteCall.h"
+#include "SaslClient.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+#include "UnorderedMap.h"
+#include "network/BufferedSocketReader.h"
+#include "network/TcpSocket.h"
+
+#include <google/protobuf/message.h>
+
+namespace hdfs {
+namespace internal {
+
+class RpcClient;
+
+using hadoop::common::RpcSaslProto;
+using hadoop::common::RpcSaslProto_SaslAuth;
+
+class RpcChannel {
+public:
+ /**
+ * Destroy a channel
+ */
+ virtual ~RpcChannel() {
+ }
+
+ /**
+ * The caller finished the rpc call,
+ * this channel may be reused later if immediate is false.
+ * @param immediate Do not reuse the channel any more if immediate is true.
+ */
+ virtual void close(bool immediate) = 0;
+
+ /**
+ * Invoke a rpc call.
+ * @param call The call is to be invoked.
+ * @return The remote call object.
+ */
+ virtual void invoke(const RpcCall &call) = 0;
+
+ /**
+ * Close the channel if it idle expired.
+ * @return true if the channel idle expired.
+ */
+ virtual bool checkIdle() = 0;
+
+ /**
+ * Wait for all reference exiting.
+ * The channel cannot be reused any more.
+ * @pre RpcClient is not running.
+ */
+ virtual void waitForExit() = 0;
+
+ /**
+ * Add reference count to this channel.
+ */
+ virtual void addRef() = 0;
+};
+
+/**
+ * RpcChannel represent a rpc connect to the server.
+ */
+class RpcChannelImpl: public RpcChannel {
+public:
+ /**
+ * Construct a RpcChannelImpl instance.
+ * @param k The key of this channel.
+ */
+ RpcChannelImpl(const RpcChannelKey &k, RpcClient &c);
+
+ /**
+ * Destroy a RpcChannelImpl instance.
+ */
+ ~RpcChannelImpl();
+
+ /**
+ * The caller finished the rpc call,
+ * this channel may be reused later if immediate is false.
+ * @param immediate Do not reuse the channel any more if immediate is true.
+ */
+ void close(bool immediate);
+
+ /**
+ * Invoke a rpc call.
+ * @param call The call is to be invoked.
+ * @return The remote call object.
+ */
+ void invoke(const RpcCall &call);
+
+ /**
+ * Close the channel if it idle expired.
+ * @return true if the channel idle expired.
+ */
+ bool checkIdle();
+
+ /**
+ * Wait for all reference exiting.
+ * The channel cannot be reused any more.
+ * @pre RpcClient is not running.
+ */
+ void waitForExit();
+
+ /**
+ * Add reference count to this channel.
+ */
+ void addRef() {
+ ++refs;
+ }
+
+private:
+ /**
+ * Setup the RPC connection.
+ * @pre Already hold write lock.
+ */
+ void connect();
+
+ /**
+ * Cleanup all pending calls.
+ * @param reason The reason to cancel the call.
+ * @pre Already hold write lock.
+ */
+ void cleanupPendingCalls(exception_ptr reason);
+
+ /**
+ * Send rpc connect protocol header.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ void sendConnectionHeader();
+
+ /**
+ * Send rpc connection protocol content.
+ */
+ void sendConnectionContent(const RpcAuth &auth);
+
+ /**
+ * Build rpc connect context.
+ */
+ void buildConnectionContext(
+ hadoop::common::IpcConnectionContextProto &connectionContext,
+ const RpcAuth &auth);
+
+ /**
+ * Send ping packet to server.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ * @pre Caller should hold the write lock.
+ */
+ void sendPing();
+
+ /**
+ * Send the call message to rpc server.
+ * @param remote The remote call.
+ * @pre Already hold write lock.
+ */
+ void sendRequest(RpcRemoteCallPtr remote);
+
+ /**
+ * Issue a rpc call and check response.
+ * Catch all recoverable error in this function
+ *
+ * @param remote The remote call
+ */
+ exception_ptr invokeInternal(RpcRemoteCallPtr remote);
+
+ /**
+ * Check response, block until get one response.
+ * @pre Channel already hold read lock.
+ */
+ void checkOneResponse();
+
+ /**
+ * read and handle one response.
+ * @pre Channel already hold read lock.
+ */
+ void readOneResponse(bool writeLock);
+
+ /**
+ * Get the call object with given id, and then remove it from pending call list.
+ * @param id The id of the call object to be returned.
+ * @return The call object with given id.
+ * @throw HdfsIOException
+ * @pre Channel already locked.
+ */
+ RpcRemoteCallPtr getPendingCall(int32_t id);
+
+ /**
+ * Check if there is data available for reading on socket.
+ * @return true if response is available.
+ */
+ bool getResponse();
+
+ /**
+ * wake up one caller to check response.
+ * @param id The call id which current caller handled.
+ */
+ void wakeupOneCaller(int32_t id);
+
+ /**
+ * shutdown the RPC connection since error.
+ * @param reason The reason to cancel the call
+ * @pre Already hold write lock.
+ */
+ void shutdown(exception_ptr reason);
+
+ const RpcSaslProto_SaslAuth *createSaslClient(
+ const ::google::protobuf::RepeatedPtrField<RpcSaslProto_SaslAuth> *auths);
+
+ void sendSaslMessage(RpcSaslProto *msg, ::google::protobuf::Message *resp);
+
+ std::string saslEvaluateToken(RpcSaslProto &response, bool serverIsDone);
+
+ RpcAuth setupSaslConnection();
+
+private:
+ /**
+ * Construct a RpcChannelImpl instance for test.
+ * @param key The key of this channel.
+ * @param sock The socket instance.
+ * @param in The BufferedSocketReader instance build on sock.
+ * @param client The RpcClient instance.
+ */
+ RpcChannelImpl(const RpcChannelKey &key, Socket *sock,
+ BufferedSocketReader *in, RpcClient &client);
+
+private:
+ atomic<int> refs;
+ bool available;
+ mutex readMut;
+ mutex writeMut;
+ RpcChannelKey key;
+ RpcClient &client;
+ shared_ptr<BufferedSocketReader> in;
+ shared_ptr<SaslClient> saslClient;
+ shared_ptr<Socket> sock;
+ steady_clock::time_point lastActivity; // ping is a kind of activity, lastActivity will be updated after ping
+ steady_clock::time_point lastIdle; // ping cannot change idle state. If there is still pending calls, lastIdle is always "NOW".
+ unordered_map<int32_t, RpcRemoteCallPtr> pendingCalls;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCHANNEL_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.cc
new file mode 100644
index 0000000..b3600cb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcChannelKey.h"
+
+#include <vector>
+
+namespace hdfs {
+namespace internal {
+
+RpcChannelKey::RpcChannelKey(const RpcAuth &a, const RpcProtocolInfo &p,
+ const RpcServerInfo &s, const RpcConfig &c) :
+ auth(a), conf(c), protocol(p), server(s) {
+ const Token *temp = auth.getUser().selectToken(protocol.getTokenKind(),
+ server.getTokenService());
+
+ if (temp) {
+ token = shared_ptr<Token> (new Token(*temp));
+ }
+}
+
+size_t RpcChannelKey::hash_value() const {
+ size_t tokenHash = token ? token->hash_value() : 0;
+ size_t values[] = { auth.hash_value(), protocol.hash_value(),
+ server.hash_value(), conf.hash_value(), tokenHash
+ };
+ return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.h
new file mode 100644
index 0000000..72ffcae
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcChannelKey.h
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCHANNELKEY_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCHANNELKEY_H_
+
+#include "Hash.h"
+#include "RpcAuth.h"
+#include "RpcConfig.h"
+#include "RpcProtocolInfo.h"
+#include "RpcServerInfo.h"
+#include "SharedPtr.h"
+#include "client/Token.h"
+
+namespace hdfs {
+namespace internal {
+
+class RpcChannelKey {
+public:
+ RpcChannelKey(const RpcAuth &a, const RpcProtocolInfo &p,
+ const RpcServerInfo &s, const RpcConfig &c);
+
+public:
+ size_t hash_value() const;
+
+ const RpcAuth &getAuth() const {
+ return auth;
+ }
+
+ const RpcConfig &getConf() const {
+ return conf;
+ }
+
+ const RpcProtocolInfo &getProtocol() const {
+ return protocol;
+ }
+
+ const RpcServerInfo &getServer() const {
+ return server;
+ }
+
+ bool operator ==(const RpcChannelKey &other) const {
+ return this->auth == other.auth && this->protocol == other.protocol
+ && this->server == other.server && this->conf == other.conf
+ && ((token == NULL && other.token == NULL)
+ || (token && other.token && *token == *other.token));
+ }
+
+ const Token &getToken() const {
+ assert(token != NULL);
+ return *token;
+ }
+
+ bool hasToken() {
+ return token != NULL;
+ }
+
+private:
+ const RpcAuth auth;
+ const RpcConfig conf;
+ const RpcProtocolInfo protocol;
+ const RpcServerInfo server;
+ hdfs::internal::shared_ptr<Token> token;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::RpcChannelKey);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCHANNELKEY_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.cc
new file mode 100644
index 0000000..b0d1caf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.cc
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Logger.h"
+#include "SharedPtr.h"
+#include "RpcClient.h"
+#include "Thread.h"
+
+#include <uuid/uuid.h>
+
+namespace hdfs {
+namespace internal {
+
+once_flag RpcClient::once;
+shared_ptr<RpcClient> RpcClient::client;
+
+void RpcClient::createSinglten() {
+ client = shared_ptr < RpcClient > (new RpcClientImpl());
+}
+
+RpcClient & RpcClient::getClient() {
+ call_once(once, &RpcClientImpl::createSinglten);
+ assert(client);
+ return *client;
+}
+
+RpcClientImpl::RpcClientImpl() :
+ cleaning(false), running(true), count(0) {
+ uuid_t id;
+ uuid_generate(id);
+ clientId.resize(sizeof(uuid_t));
+ memcpy(&clientId[0], id, sizeof(uuid_t));
+#ifdef MOCK
+ stub = NULL;
+#endif
+}
+
+RpcClientImpl::~RpcClientImpl() {
+ running = false;
+ cond.notify_all();
+
+ if (cleaner.joinable()) {
+ cleaner.join();
+ }
+
+ close();
+}
+
+void RpcClientImpl::clean() {
+ assert(cleaning);
+
+ try {
+ while (running) {
+ try {
+ unique_lock<mutex> lock(mut);
+ cond.wait_for(lock, seconds(1));
+
+ if (!running || allChannels.empty()) {
+ break;
+ }
+
+ unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator s, e;
+ e = allChannels.end();
+
+ for (s = allChannels.begin(); s != e;) {
+ if (s->second->checkIdle()) {
+ s->second.reset();
+ s = allChannels.erase(s);
+ } else {
+ ++s;
+ }
+ }
+ } catch (const HdfsCanceled & e) {
+ /*
+ * ignore cancel signal here.
+ */
+ }
+ }
+ } catch (const hdfs::HdfsException & e) {
+ LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s",
+ GetExceptionDetail(e));
+ } catch (const std::exception & e) {
+ LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s", e.what());
+ }
+
+ cleaning = false;
+}
+
+void RpcClientImpl::close() {
+ lock_guard<mutex> lock(mut);
+ running = false;
+ unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator s, e;
+ e = allChannels.end();
+
+ for (s = allChannels.begin(); s != e; ++s) {
+ s->second->waitForExit();
+ }
+
+ allChannels.clear();
+}
+
+bool RpcClientImpl::isRunning() {
+ return running;
+}
+
+RpcChannel & RpcClientImpl::getChannel(const RpcAuth &auth,
+ const RpcProtocolInfo &protocol, const RpcServerInfo &server,
+ const RpcConfig &conf) {
+ shared_ptr<RpcChannel> rc;
+ RpcChannelKey key(auth, protocol, server, conf);
+
+ try {
+ lock_guard<mutex> lock(mut);
+
+ if (!running) {
+ THROW(hdfs::HdfsRpcException,
+ "Cannot Setup RPC channel to \"%s:%s\" since RpcClient is closing",
+ key.getServer().getHost().c_str(), key.getServer().getPort().c_str());
+ }
+
+ unordered_map<RpcChannelKey, shared_ptr<RpcChannel> >::iterator it;
+ it = allChannels.find(key);
+
+ if (it != allChannels.end()) {
+ rc = it->second;
+ } else {
+ rc = createChannelInternal(key);
+ allChannels[key] = rc;
+ }
+
+ rc->addRef();
+
+ if (!cleaning) {
+ cleaning = true;
+
+ if (cleaner.joinable()) {
+ cleaner.join();
+ }
+
+ CREATE_THREAD(cleaner, bind(&RpcClientImpl::clean, this));
+ }
+ } catch (const HdfsRpcException & e) {
+ throw;
+ } catch (...) {
+ NESTED_THROW(HdfsRpcException,
+ "RpcClient failed to create a channel to \"%s:%s\"",
+ server.getHost().c_str(), server.getPort().c_str());
+ }
+
+ return *rc;
+}
+
+shared_ptr<RpcChannel> RpcClientImpl::createChannelInternal(
+ const RpcChannelKey & key) {
+ shared_ptr<RpcChannel> channel;
+#ifdef MOCK
+
+ if (stub) {
+ channel = shared_ptr < RpcChannel > (stub->getChannel(key, *this));
+ } else {
+ channel = shared_ptr < RpcChannel > (new RpcChannelImpl(key, *this));
+ }
+
+#else
+ channel = shared_ptr<RpcChannel>(new RpcChannelImpl(key, *this));
+#endif
+ return channel;
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.h
new file mode 100644
index 0000000..7a7a65a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcClient.h
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCLIENT_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCLIENT_H_
+
+#include "RpcAuth.h"
+#include "RpcCall.h"
+#include "RpcChannel.h"
+#include "RpcChannelKey.h"
+#include "RpcConfig.h"
+#include "RpcProtocolInfo.h"
+#include "RpcServerInfo.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+#include "UnorderedMap.h"
+
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+#ifdef MOCK
+#include "TestRpcChannelStub.h"
+#endif
+
+namespace hdfs {
+namespace internal {
+
+class RpcClient {
+public:
+ /**
+ * Destroy an RpcClient instance.
+ */
+ virtual ~RpcClient() {
+ }
+
+ /**
+ * Get a RPC channel, create a new one if necessary.
+ * @param auth Authentication information used to setup RPC connection.
+ * @param protocol The RPC protocol used in this call.
+ * @param server Remote server information.
+ * @param conf RPC connection configuration.
+ * @param once If true, the RPC channel will not be reused.
+ */
+ virtual RpcChannel &getChannel(const RpcAuth &auth,
+ const RpcProtocolInfo &protocol,
+ const RpcServerInfo &server, const RpcConfig &conf) = 0;
+
+ /**
+ * Check the RpcClient is still running.
+ * @return true if the RpcClient is still running.
+ */
+ virtual bool isRunning() = 0;
+
+ virtual std::string getClientId() const = 0;
+
+ virtual int32_t getCallId() = 0;
+
+public:
+ static RpcClient &getClient();
+ static void createSinglten();
+
+private:
+ static once_flag once;
+ static shared_ptr<RpcClient> client;
+};
+
+class RpcClientImpl: public RpcClient {
+public:
+ /**
+ * Construct a RpcClient.
+ */
+ RpcClientImpl();
+
+ /**
+ * Destroy an RpcClient instance.
+ */
+ ~RpcClientImpl();
+
+ /**
+ * Get a RPC channel, create a new one if necessary.
+ * @param auth Authentication information used to setup RPC connection.
+ * @param protocol The RPC protocol used in this call.
+ * @param server Remote server information.
+ * @param conf RPC connection configuration.
+ * @param once If true, the RPC channel will not be reused.
+ */
+ RpcChannel &getChannel(const RpcAuth &auth,
+ const RpcProtocolInfo &protocol, const RpcServerInfo &server,
+ const RpcConfig &conf);
+
+ /**
+ * Close the RPC channel.
+ */
+ void close();
+
+ /**
+ * Check the RpcClient is still running.
+ * @return true if the RpcClient is still running.
+ */
+ bool isRunning();
+
+ std::string getClientId() const {
+ return clientId;
+ }
+
+ int32_t getCallId() {
+ static mutex mutid;
+ lock_guard<mutex> lock(mutid);
+ ++count;
+ count = count < std::numeric_limits<int32_t>::max() ? count : 0;
+ return count;
+ }
+
+private:
+ shared_ptr<RpcChannel> createChannelInternal(
+ const RpcChannelKey &key);
+
+ void clean();
+
+private:
+ atomic<bool> cleaning;
+ atomic<bool> running;
+ condition_variable cond;
+ int64_t count;
+ mutex mut;
+ std::string clientId;
+ thread cleaner;
+ unordered_map<RpcChannelKey, shared_ptr<RpcChannel> > allChannels;
+
+#ifdef MOCK
+private:
+ hdfs::mock::TestRpcChannelStub *stub;
+#endif
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCLIENT_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.cc
new file mode 100644
index 0000000..84b5648
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.cc
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcConfig.h"
+
+#include <vector>
+
+namespace hdfs {
+namespace internal {
+
+size_t RpcConfig::hash_value() const {
+ size_t values[] = { Int32Hasher(maxIdleTime), Int32Hasher(pingTimeout),
+ Int32Hasher(connectTimeout), Int32Hasher(readTimeout), Int32Hasher(
+ writeTimeout), Int32Hasher(maxRetryOnConnect), Int32Hasher(
+ lingerTimeout), Int32Hasher(rpcTimeout), BoolHasher(tcpNoDelay)
+ };
+ return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.h
new file mode 100644
index 0000000..4b32611
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcConfig.h
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCONFIG_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCONFIG_H_
+
+#include "Hash.h"
+#include "SessionConfig.h"
+
+namespace hdfs {
+namespace internal {
+
+class RpcConfig {
+public:
+
+ RpcConfig(const SessionConfig &conf) {
+ connectTimeout = conf.getRpcConnectTimeout();
+ maxIdleTime = conf.getRpcMaxIdleTime();
+ maxRetryOnConnect = conf.getRpcMaxRetryOnConnect();
+ pingTimeout = conf.getRpcPingTimeout();
+ readTimeout = conf.getRpcReadTimeout();
+ writeTimeout = conf.getRpcWriteTimeout();
+ tcpNoDelay = conf.isRpcTcpNoDelay();
+ lingerTimeout = conf.getRpcSocketLingerTimeout();
+ rpcTimeout = conf.getRpcTimeout();
+ }
+
+ size_t hash_value() const;
+
+ int getConnectTimeout() const {
+ return connectTimeout;
+ }
+
+ void setConnectTimeout(int connectTimeout) {
+ this->connectTimeout = connectTimeout;
+ }
+
+ int getMaxIdleTime() const {
+ return maxIdleTime;
+ }
+
+ void setMaxIdleTime(int maxIdleTime) {
+ this->maxIdleTime = maxIdleTime;
+ }
+
+ int getMaxRetryOnConnect() const {
+ return maxRetryOnConnect;
+ }
+
+ void setMaxRetryOnConnect(int maxRetryOnConnect) {
+ this->maxRetryOnConnect = maxRetryOnConnect;
+ }
+
+ int getReadTimeout() const {
+ return readTimeout;
+ }
+
+ void setReadTimeout(int readTimeout) {
+ this->readTimeout = readTimeout;
+ }
+
+ bool isTcpNoDelay() const {
+ return tcpNoDelay;
+ }
+
+ void setTcpNoDelay(bool tcpNoDelay) {
+ this->tcpNoDelay = tcpNoDelay;
+ }
+
+ int getWriteTimeout() const {
+ return writeTimeout;
+ }
+
+ void setWriteTimeout(int writeTimeout) {
+ this->writeTimeout = writeTimeout;
+ }
+
+ int getPingTimeout() const {
+ return pingTimeout;
+ }
+
+ void setPingTimeout(int maxPingTimeout) {
+ this->pingTimeout = maxPingTimeout;
+ }
+
+ int getLingerTimeout() const {
+ return lingerTimeout;
+ }
+
+ void setLingerTimeout(int lingerTimeout) {
+ this->lingerTimeout = lingerTimeout;
+ }
+
+ int getRpcTimeout() const {
+ return rpcTimeout;
+ }
+
+ void setRpcTimeout(int rpcTimeout) {
+ this->rpcTimeout = rpcTimeout;
+ }
+
+ bool operator ==(const RpcConfig &other) const {
+ return this->maxIdleTime == other.maxIdleTime
+ && this->pingTimeout == other.pingTimeout
+ && this->connectTimeout == other.connectTimeout
+ && this->readTimeout == other.readTimeout
+ && this->writeTimeout == other.writeTimeout
+ && this->maxRetryOnConnect == other.maxRetryOnConnect
+ && this->tcpNoDelay == other.tcpNoDelay
+ && this->lingerTimeout == other.lingerTimeout
+ && this->rpcTimeout == other.rpcTimeout;
+ }
+
+private:
+ int maxIdleTime;
+ int pingTimeout;
+ int connectTimeout;
+ int readTimeout;
+ int writeTimeout;
+ int maxRetryOnConnect;
+ int lingerTimeout;
+ int rpcTimeout;
+ bool tcpNoDelay;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::RpcConfig);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCONFIG_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.cc
new file mode 100644
index 0000000..c9ea2b8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.cc
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <google/protobuf/io/coded_stream.h>
+
+#include "RpcContentWrapper.h"
+
+using namespace ::google::protobuf;
+using namespace ::google::protobuf::io;
+
+namespace hdfs {
+namespace internal {
+
+RpcContentWrapper::RpcContentWrapper(Message * header, Message * msg) :
+ header(header), msg(msg) {
+}
+
+int RpcContentWrapper::getLength() {
+ int headerLen, msgLen = 0;
+ headerLen = header->ByteSize();
+ msgLen = msg == NULL ? 0 : msg->ByteSize();
+ return headerLen + CodedOutputStream::VarintSize32(headerLen)
+ + (msg == NULL ?
+ 0 : msgLen + CodedOutputStream::VarintSize32(msgLen));
+}
+
+void RpcContentWrapper::writeTo(WriteBuffer & buffer) {
+ int size = header->ByteSize();
+ buffer.writeVarint32(size);
+ header->SerializeToArray(buffer.alloc(size), size);
+
+ if (msg != NULL) {
+ size = msg->ByteSize();
+ buffer.writeVarint32(size);
+ msg->SerializeToArray(buffer.alloc(size), size);
+ }
+}
+
+}
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.h
new file mode 100644
index 0000000..bad8736
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcContentWrapper.h
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCONTENTWRAPPER_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCONTENTWRAPPER_H_
+
+#include <google/protobuf/message.h>
+
+#include "WriteBuffer.h"
+
+namespace hdfs {
+namespace internal {
+
+class RpcContentWrapper {
+public:
+ RpcContentWrapper(::google::protobuf::Message *header,
+ ::google::protobuf::Message *msg);
+
+ int getLength();
+ void writeTo(WriteBuffer &buffer);
+
+public:
+ ::google::protobuf::Message *header;
+ ::google::protobuf::Message *msg;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCONTENTWRAPPER_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.cc
new file mode 100644
index 0000000..89ca9bb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.cc
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcProtocolInfo.h"
+
+namespace hdfs {
+namespace internal {
+
+size_t RpcProtocolInfo::hash_value() const {
+ size_t values[] = { Int32Hasher(version), StringHasher(protocol), StringHasher(tokenKind) };
+ return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.h
new file mode 100644
index 0000000..032a58c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcProtocolInfo.h
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCPROTOCOLINFO_H_
+#define _HDFS_LIBHDFS3_RPC_RPCPROTOCOLINFO_H_
+
+#include "Hash.h"
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+class RpcProtocolInfo {
+public:
+ RpcProtocolInfo(int v, const std::string &p,
+ const std::string &tokenKind) :
+ version(v), protocol(p), tokenKind(tokenKind) {
+ }
+
+ size_t hash_value() const;
+
+ bool operator ==(const RpcProtocolInfo &other) const {
+ return version == other.version && protocol == other.protocol &&
+ tokenKind == other.tokenKind;
+ }
+
+ const std::string &getProtocol() const {
+ return protocol;
+ }
+
+ void setProtocol(const std::string &protocol) {
+ this->protocol = protocol;
+ }
+
+ int getVersion() const {
+ return version;
+ }
+
+ void setVersion(int version) {
+ this->version = version;
+ }
+
+ const std::string getTokenKind() const {
+ return tokenKind;
+ }
+
+ void setTokenKind(const std::string &tokenKind) {
+ this->tokenKind = tokenKind;
+ }
+
+private:
+ int version;
+ std::string protocol;
+ std::string tokenKind;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::RpcProtocolInfo);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCPROTOCOLINFO_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.cc
new file mode 100644
index 0000000..e53e88a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.cc
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ProtobufRpcEngine.pb.h"
+#include "RpcCall.h"
+#include "RpcContentWrapper.h"
+#include "RpcHeader.pb.h"
+#include "RpcRemoteCall.h"
+#include "SharedPtr.h"
+#include "WriteBuffer.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+#define PING_CALL_ID -4
+
+using namespace google::protobuf::io;
+using namespace ::hadoop::common;
+
+namespace hdfs {
+namespace internal {
+
+void RpcRemoteCall::serialize(const RpcProtocolInfo & protocol,
+ WriteBuffer & buffer) {
+ RpcRequestHeaderProto rpcHeader;
+ rpcHeader.set_callid(identity);
+ rpcHeader.set_clientid(clientId);
+ rpcHeader.set_retrycount(-1);
+ rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
+ rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+ RequestHeaderProto requestHeader;
+ requestHeader.set_methodname(call.getName());
+ requestHeader.set_declaringclassprotocolname(protocol.getProtocol());
+ requestHeader.set_clientprotocolversion(protocol.getVersion());
+ RpcContentWrapper wrapper(&requestHeader, call.getRequest());
+ int rpcHeaderLen = rpcHeader.ByteSize();
+ int size = CodedOutputStream::VarintSize32(rpcHeaderLen) + rpcHeaderLen + wrapper.getLength();
+ buffer.writeBigEndian(size);
+ buffer.writeVarint32(rpcHeaderLen);
+ rpcHeader.SerializeToArray(buffer.alloc(rpcHeaderLen), rpcHeaderLen);
+ wrapper.writeTo(buffer);
+}
+
+std::vector<char> RpcRemoteCall::GetPingRequest(const std::string & clientid) {
+ WriteBuffer buffer;
+ std::vector<char> retval;
+ RpcRequestHeaderProto pingHeader;
+ pingHeader.set_callid(PING_CALL_ID);
+ pingHeader.set_clientid(clientid);
+ pingHeader.set_retrycount(INVALID_RETRY_COUNT);
+ pingHeader.set_rpckind(RpcKindProto::RPC_PROTOCOL_BUFFER);
+ pingHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
+ int rpcHeaderLen = pingHeader.ByteSize();
+ int size = CodedOutputStream::VarintSize32(rpcHeaderLen) + rpcHeaderLen;
+ buffer.writeBigEndian(size);
+ buffer.writeVarint32(rpcHeaderLen);
+ pingHeader.SerializeWithCachedSizesToArray(reinterpret_cast<unsigned char *>(buffer.alloc(pingHeader.ByteSize())));
+ retval.resize(buffer.getDataSize(0));
+ memcpy(&retval[0], buffer.getBuffer(0), retval.size());
+ return retval;
+}
+
+}
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.h
new file mode 100644
index 0000000..d5177b5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcRemoteCall.h
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCREMOTECALL_
+#define _HDFS_LIBHDFS3_RPC_RPCREMOTECALL_
+
+#include "DateTime.h"
+#include "ExceptionInternal.h"
+#include "RpcCall.h"
+#include "RpcProtocolInfo.h"
+#include "SharedPtr.h"
+#include "Thread.h"
+#include "WriteBuffer.h"
+
+#include <stdint.h>
+#include <string>
+
+#define INVALID_RETRY_COUNT -1
+
+namespace hdfs {
+namespace internal {
+
+class RpcRemoteCall;
+typedef shared_ptr<RpcRemoteCall> RpcRemoteCallPtr;
+
+class RpcRemoteCall {
+public:
+ RpcRemoteCall(const RpcCall &c, int32_t id, const std::string &clientId) :
+ complete(false), identity(id), call(c), clientId(clientId) {
+ }
+
+ virtual ~RpcRemoteCall() {
+ }
+
+ virtual void cancel(exception_ptr reason) {
+ unique_lock<mutex> lock(mut);
+ complete = true;
+ error = reason;
+ cond.notify_all();
+ }
+
+ virtual void serialize(const RpcProtocolInfo &protocol,
+ WriteBuffer &buffer);
+
+ const int32_t getIdentity() const {
+ return identity;
+ }
+
+ void wait() {
+ unique_lock<mutex> lock(mut);
+
+ if (!complete) {
+ cond.wait_for(lock, milliseconds(500));
+ }
+ }
+
+ void check() {
+ if (error != exception_ptr()) {
+ rethrow_exception(error);
+ }
+ }
+
+ RpcCall &getCall() {
+ return call;
+ }
+
+ void done() {
+ unique_lock<mutex> lock(mut);
+ complete = true;
+ cond.notify_all();
+ }
+
+ void wakeup() {
+ cond.notify_all();
+ }
+
+ bool finished() {
+ unique_lock<mutex> lock(mut);
+ return complete;
+ }
+
+public:
+ static std::vector<char> GetPingRequest(const std::string &clientid);
+
+private:
+ bool complete;
+ condition_variable cond;
+ const int32_t identity;
+ exception_ptr error;
+ mutex mut;
+ RpcCall call;
+ std::string clientId;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCREMOTECALL_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.cc
new file mode 100644
index 0000000..5a4c1a1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.cc
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcServerInfo.h"
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+size_t RpcServerInfo::hash_value() const {
+ size_t values[] = { StringHasher(host), StringHasher(port), StringHasher(tokenService) };
+ return CombineHasher(values, sizeof(values) / sizeof(values[0]));
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.h
new file mode 100644
index 0000000..fe437b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcServerInfo.h
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCSERVERINFO_H_
+#define _HDFS_LIBHDFS3_RPC_RPCSERVERINFO_H_
+
+#include "Hash.h"
+
+#include <string>
+#include <sstream>
+
+namespace hdfs {
+namespace internal {
+
+class RpcServerInfo {
+public:
+ RpcServerInfo(const std::string &tokenService, const std::string &h,
+ const std::string &p) :
+ host(h), port(p), tokenService(tokenService) {
+ }
+
+ RpcServerInfo(const std::string &h, uint32_t p) :
+ host(h) {
+ std::stringstream ss;
+ ss << p;
+ port = ss.str();
+ }
+
+ size_t hash_value() const;
+
+ bool operator ==(const RpcServerInfo &other) const {
+ return this->host == other.host && this->port == other.port &&
+ tokenService == other.tokenService;
+ }
+
+ const std::string &getTokenService() const {
+ return tokenService;
+ }
+
+ const std::string &getHost() const {
+ return host;
+ }
+
+ const std::string &getPort() const {
+ return port;
+ }
+
+ void setTokenService(const std::string &tokenService) {
+ this->tokenService = tokenService;
+ }
+
+private:
+ std::string host;
+ std::string port;
+ std::string tokenService;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::RpcServerInfo);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCSERVERINFO_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.cc
new file mode 100644
index 0000000..bfe6868
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.cc
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <cctype>
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "SaslClient.h"
+
+#define SASL_SUCCESS 0
+
+namespace hdfs {
+namespace internal {
+
+SaslClient::SaslClient(const RpcSaslProto_SaslAuth & auth, const Token & token,
+ const std::string & principal) :
+ complete(false) {
+ int rc;
+ ctx = NULL;
+ RpcAuth method = RpcAuth(RpcAuth::ParseMethod(auth.method()));
+ rc = gsasl_init(&ctx);
+
+ if (rc != GSASL_OK) {
+ THROW(HdfsIOException, "cannot initialize libgsasl");
+ }
+
+ switch (method.getMethod()) {
+ case AuthMethod::KERBEROS:
+ initKerberos(auth, principal);
+ break;
+
+ case AuthMethod::TOKEN:
+ initDigestMd5(auth, token);
+ break;
+
+ default:
+ THROW(HdfsIOException, "unknown auth method.");
+ break;
+ }
+}
+
+SaslClient::~SaslClient() {
+ if (session != NULL) {
+ gsasl_finish(session);
+ }
+
+ if (ctx != NULL) {
+ gsasl_done(ctx);
+ }
+}
+
+void SaslClient::initKerberos(const RpcSaslProto_SaslAuth & auth,
+ const std::string & principal) {
+ int rc;
+
+ /* Create new authentication session. */
+ if ((rc = gsasl_client_start(ctx, auth.mechanism().c_str(), &session)) != GSASL_OK) {
+ THROW(HdfsIOException, "Cannot initialize client (%d): %s", rc,
+ gsasl_strerror(rc));
+ }
+
+ gsasl_property_set(session, GSASL_SERVICE, auth.protocol().c_str());
+ gsasl_property_set(session, GSASL_AUTHID, principal.c_str());
+ gsasl_property_set(session, GSASL_HOSTNAME, auth.serverid().c_str());
+}
+
+std::string Base64Encode(const std::string & in) {
+ char * temp;
+ size_t len;
+ std::string retval;
+ int rc = gsasl_base64_to(in.c_str(), in.size(), &temp, &len);
+
+ if (rc != GSASL_OK) {
+ throw std::bad_alloc();
+ }
+
+ if (temp) {
+ retval = temp;
+ free(temp);
+ }
+
+ if (!temp || retval.length() != len) {
+ THROW(HdfsIOException, "SaslClient: Failed to encode string to base64");
+ }
+
+ return retval;
+}
+
+void SaslClient::initDigestMd5(const RpcSaslProto_SaslAuth & auth,
+ const Token & token) {
+ int rc;
+
+ if ((rc = gsasl_client_start(ctx, auth.mechanism().c_str(), &session)) != GSASL_OK) {
+ THROW(HdfsIOException, "Cannot initialize client (%d): %s", rc, gsasl_strerror(rc));
+ }
+
+ std::string password = Base64Encode(token.getPassword());
+ std::string identifier = Base64Encode(token.getIdentifier());
+ gsasl_property_set(session, GSASL_PASSWORD, password.c_str());
+ gsasl_property_set(session, GSASL_AUTHID, identifier.c_str());
+ gsasl_property_set(session, GSASL_HOSTNAME, auth.serverid().c_str());
+ gsasl_property_set(session, GSASL_SERVICE, auth.protocol().c_str());
+}
+
+std::string SaslClient::evaluateChallenge(const std::string & challenge) {
+ int rc;
+ char * output = NULL;
+ size_t outputSize;
+ std::string retval;
+ rc = gsasl_step(session, &challenge[0], challenge.size(), &output,
+ &outputSize);
+
+ if (rc == GSASL_NEEDS_MORE || rc == GSASL_OK) {
+ retval.resize(outputSize);
+ memcpy(&retval[0], output, outputSize);
+
+ if (output) {
+ free(output);
+ }
+ } else {
+ if (output) {
+ free(output);
+ }
+
+ THROW(AccessControlException, "Failed to evaluate challenge: %s", gsasl_strerror(rc));
+ }
+
+ if (rc == GSASL_OK) {
+ complete = true;
+ }
+
+ return retval;
+}
+
+bool SaslClient::isComplete() {
+ return complete;
+}
+
+}
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.h
new file mode 100644
index 0000000..5e95ebd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/SaslClient.h
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_SASLCLIENT_H_
+#define _HDFS_LIBHDFS3_RPC_SASLCLIENT_H_
+
+#include "RpcAuth.h"
+#include "RpcHeader.pb.h"
+#include "client/Token.h"
+#include "network/Socket.h"
+
+#include <gsasl.h>
+#include <string>
+
+#define SWITCH_TO_SIMPLE_AUTH -88
+
+namespace hdfs {
+namespace internal {
+
+using hadoop::common::RpcSaslProto_SaslAuth;
+
+class SaslClient {
+public:
+ SaslClient(const RpcSaslProto_SaslAuth &auth, const Token &token,
+ const std::string &principal);
+
+ ~SaslClient();
+
+ std::string evaluateChallenge(const std::string &challenge);
+
+ bool isComplete();
+
+private:
+ void initKerberos(const RpcSaslProto_SaslAuth &auth,
+ const std::string &principal);
+ void initDigestMd5(const RpcSaslProto_SaslAuth &auth, const Token &token);
+
+private:
+ Gsasl *ctx;
+ Gsasl_session *session;
+ bool complete;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_SASLCLIENT_H_ */