You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/10/03 20:28:52 UTC
[4/5] HDFS-7012. Add hdfs native client RPC functionality (Zhanwei
Wang via Colin P. McCabe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Permission.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Permission.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Permission.h
new file mode 100644
index 0000000..fafb7fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Permission.h
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_
+#define _HDFS_LIBHDFS3_CLIENT_PERMISSION_H_
+
+#include <stdint.h>
+#include <string>
+
+namespace hdfs {
+
+/**
+ * Action describes which operations a user is permitted to apply to a file.
+ * The enumerators form a 3-bit mask (read = 4, write = 2, execute = 1), so
+ * each value equals the corresponding octal digit of a Unix file mode.
+ */
+enum Action {
+    NONE = 0,          // "---"
+    EXECUTE = 1,       // "--x"
+    WRITE = 2,         // "-w-"
+    WRITE_EXECUTE = 3, // "-wx"
+    READ = 4,          // "r--"
+    READ_EXECUTE = 5,  // "r-x"
+    READ_WRITE = 6,    // "rw-"
+    ALL = 7            // "rwx"
+};
+
+/**
+ * Test whether Action a implies Action b, i.e. whether every permission
+ * bit set in b is also set in a.
+ * @param a Action to be tested.
+ * @param b Action target.
+ * @return true if a implies b.
+ */
+static inline bool implies(const Action &a, const Action &b) {
+    const int required = b;
+    const int granted = a;
+    return (granted & required) == required;
+}
+
+/**
+ * Build the Action holding only the permissions present in both a and b.
+ * @param a Action to be used.
+ * @param b Action to be used.
+ * @return the bitwise intersection of a and b.
+ */
+static inline Action operator &(const Action &a, const Action &b) {
+    const unsigned int lhs = static_cast<unsigned int>(a);
+    const unsigned int rhs = static_cast<unsigned int>(b);
+    return static_cast<Action>(lhs & rhs);
+}
+/**
+ * Build the Action holding every permission present in a or in b.
+ * @param a Action to be used.
+ * @param b Action to be used.
+ * @return the bitwise union of a and b.
+ */
+static inline Action operator |(const Action &a, const Action &b) {
+    const unsigned int lhs = static_cast<unsigned int>(a);
+    const unsigned int rhs = static_cast<unsigned int>(b);
+    return static_cast<Action>(lhs | rhs);
+}
+/**
+ * Build the complement of an Action: the result of subtracting the given
+ * value from 7 ("rwx").
+ * @param a Action to be used.
+ * @return the complementary Action.
+ */
+static inline Action operator ~(const Action &a) {
+    const unsigned int value = static_cast<unsigned int>(a);
+    return static_cast<Action>(7 - value);
+}
+
+/**
+ * Convert an Action to a readable "rwx"-style string.
+ *
+ * The text is derived from the three low permission bits, so the function
+ * returns a value for every input. The previous switch had no return
+ * after it, which is undefined behaviour for a value outside 0..7. Bits
+ * above the low three are ignored.
+ *
+ * @param a the Action to be converted.
+ * @return a three character string such as "r-x".
+ */
+static inline std::string toString(const Action &a) {
+    const unsigned int bits = static_cast<unsigned int>(a);
+    std::string text("---");
+
+    if (bits & 4u) {
+        text[0] = 'r';
+    }
+
+    if (bits & 2u) {
+        text[1] = 'w';
+    }
+
+    if (bits & 1u) {
+        text[2] = 'x';
+    }
+
+    return text;
+}
+
+/**
+ * Permission describes the access mode of a file: one Action each for the
+ * owner, the group and all other users, plus a sticky bit.
+ */
+class Permission {
+public:
+    /**
+     * Construct a Permission from three Actions; the sticky bit is cleared.
+     * @param u owner permission.
+     * @param g group permission.
+     * @param o other user permission.
+     */
+    Permission(const Action &u, const Action &g, const Action &o) :
+        userAction(u), groupAction(g), otherAction(o), stickyBit(false) {
+    }
+
+    /**
+     * Construct a Permission from a uint16 mode flag.
+     * @param mode permission flag.
+     */
+    Permission(uint16_t mode);
+
+public:
+    /**
+     * @return the group permission.
+     */
+    Action getGroupAction() const {
+        return groupAction;
+    }
+
+    /**
+     * @param groupAction the new group permission.
+     */
+    void setGroupAction(Action groupAction) {
+        this->groupAction = groupAction;
+    }
+
+    /**
+     * @return the permission of other users.
+     */
+    Action getOtherAction() const {
+        return otherAction;
+    }
+
+    /**
+     * @param otherAction the new permission of other users.
+     */
+    void setOtherAction(Action otherAction) {
+        this->otherAction = otherAction;
+    }
+
+    /**
+     * @return the owner permission.
+     */
+    Action getUserAction() const {
+        return userAction;
+    }
+
+    /**
+     * @param userAction the new owner permission.
+     */
+    void setUserAction(Action userAction) {
+        this->userAction = userAction;
+    }
+
+    /**
+     * Render the permission as owner, group and other triplets in that
+     * order, e.g. "rwxr-xr--".
+     * @return a readable string.
+     */
+    std::string toString() const {
+        std::string text = hdfs::toString(userAction);
+        text += hdfs::toString(groupAction);
+        text += hdfs::toString(otherAction);
+        return text;
+    }
+
+    /**
+     * Pack the permission into a uint16 flag: owner shifted left by 6,
+     * group shifted left by 3, other unshifted, sticky bit as 1 << 9.
+     * @return a uint16 flag.
+     */
+    uint16_t toShort() const {
+        const int user = static_cast<uint16_t>(userAction) << 6;
+        const int group = static_cast<uint16_t>(groupAction) << 3;
+        const int other = static_cast<uint16_t>(otherAction);
+        const int sticky = stickyBit ? (1 << 9) : 0;
+        return static_cast<uint16_t>(user + group + other + sticky);
+    }
+
+    /**
+     * Two Permissions are equal when all three Actions and the sticky bit
+     * match.
+     */
+    bool operator ==(const Permission &other) const {
+        if (userAction != other.userAction) {
+            return false;
+        }
+
+        if (groupAction != other.groupAction) {
+            return false;
+        }
+
+        if (otherAction != other.otherAction) {
+            return false;
+        }
+
+        return stickyBit == other.stickyBit;
+    }
+
+private:
+    Action userAction;
+    Action groupAction;
+    Action otherAction;
+
+    bool stickyBit;
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.cc
new file mode 100644
index 0000000..02a2ddd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.cc
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BigEndian.h"
+#include "DataTransferProtocolSender.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "HWCrc32c.h"
+#include "RemoteBlockReader.h"
+#include "SWCrc32c.h"
+#include "WriteBuffer.h"
+#include "datatransfer.pb.h"
+
+#include <inttypes.h>
+#include <vector>
+
+using hadoop::hdfs::ClientReadStatusProto;
+using hadoop::hdfs::BlockOpResponseProto;
+using hadoop::hdfs::ChecksumProto;
+using hadoop::hdfs::ChecksumTypeProto;
+using hadoop::hdfs::ReadOpChecksumInfoProto;
+using hadoop::hdfs::Status;
+
+namespace hdfs {
+namespace internal {
+/**
+ * Open a connection to a datanode and request a range of a block.
+ *
+ * Creates the TCP socket and its buffered reader, connects to the
+ * datanode's transfer port, sends a readBlock request for len bytes
+ * starting at start, and validates the reply with checkResponse().
+ *
+ * @param eb the block to read; bound to the binfo member.
+ * NOTE(review): binfo is declared as a reference in the class, so eb
+ * presumably has to outlive this reader - confirm with callers.
+ * @param datanode the datanode to read from.
+ * @param start offset in the block where reading starts (asserted >= 0).
+ * @param len number of bytes requested.
+ * @param token forwarded to the readBlock request.
+ * @param clientName forwarded to the readBlock request.
+ * @param verify whether packet checksums are verified.
+ * @param conf supplies the read, write and connect timeouts.
+ * @throw HdfsIOException nesting an HdfsTimeoutException raised during
+ * setup; other exception types are not caught here.
+ */
+RemoteBlockReader::RemoteBlockReader(const ExtendedBlock &eb,
+ DatanodeInfo &datanode, int64_t start, int64_t len,
+ const Token &token, const char * clientName, bool verify,
+ SessionConfig &conf) :
+ verify(verify), datanode(datanode), binfo(eb), checksumSize(0),
+ chunkSize(0), position(0), size(0), cursor(
+ start), endOffset(len + start), lastSeqNo(-1) {
+ try {
+ assert(start >= 0);
+ readTimeout = conf.getInputReadTimeout();
+ writeTimeout = conf.getInputWriteTimeout();
+ connTimeout = conf.getInputConnTimeout();
+ sock = shared_ptr<Socket>(new TcpSocketImpl());
+ in = shared_ptr<BufferedSocketReader>(
+ new BufferedSocketReaderImpl(*sock));
+ sock->connect(datanode.getIpAddr().c_str(), datanode.getXferPort(),
+ connTimeout);
+ sender = shared_ptr<DataTransferProtocol>(
+ new DataTransferProtocolSender(*sock, writeTimeout,
+ datanode.formatAddress()));
+ sender->readBlock(eb, token, clientName, start, len); // request the range
+ checkResponse(); // validate the reply and pick up checksum parameters
+ } catch (const HdfsTimeoutException &e) {
+ NESTED_THROW(HdfsIOException,
+ "RemoteBlockReader: Failed to setup remote block reader "
+ "for block %s from node %s",
+ eb.toString().c_str(), datanode.formatAddress().c_str());
+ }
+}
+/**
+ * Close the socket connected to the datanode.
+ */
+RemoteBlockReader::~RemoteBlockReader() {
+ sock->close();
+}
+
+/**
+ * Read and validate the datanode's reply to the readBlock request.
+ *
+ * Parses the length-prefixed BlockOpResponseProto, turns a non-SUCCESS
+ * status into an exception, and records the checksum parameters (chunk
+ * size, checksum size and checksum implementation) used when packets are
+ * read and verified.
+ *
+ * @throw HdfsInvalidBlockToken if the datanode reports ERROR_ACCESS_TOKEN.
+ * @throw HdfsIOException for any other failed or malformed response.
+ */
+void RemoteBlockReader::checkResponse() {
+    std::vector<char> respBuffer;
+    int32_t respSize = in->readVarint32(readTimeout);
+
+    /* refuse empty or implausibly large (> 10 MiB) responses */
+    if (respSize <= 0 || respSize > 10 * 1024 * 1024) {
+        THROW(HdfsIOException, "RemoteBlockReader get a invalid response "
+              "size: %d, Block: %s, from Datanode: %s",
+              respSize, binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+
+    respBuffer.resize(respSize);
+    in->readFully(&respBuffer[0], respSize, readTimeout);
+    BlockOpResponseProto resp;
+
+    if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) {
+        THROW(HdfsIOException, "RemoteBlockReader cannot parse "
+              "BlockOpResponseProto from Datanode response, "
+              "Block: %s, from Datanode: %s", binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+
+    if (resp.status() != hadoop::hdfs::SUCCESS) {
+        std::string msg;
+
+        if (resp.has_message()) {
+            msg = resp.message();
+        }
+
+        if (resp.status() == hadoop::hdfs::ERROR_ACCESS_TOKEN) {
+            THROW(HdfsInvalidBlockToken, "RemoteBlockReader: block's token "
+                  "is invalid. Datanode: %s, Block: %s",
+                  datanode.formatAddress().c_str(), binfo.toString().c_str());
+        } else {
+            THROW(HdfsIOException,
+                  "RemoteBlockReader: Datanode return an error when sending "
+                  "read request to Datanode: %s, Block: %s, %s.",
+                  datanode.formatAddress().c_str(), binfo.toString().c_str(),
+                  (msg.empty() ? "check Datanode's log for more information" :
+                   msg.c_str()));
+        }
+    }
+
+    const ReadOpChecksumInfoProto &checksumInfo = resp.readopchecksuminfo();
+    const ChecksumProto &cs = checksumInfo.checksum();
+    chunkSize = cs.bytesperchecksum();
+
+    /*
+     * The chunk size is used as a divisor when packets are split into
+     * chunks, so zero is as invalid as a negative value. Zero used to slip
+     * past this test and was only rejected by the first-chunk-offset check
+     * below, with a misleading message.
+     */
+    if (chunkSize <= 0) {
+        THROW(HdfsIOException,
+              "RemoteBlockReader invalid chunk size: %d, expected range(0, %"
+              PRId64 "], Block: %s, from Datanode: %s",
+              chunkSize, binfo.getNumBytes(), binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+
+    switch (cs.type()) {
+    case ChecksumTypeProto::CHECKSUM_NULL:
+        /* nothing to verify when the datanode sends no checksums */
+        verify = false;
+        checksumSize = 0;
+        break;
+
+    case ChecksumTypeProto::CHECKSUM_CRC32:
+        THROW(HdfsIOException, "RemoteBlockReader does not support CRC32 "
+              "checksum, Block: %s, from Datanode: %s",
+              binfo.toString().c_str(), datanode.formatAddress().c_str());
+        break;
+
+    case ChecksumTypeProto::CHECKSUM_CRC32C:
+        /* use the hardware implementation when HWCrc32c reports it available */
+        if (HWCrc32c::available()) {
+            checksum = shared_ptr<Checksum>(new HWCrc32c());
+        } else {
+            checksum = shared_ptr<Checksum>(new SWCrc32c());
+        }
+
+        checksumSize = sizeof(int32_t);
+        break;
+
+    default:
+        THROW(HdfsIOException, "RemoteBlockReader cannot recognize checksum "
+              "type: %d, Block: %s, from Datanode: %s",
+              static_cast<int>(cs.type()), binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+
+    /*
+     * The offset into the block at which the first packet
+     * will start. This is necessary since reads will align
+     * backwards to a checksum chunk boundary.
+     */
+    int64_t firstChunkOffset = checksumInfo.chunkoffset();
+
+    /* the first chunk must start at or before cursor, less than one chunk back */
+    if (firstChunkOffset < 0 || firstChunkOffset > cursor ||
+            firstChunkOffset <= cursor - chunkSize) {
+        THROW(HdfsIOException,
+              "RemoteBlockReader invalid first chunk offset: %" PRId64
+              ", expected range[0, %" PRId64 "], " "Block: %s, from Datanode: %s",
+              firstChunkOffset, cursor, binfo.toString().c_str(),
+              datanode.formatAddress().c_str());
+    }
+}
+/**
+ * Read the next packet header from the datanode.
+ *
+ * Refuses to read when the previously received header was already marked
+ * as the last packet in the block.
+ *
+ * @return the header that was read.
+ * @throw HdfsIOException when reading past the last packet; any
+ * HdfsIOException raised inside is rethrown nested in a new
+ * HdfsIOException naming the block and the datanode.
+ */
+shared_ptr<PacketHeader> RemoteBlockReader::readPacketHeader() {
+ try {
+ shared_ptr<PacketHeader> retval;
+ static const int packetHeaderLen = PacketHeader::GetPkgHeaderSize(); // computed once
+ std::vector<char> buf(packetHeaderLen);
+
+ if (lastHeader && lastHeader->isLastPacketInBlock()) {
+ THROW(HdfsIOException, "RemoteBlockReader: read over block end "
+ "from Datanode: %s, Block: %s.",
+ datanode.formatAddress().c_str(), binfo.toString().c_str());
+ }
+
+ in->readFully(&buf[0], packetHeaderLen, readTimeout);
+ retval = shared_ptr<PacketHeader>(new PacketHeader);
+ retval->readFields(&buf[0], packetHeaderLen);
+ return retval;
+ } catch (const HdfsIOException &e) {
+ NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+ "block header for Block: %s from Datanode: %s.",
+ binfo.toString().c_str(), datanode.formatAddress().c_str());
+ }
+}
+
+/**
+ * Fetch the next packet from the datanode into the internal buffer.
+ *
+ * The buffer holds the packet's checksums followed by its data; position
+ * is left on the first data byte the caller asked for. The packet length
+ * announced in the header is checked against the expected checksum and
+ * data sizes before the buffer is resized and the payload is read, so a
+ * malformed header is rejected with an exception instead of tripping an
+ * assertion or being consumed first.
+ *
+ * Once the requested range is covered and the datanode has sent its
+ * trailing empty packet, the read status is reported back.
+ *
+ * @throw HdfsIOException if the header fails the sanity check or the
+ * announced packet length is inconsistent.
+ */
+void RemoteBlockReader::readNextPacket() {
+    assert(position >= size);
+    lastHeader = readPacketHeader();
+    int dataSize = lastHeader->getDataLen();
+    int64_t pendingAhead = 0;
+
+    if (!lastHeader->sanityCheck(lastSeqNo)) {
+        THROW(HdfsIOException, "RemoteBlockReader: Packet failed on sanity "
+              "check for block %s from Datanode %s.",
+              binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+
+    assert(dataSize > 0 || lastHeader->getPacketLen() == sizeof(int32_t));
+
+    if (dataSize > 0) {
+        /* one checksum per chunk, the last chunk may be partial */
+        int chunks = (dataSize + chunkSize - 1) / chunkSize;
+        int checksumLen = chunks * checksumSize;
+
+        /* validate the announced length before trusting it for the read */
+        if (lastHeader->getPacketLen() != static_cast<int>(sizeof(int32_t)) +
+                dataSize + checksumLen) {
+            THROW(HdfsIOException, "Invalid Packet, packetLen is %d, "
+                  "dataSize is %d, checksum size is %d",
+                  lastHeader->getPacketLen(), dataSize, checksumLen);
+        }
+
+        size = checksumLen + dataSize;
+        buffer.resize(size);
+        in->readFully(&buffer[0], size, readTimeout);
+        lastSeqNo = lastHeader->getSeqno();
+
+        if (verify) {
+            verifyChecksum(chunks);
+        }
+
+        /*
+         * skip checksum
+         */
+        position = checksumLen;
+        /*
+         * the first packet we get may start at the position before we required
+         */
+        pendingAhead = cursor - lastHeader->getOffsetInBlock();
+        pendingAhead = pendingAhead > 0 ? pendingAhead : 0;
+        position += pendingAhead;
+    }
+
+    /*
+     * we reach the end of the range we required, send status to datanode
+     * if datanode do not sending data anymore.
+     */
+
+    if (cursor + dataSize - pendingAhead >= endOffset && readTrailingEmptyPacket()) {
+        sendStatus();
+    }
+}
+
+/**
+ * Read one more packet header and report whether it is the empty packet
+ * that ends the block: flagged as last packet and carrying no data.
+ * @return true if the header read is the trailing empty packet.
+ */
+bool RemoteBlockReader::readTrailingEmptyPacket() {
+    shared_ptr<PacketHeader> tail = readPacketHeader();
+    return tail->isLastPacketInBlock() && tail->getDataLen() == 0;
+}
+
+/**
+ * Report the outcome of the read to the datanode: CHECKSUM_OK when
+ * checksums were verified, SUCCESS otherwise. The message is written as a
+ * varint length followed by the serialized ClientReadStatusProto.
+ */
+void RemoteBlockReader::sendStatus() {
+    ClientReadStatusProto reply;
+    reply.set_status(verify ? hadoop::hdfs::CHECKSUM_OK
+                     : hadoop::hdfs::SUCCESS);
+
+    WriteBuffer out;
+    const int bodyLen = reply.ByteSize();
+    out.writeVarint32(bodyLen);
+    reply.SerializeToArray(out.alloc(bodyLen), bodyLen);
+    sock->writeFully(out.getBuffer(0), out.getDataSize(0), writeTimeout);
+}
+/**
+ * Verify the checksums of the packet currently held in buffer.
+ *
+ * The buffer is laid out as chunks * checksumSize bytes of checksums
+ * followed by the packet data; chunk i is checked against checksum i,
+ * which is read as a big endian 32 bit value.
+ *
+ * @param chunks number of checksum chunks in the packet.
+ * @throw ChecksumException if a full-size chunk does not match.
+ */
+void RemoteBlockReader::verifyChecksum(int chunks) {
+ int dataSize = lastHeader->getDataLen();
+ char * pchecksum = &buffer[0];
+ char * pdata = &buffer[0] + (chunks * checksumSize);
+
+ for (int i = 0; i < chunks; ++i) {
+ int size = chunkSize < dataSize ? chunkSize : dataSize; // last chunk may be short
+ dataSize -= size;
+ checksum->reset();
+ checksum->update(pdata + (i * chunkSize), size);
+ uint32_t result = checksum->getValue();
+ uint32_t target =
+ ReadBigEndian32FromArray(pchecksum + (i * checksumSize));
+ // NOTE(review): a mismatch on a partial chunk (size < chunkSize) is ignored - confirm this is intended.
+ if (result != target && size == chunkSize) {
+ THROW(ChecksumException, "RemoteBlockReader: checksum not match "
+ "for Block: %s, on Datanode: %s",
+ binfo.toString().c_str(), datanode.formatAddress().c_str());
+ }
+ }
+
+ assert(0 == dataSize); // every data byte must belong to some chunk
+}
+
+/**
+ * Number of bytes left in the packet buffer, i.e. readable without
+ * another network read; never negative.
+ */
+int64_t RemoteBlockReader::available() {
+    const int remaining = size - position;
+
+    if (remaining > 0) {
+        return remaining;
+    }
+
+    return 0;
+}
+
+/**
+ * Copy up to len bytes of block data into buf.
+ *
+ * A new packet is fetched when the buffered one is exhausted. At most the
+ * rest of the buffered packet is returned, so the result may be smaller
+ * than len.
+ *
+ * @param buf destination buffer, must not be NULL.
+ * @param len capacity of buf, must not be 0.
+ * @return the number of bytes copied; 0 when the packet just received
+ * holds no data for the caller (for example the empty packet that ends
+ * the block).
+ * @throw HdfsIOException when reading past the requested range, or
+ * nesting a timeout or network failure.
+ */
+int32_t RemoteBlockReader::read(char * buf, int32_t len) {
+    assert(0 != len && NULL != buf);
+
+    if (cursor >= endOffset) {
+        THROW(HdfsIOException, "RemoteBlockReader: read over block end from "
+              "Datanode: %s, Block: %s.",
+              datanode.formatAddress().c_str(), binfo.toString().c_str());
+    }
+
+    try {
+        if (position >= size) {
+            readNextPacket();
+        }
+
+        int32_t todo = len < size - position ? len : size - position;
+
+        /*
+         * Nothing is buffered when the packet carried no data or position
+         * already sits at or past the end of the buffer. Indexing
+         * buffer[position] would then be out of range and a negative
+         * length would be handed to memcpy as a huge size, so stop here.
+         */
+        if (todo <= 0) {
+            return 0;
+        }
+
+        memcpy(buf, &buffer[position], todo);
+        position += todo;
+        cursor += todo;
+        return todo;
+    } catch (const HdfsTimeoutException &e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+                     "Block: %s from Datanode: %s.",
+                     binfo.toString().c_str(), datanode.formatAddress().c_str());
+    } catch (const HdfsNetworkException &e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+                     "Block: %s from Datanode: %s.",
+                     binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+/**
+ * Advance the read cursor by len bytes without copying any data. Packets
+ * are still fetched from the datanode and consumed from the buffer.
+ *
+ * @param len number of bytes to skip; cursor + len must not exceed the
+ * end of the requested range (asserted).
+ * @throw HdfsIOException when skipping past the requested range, or
+ * nesting a timeout or network failure.
+ */
+void RemoteBlockReader::skip(int64_t len) {
+    assert(cursor + len <= endOffset);
+
+    try {
+        for (int64_t remaining = len; remaining > 0;) {
+            if (cursor >= endOffset) {
+                THROW(HdfsIOException, "RemoteBlockReader: skip over block "
+                      "end from Datanode: %s, Block: %s.",
+                      datanode.formatAddress().c_str(), binfo.toString().c_str());
+            }
+
+            if (position >= size) {
+                readNextPacket();
+            }
+
+            /* consume what is buffered, but no more than is still owed */
+            int step = size - position;
+
+            if (!(step < remaining)) {
+                step = static_cast<int>(remaining);
+            }
+
+            position += step;
+            cursor += step;
+            remaining -= step;
+        }
+    } catch (const HdfsTimeoutException &e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+                     "Block: %s from Datanode: %s.",
+                     binfo.toString().c_str(), datanode.formatAddress().c_str());
+    } catch (const HdfsNetworkException &e) {
+        NESTED_THROW(HdfsIOException, "RemoteBlockReader: failed to read "
+                     "Block: %s from Datanode: %s.",
+                     binfo.toString().c_str(), datanode.formatAddress().c_str());
+    }
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
new file mode 100644
index 0000000..548118b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/RemoteBlockReader.h
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_
+#define _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_
+
+#include "BlockReader.h"
+#include "Checksum.h"
+#include "DataTransferProtocol.h"
+#include "PacketHeader.h"
+#include "SessionConfig.h"
+#include "common/SharedPtr.h"
+#include "network/BufferedSocketReader.h"
+#include "network/TcpSocket.h"
+#include "server/DatanodeInfo.h"
+#include "server/LocatedBlocks.h"
+
+#include <stdint.h>
+
+namespace hdfs {
+namespace internal {
+/**
+ * A BlockReader that reads a range of a block from a datanode over a TCP
+ * connection, one packet at a time.
+ */
+class RemoteBlockReader: public BlockReader {
+public:
+ RemoteBlockReader(const ExtendedBlock &eb, DatanodeInfo &datanode,
+ int64_t start, int64_t len, const Token &token,
+ const char *clientName, bool verify, SessionConfig &conf);
+
+ ~RemoteBlockReader();
+
+ /**
+ * Get how many bytes can be read without blocking.
+ * @return The number of bytes that can be read without blocking.
+ */
+ virtual int64_t available();
+
+ /**
+ * To read data from block.
+ * @param buf the buffer to be filled.
+ * @param len the number of bytes to be read.
+ * @return return the number of bytes filled in the buffer,
+ * it may be less than len. Return 0 if reach the end of block.
+ */
+ virtual int32_t read(char *buf, int32_t len);
+
+ /**
+ * Move the cursor forward len bytes.
+ * @param len The number of bytes to skip.
+ */
+ virtual void skip(int64_t len);
+
+private:
+ bool readTrailingEmptyPacket();
+ shared_ptr<PacketHeader> readPacketHeader();
+ void checkResponse();
+ void readNextPacket();
+ void sendStatus();
+ void verifyChecksum(int chunks);
+
+private:
+ bool verify; //verify checksum or not.
+ DatanodeInfo &datanode; //held by reference, not copied.
+ const ExtendedBlock &binfo; //held by reference, not copied.
+ int checksumSize; //size in bytes of one checksum.
+ int chunkSize; //number of data bytes covered by one checksum.
+ int connTimeout;
+ int position; //point in buffer.
+ int readTimeout;
+ int size; //number of bytes in buffer, checksums included.
+ int writeTimeout;
+ int64_t cursor; //point in block.
+ int64_t endOffset; //offset in block requested to read to.
+ int64_t lastSeqNo; //seqno of the last packet received
+ shared_ptr<BufferedSocketReader> in;
+ shared_ptr<Checksum> checksum;
+ shared_ptr<DataTransferProtocol> sender;
+ shared_ptr<PacketHeader> lastHeader;
+ shared_ptr<Socket> sock;
+ std::vector<char> buffer; //checksums followed by data of the current packet.
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_REMOTEBLOCKREADER_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.cc
new file mode 100644
index 0000000..16ef93f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.cc
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "Hash.h"
+#include "Token.h"
+#include "WritableUtils.h"
+
+#include <gsasl.h>
+#include <string>
+#include <vector>
+
+using namespace hdfs::internal;
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Encode a byte range as URL-safe Base64: '+' becomes '-', '/' becomes
+ * '_' and the result is cut at the first '=' padding character.
+ * @param input the bytes to encode.
+ * @param len the number of bytes in input.
+ * @return the URL-safe encoding without padding.
+ * @throw std::bad_alloc if gsasl cannot encode the input.
+ */
+static std::string Base64Encode(const char *input, size_t len) {
+    size_t encodedLen;
+    char * encoded = NULL;
+    const int rc = gsasl_base64_to(input, len, &encoded, &encodedLen);
+
+    if (GSASL_OK != rc) {
+        assert(GSASL_MALLOC_ERROR == rc);
+        throw std::bad_alloc();
+    }
+
+    assert(NULL != encoded);
+    std::string retval(encoded);
+    gsasl_free(encoded);
+
+    /* drop the padding first, then map the two URL-unsafe characters */
+    const std::string::size_type padding = retval.find('=');
+
+    if (padding != std::string::npos) {
+        retval.resize(padding);
+    }
+
+    for (std::string::iterator it = retval.begin(); it != retval.end(); ++it) {
+        if ('+' == *it) {
+            *it = '-';
+        } else if ('/' == *it) {
+            *it = '_';
+        }
+    }
+
+    return retval;
+}
+
+/**
+ * Decode a URL-safe Base64 string (as produced by Base64Encode) into a
+ * byte buffer.
+ *
+ * '-' and '_' are mapped back to '+' and '/'. Because the encoder strips
+ * the '=' padding, decoding is retried with one and then two padding
+ * characters appended before the input is reported as invalid.
+ *
+ * @param urlSafe the text to decode.
+ * @param buffer receives the decoded bytes; left empty when the input
+ * decodes to nothing.
+ * @throw std::invalid_argument if the input is not valid Base64, or gsasl
+ * reports an unexpected error.
+ * @throw std::bad_alloc if memory cannot be allocated.
+ */
+static void Base64Decode(const std::string &urlSafe,
+                         std::vector<char> &buffer) {
+    int retval = 0, append = 0;
+    size_t outLen = 0;
+    char * output = NULL;
+    std::string input = urlSafe;
+
+    for (size_t i = 0; i < input.length(); ++i) {
+        switch (input[i]) {
+        case '-':
+            input[i] = '+';
+            break;
+
+        case '_':
+            input[i] = '/';
+            break;
+
+        default:
+            break;
+        }
+    }
+
+    while (true) {
+        /* c_str() is valid even for an empty string, unlike &input[0] */
+        retval = gsasl_base64_from(input.c_str(), input.length(), &output,
+                                   &outLen);
+
+        if (GSASL_OK == retval) {
+            break;
+        }
+
+        if (GSASL_BASE64_ERROR == retval) {
+            if (append++ < 2) {
+                input.append("=");
+                continue;
+            }
+
+            throw std::invalid_argument(
+                "invalid input of gsasl_base64_from");
+        }
+
+        if (GSASL_MALLOC_ERROR == retval) {
+            throw std::bad_alloc();
+        }
+
+        /*
+         * Any other code used to be covered only by an assert; without
+         * assertions the function went on to use an unset output and
+         * length. Fail explicitly instead.
+         */
+        throw std::invalid_argument(
+            "unexpected return value from gsasl_base64_from");
+    }
+
+    try {
+        /* assign copes with a zero-length result, unlike &buffer[0] */
+        buffer.assign(output, output + outLen);
+    } catch (...) {
+        gsasl_free(output);
+        throw;
+    }
+
+    gsasl_free(output);
+}
+
+/**
+ * Serialize the token as URL-safe Base64 text.
+ *
+ * The identifier and the password are each written as an int32 length
+ * followed by the raw bytes, then kind and service are written as text.
+ * The scratch buffer is sized from the fields, never below the previous
+ * fixed 1024 bytes, so a large token no longer depends on fitting that
+ * fixed size.
+ *
+ * @return the encoded token.
+ * @throw HdfsIOException nesting any failure raised while serializing.
+ */
+std::string Token::toString() const {
+    try {
+        size_t len = 0;
+        /*
+         * 64 bytes of slack for the length prefixes.
+         * NOTE(review): assumes two 4-byte integers plus two short text
+         * length prefixes - confirm against WritableUtils.
+         */
+        size_t capacity = identifier.size() + password.size() + kind.size()
+                          + service.size() + 64;
+
+        if (capacity < 1024) {
+            capacity = 1024;
+        }
+
+        std::vector<char> buffer(capacity);
+        WritableUtils out(&buffer[0], buffer.size());
+        len += out.WriteInt32(identifier.size());
+        len += out.WriteRaw(&identifier[0], identifier.size());
+        len += out.WriteInt32(password.size());
+        len += out.WriteRaw(&password[0], password.size());
+        len += out.WriteText(kind);
+        len += out.WriteText(service);
+        return Base64Encode(&buffer[0], len);
+    } catch (...) {
+        NESTED_THROW(HdfsIOException, "cannot convert token to string");
+    }
+}
+
+/**
+ * Rebuild the token from text produced by toString().
+ *
+ * All fields are parsed into temporaries and stored only after the whole
+ * input has been read, so the token is left untouched when parsing
+ * fails. Lengths read from the input are validated before they are used
+ * to size a string.
+ *
+ * @param str the URL-safe Base64 representation of a token.
+ * @throw HdfsInvalidBlockToken nesting the cause if str cannot be decoded
+ * or parsed.
+ */
+void Token::fromString(const std::string &str) {
+    try {
+        std::vector<char> buffer;
+        Base64Decode(str, buffer);
+
+        /* &buffer[0] is not valid on an empty vector */
+        if (buffer.empty()) {
+            throw std::invalid_argument("token string is empty");
+        }
+
+        WritableUtils in(&buffer[0], buffer.size());
+        std::string parsedIdentifier, parsedPassword;
+        int32_t len = in.ReadInt32();
+
+        /* a field cannot be longer than the input that holds it */
+        if (len < 0 || static_cast<size_t>(len) > buffer.size()) {
+            throw std::invalid_argument("invalid token identifier length");
+        }
+
+        if (len > 0) {
+            parsedIdentifier.resize(len);
+            in.ReadRaw(&parsedIdentifier[0], len);
+        }
+
+        len = in.ReadInt32();
+
+        if (len < 0 || static_cast<size_t>(len) > buffer.size()) {
+            throw std::invalid_argument("invalid token password length");
+        }
+
+        if (len > 0) {
+            parsedPassword.resize(len);
+            in.ReadRaw(&parsedPassword[0], len);
+        }
+
+        std::string parsedKind = in.ReadText();
+        std::string parsedService = in.ReadText();
+        /* everything parsed, swapping cannot fail */
+        identifier.swap(parsedIdentifier);
+        password.swap(parsedPassword);
+        kind.swap(parsedKind);
+        service.swap(parsedService);
+    } catch (...) {
+        NESTED_THROW(HdfsInvalidBlockToken,
+                     "cannot construct a token from the string");
+    }
+}
+
+/**
+ * Hash of the token, combined from the hashes of identifier, password,
+ * kind and service in that order.
+ */
+size_t Token::hash_value() const {
+    size_t parts[4];
+    parts[0] = StringHasher(identifier);
+    parts[1] = StringHasher(password);
+    parts[2] = StringHasher(kind);
+    parts[3] = StringHasher(service);
+    return CombineHasher(parts, sizeof(parts) / sizeof(parts[0]));
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.h
new file mode 100644
index 0000000..c72cd86
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/Token.h
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_CLIENT_TOKEN_H_
+#define _HDFS_LIBHDFS3_CLIENT_TOKEN_H_
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+/**
+ * Token is a value class holding four strings: identifier, password,
+ * kind and service. It can be converted to and from a string form and
+ * provides a hash value.
+ */
+class Token {
+public:
+ std::string getIdentifier() const { // returns a copy
+ return identifier;
+ }
+
+ void setIdentifier(const std::string &identifier) {
+ this->identifier = identifier;
+ }
+
+ std::string getKind() const { // returns a copy
+ return kind;
+ }
+
+ void setKind(const std::string &kind) {
+ this->kind = kind;
+ }
+
+ std::string getPassword() const { // returns a copy
+ return password;
+ }
+
+ void setPassword(const std::string &password) {
+ this->password = password;
+ }
+
+ std::string getService() const { // returns a copy
+ return service;
+ }
+
+ void setService(const std::string &service) {
+ this->service = service;
+ }
+
+ bool operator ==(const Token &other) const { // equal when all four fields match
+ return identifier == other.identifier && password == other.password
+ && kind == other.kind && service == other.service;
+ }
+
+ std::string toString() const; // string form of the token
+
+ void fromString(const std::string &str); // rebuild the token from its string form
+
+ size_t hash_value() const;
+
+private:
+ std::string identifier;
+ std::string password;
+ std::string kind;
+ std::string service;
+};
+
+}
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.cc
new file mode 100644
index 0000000..a68bca0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.cc
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "UserInfo.h"
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+
+#include <pwd.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <vector>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Build a UserInfo for the user running the current process.
+ *
+ * The effective user is looked up from geteuid() and the real user from
+ * getuid(). sysconf(_SC_GETPW_R_SIZE_MAX) is only a hint for the lookup
+ * buffer size and may return -1 when the system defines no limit; that
+ * case used to be reported as an error. A 16 KiB buffer is used instead
+ * whenever the hint is not positive.
+ *
+ * @return a UserInfo carrying the real and effective user names.
+ * @throw InvalidParameter if either user id has no password entry.
+ */
+UserInfo UserInfo::LocalUser() {
+    UserInfo retval;
+    uid_t uid, euid;
+    struct passwd pwd, epwd, *result = NULL;
+    euid = geteuid();
+    uid = getuid();
+
+    const long hint = sysconf(_SC_GETPW_R_SIZE_MAX);
+    const size_t bufsize = hint > 0 ? static_cast<size_t>(hint) : 16384;
+    std::vector<char> buffer(bufsize);
+
+    if (getpwuid_r(euid, &epwd, &buffer[0], bufsize, &result) != 0 || !result) {
+        THROW(InvalidParameter,
+              "Invalid input: effective user name cannot be found with UID %u.",
+              euid);
+    }
+
+    /* the name must be stored before the buffer is reused below */
+    retval.setEffectiveUser(epwd.pw_name);
+
+    if (getpwuid_r(uid, &pwd, &buffer[0], bufsize, &result) != 0 || !result) {
+        THROW(InvalidParameter,
+              "Invalid input: real user name cannot be found with UID %u.",
+              uid);
+    }
+
+    retval.setRealUser(pwd.pw_name);
+    return retval;
+}
+
+/**
+ * Hash of the user info, combined from the hash of the real user name and
+ * the hash of the effective user.
+ */
+size_t UserInfo::hash_value() const {
+    size_t parts[2];
+    parts[0] = StringHasher(realUser);
+    parts[1] = effectiveUser.hash_value();
+    return CombineHasher(parts, sizeof(parts) / sizeof(parts[0]));
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.h
new file mode 100644
index 0000000..efc2c60
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/client/UserInfo.h
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _HDFS_LIBHDFS3_CLIENT_USERINFO_H_
+#define _HDFS_LIBHDFS3_CLIENT_USERINFO_H_
+
+#include "Hash.h"
+#include "KerberosName.h"
+#include "Logger.h"
+#include "Token.h"
+
+#include <map>
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Describes the user on whose behalf the client talks to HDFS: a real user
+ * name, an effective user (kept as a KerberosName) and a set of tokens
+ * indexed by (kind, service).
+ */
+class UserInfo {
+public:
+    /** Create a UserInfo with empty real and effective users. */
+    UserInfo() {
+    }
+
+    /** Create a UserInfo whose effective user is built from the given name. */
+    explicit UserInfo(const std::string &u) :
+        effectiveUser(u) {
+    }
+
+    /** @return The real user name. */
+    const std::string &getRealUser() const {
+        return realUser;
+    }
+
+    /** Replace the real user name. */
+    void setRealUser(const std::string &user) {
+        realUser = user;
+    }
+
+    /** @return The name part of the effective user. */
+    const std::string &getEffectiveUser() const {
+        return effectiveUser.getName();
+    }
+
+    /** Replace the effective user with a KerberosName built from the given name. */
+    void setEffectiveUser(const std::string &effectiveUser) {
+        KerberosName parsed(effectiveUser);
+        this->effectiveUser = parsed;
+    }
+
+    /** @return The principal of the effective user. */
+    std::string getPrincipal() const {
+        return effectiveUser.getPrincipal();
+    }
+
+    /**
+     * Two users are equal when both their real and effective users match.
+     * Tokens are not compared.
+     */
+    bool operator ==(const UserInfo &other) const {
+        if (!(realUser == other.realUser)) {
+            return false;
+        }
+
+        return effectiveUser == other.effectiveUser;
+    }
+
+    /**
+     * Store a token under its (kind, service) pair, replacing any token
+     * already stored under the same pair.
+     */
+    void addToken(const Token &token) {
+        std::pair<std::string, std::string> key =
+            std::make_pair(token.getKind(), token.getService());
+        tokens[key] = token;
+    }
+
+    /**
+     * Look up the token stored for the given kind and service.
+     * @return A pointer to the stored token, or NULL if there is none.
+     */
+    const Token * selectToken(const std::string &kind,
+                              const std::string &service) const {
+        std::map<std::pair<std::string, std::string>, Token>::const_iterator found =
+            tokens.find(std::make_pair(kind, service));
+
+        if (found != tokens.end()) {
+            return &found->second;
+        }
+
+        return NULL;
+    }
+
+    size_t hash_value() const;
+
+public:
+    static UserInfo LocalUser();
+
+private:
+    KerberosName effectiveUser;
+    std::map<std::pair<std::string, std::string>, Token> tokens;
+    std::string realUser;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::UserInfo);
+
+#endif /* _HDFS_LIBHDFS3_CLIENT_USERINFO_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h
index a434aaf..fd62d5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/LruMap.h
@@ -20,7 +20,7 @@
#define _HDFS_LIBHDFS3_COMMON_LRUMAP_H_
#include "Thread.h"
-#include "Unordered.h"
+#include "UnorderedMap.h"
#include <list>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h
index 8e0a40e..76ab1d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/SharedPtr.h
@@ -19,6 +19,18 @@
#ifndef _HDFS_LIBHDFS3_COMMON_SHARED_PTR_H_
#define _HDFS_LIBHDFS3_COMMON_SHARED_PTR_H_
+#ifdef _LIBCPP_VERSION
+#include <memory>
+
+namespace hdfs {
+namespace internal {
+
+using std::shared_ptr;
+
+}
+}
+
+#else
#include <tr1/memory>
namespace hdfs {
@@ -30,3 +42,4 @@ using std::tr1::shared_ptr;
}
#endif
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h
index 4dff889..00e3a2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/StackPrinter.h
@@ -23,7 +23,7 @@
#include <string>
-#ifndef DEFAULT_STACK_PREFIX
+#ifndef DEFAULT_STACK_PREFIX
#define DEFAULT_STACK_PREFIX "\t@\t"
#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h
index 3bb08af..8c2c549 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/common/UnorderedMap.h
@@ -19,7 +19,21 @@
#ifndef _HDFS_LIBHDFS3_COMMON_UNORDERED_MAP_H_
#define _HDFS_LIBHDFS3_COMMON_UNORDERED_MAP_H_
-#include <tr1/unordred_map>
+#ifdef _LIBCPP_VERSION
+
+#include <unordered_map>
+
+namespace hdfs {
+namespace internal {
+
+using std::unordered_map;
+
+}
+}
+
+#else
+
+#include <tr1/unordered_map>
namespace hdfs {
namespace internal {
@@ -30,3 +44,4 @@ using std::tr1::unordered_map;
}
#endif
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.cc
new file mode 100644
index 0000000..fe30d68
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.cc
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BufferedSocketReader.h"
+#include "DateTime.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+using namespace google::protobuf::io;
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Wrap the given socket. The internal buffer is sizeof(int64_t) bytes long
+ * and starts out with nothing buffered.
+ */
+BufferedSocketReaderImpl::BufferedSocketReaderImpl(Socket & s)
+    : cursor(0), size(0), sock(s), buffer(sizeof(int64_t)) {
+}
+
+/**
+ * Read up to s bytes into b. Buffered bytes are handed out first; the
+ * socket is only read when nothing is buffered.
+ * @return The number of bytes stored in b.
+ */
+int32_t BufferedSocketReaderImpl::read(char * b, int32_t s) {
+    assert(s > 0 && NULL != b);
+    const int32_t buffered = size - cursor;
+    const int32_t count = s < buffered ? s : buffered;
+
+    if (count <= 0) {
+        /* Nothing buffered: reset the buffer and read straight from the socket. */
+        assert(size == cursor);
+        size = cursor = 0;
+        return sock.read(b, s);
+    }
+
+    memcpy(b, &buffer[cursor], count);
+    cursor += count;
+    return count;
+}
+
+/**
+ * Read exactly s bytes into b. Buffered bytes are consumed first and the
+ * remainder, if any, is read from the socket with the given timeout.
+ * @param b The buffer used to receive data.
+ * @param s The number of bytes to read.
+ * @param timeout Passed through to the socket's readFully().
+ */
+void BufferedSocketReaderImpl::readFully(char * b, int32_t s, int timeout) {
+    assert(s > 0 && NULL != b);
+    int32_t done = s < size - cursor ? s : size - cursor;
+
+    /*
+     * Only touch the buffer when there is something to copy: when the
+     * buffer has been fully drained, cursor may equal buffer.size() and
+     * buffer[cursor] would index past the last element.
+     */
+    if (done > 0) {
+        memcpy(b, &buffer[cursor], done);
+        cursor += done;
+    }
+
+    if (done < s) {
+        assert(size == cursor);
+        size = cursor = 0;
+        sock.readFully(b + done, s - done, timeout);
+    }
+}
+
+/**
+ * Read four bytes and interpret them as a big endian 32 bit integer.
+ * @param timeout Passed through to readFully().
+ * @return The value converted to host byte order.
+ */
+int32_t BufferedSocketReaderImpl::readBigEndianInt32(int timeout) {
+    char buf[sizeof(int32_t)];
+    readFully(buf, sizeof(buf), timeout);
+    /*
+     * Copy the bytes into an integer instead of casting the char array:
+     * the array is not guaranteed to be suitably aligned for int32_t.
+     */
+    uint32_t raw;
+    memcpy(&raw, buf, sizeof(raw));
+    return static_cast<int32_t>(ntohl(raw));
+}
+
+/**
+ * Read a variable length encoded 32 bit integer.
+ *
+ * Unread buffered bytes are first moved to the front of the buffer. The
+ * method then repeatedly tries to decode a varint from the buffered bytes
+ * and, when that fails, waits for the socket and appends whatever it reads.
+ *
+ * @param timeout Timeout budget; a negative value never triggers the
+ * timeout check below.
+ * @return The decoded value.
+ * @throw HdfsNetworkException if the buffer is full and still does not
+ * hold a decodable varint.
+ * @throw HdfsTimeoutException if the budget is used up before a varint
+ * could be decoded.
+ */
+int32_t BufferedSocketReaderImpl::readVarint32(int timeout) {
+ int32_t value;
+ bool rc = false;
+ int deadline = timeout;
+ // Compact: shift the unread bytes to the start of the buffer.
+ memmove(&buffer[0], &buffer[cursor], size - cursor);
+ size -= cursor;
+ cursor = 0;
+
+ while (!rc) {
+ // Try to decode from the bytes currently buffered.
+ CodedInputStream in(reinterpret_cast<uint8_t *>(&buffer[cursor]),
+ size - cursor);
+ in.PushLimit(size - cursor);
+ rc = in.ReadVarint32(reinterpret_cast<uint32_t *>(&value));
+
+ if (rc) {
+ // Advance the cursor by the number of bytes the decoder consumed.
+ cursor += size - cursor - in.BytesUntilLimit();
+ return value;
+ }
+
+ steady_clock::time_point s = steady_clock::now();
+ CheckOperationCanceled();
+
+ // A full buffer that still cannot be decoded cannot make progress.
+ if (size == static_cast<int32_t>(buffer.size())) {
+ THROW(HdfsNetworkException,
+ "Invalid varint type or buffer is too small, buffer size = %d.",
+ static_cast<int>(buffer.size()));
+ }
+
+ if (sock.poll(true, false, deadline)) {
+ size += sock.read(&buffer[size], buffer.size() - size);
+ }
+
+ steady_clock::time_point e = steady_clock::now();
+
+ // The remaining budget is only reduced for a positive timeout.
+ if (timeout > 0) {
+ deadline -= ToMilliSeconds(s, e);
+ }
+
+ // With timeout == 0 this fires after a single unsuccessful pass.
+ if (timeout >= 0 && deadline <= 0) {
+ THROW(HdfsTimeoutException, "Read %d bytes timeout", size);
+ }
+ }
+
+ // Not reached: the loop above only ends by returning or throwing.
+ return 0;
+}
+
+/**
+ * Report whether data can be read without blocking: true at once when
+ * unread bytes are buffered, otherwise the result of polling the socket
+ * for readability with the given timeout.
+ */
+bool BufferedSocketReaderImpl::poll(int timeout) {
+    const bool hasBuffered = cursor < size;
+    return hasBuffered || sock.poll(true, false, timeout);
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.h
new file mode 100644
index 0000000..efe7826
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/BufferedSocketReader.h
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_NETWORK_BUFFEREDSOCKET_H_
+#define _HDFS_LIBHDFS3_NETWORK_BUFFEREDSOCKET_H_
+
+#include <vector>
+#include <stdint.h>
+#include <cstdlib>
+
+#include "Socket.h"
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * A wrapper of Socket that reads big endian integers and varints from it.
+ */
+class BufferedSocketReader {
+public:
+ virtual ~BufferedSocketReader() {
+ }
+
+ /**
+ * Read data from the socket; if there is data buffered, read from the buffer first.
+ * If nothing can be read, the caller will be blocked.
+ * @param b The buffer used to receive data.
+ * @param s The number of bytes to be read.
+ * @return The number of bytes actually read.
+ * @throw HdfsNetworkException
+ * @throw HdfsEndOfStream
+ */
+ virtual int32_t read(char * b, int32_t s) = 0;
+
+ /**
+ * Read data from the socket; if there is data buffered, read from the buffer first.
+ * If not enough data can be read, the caller will be blocked.
+ * @param b The buffer used to receive data.
+ * @param s The number of bytes to read.
+ * @param timeout The timeout interval of this read operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsEndOfStream
+ * @throw HdfsTimeoutException
+ */
+ virtual void readFully(char * b, int32_t s, int timeout) = 0;
+
+ /**
+ * Read a 32 bit big endian integer from the socket.
+ * If not enough data can be read, the caller will be blocked.
+ * @param timeout The timeout interval of this read operation, negative means infinite.
+ * @return A 32 bit integer.
+ * @throw HdfsNetworkException
+ * @throw HdfsEndOfStream
+ * @throw HdfsTimeoutException
+ */
+ virtual int32_t readBigEndianInt32(int timeout) = 0;
+
+ /**
+ * Read a variable length encoded 32 bit integer from the socket.
+ * If not enough data can be read, the caller will be blocked.
+ * @param timeout The timeout interval of this read operation, negative means infinite.
+ * @return A 32 bit integer.
+ * @throw HdfsNetworkException
+ * @throw HdfsEndOfStream
+ * @throw HdfsTimeoutException
+ */
+ virtual int32_t readVarint32(int timeout) = 0;
+
+ /**
+ * Test if the socket can be read without blocking.
+ * @param timeout The timeout interval of this operation, negative means infinite.
+ * @return Return true if the socket can be read without blocking, false on timeout.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ virtual bool poll(int timeout) = 0;
+
+};
+
+/**
+ * An implementation of BufferedSocketReader that keeps a small byte buffer
+ * in front of a Socket it holds by reference.
+ */
+class BufferedSocketReaderImpl: public BufferedSocketReader {
+public:
+ BufferedSocketReaderImpl(Socket & s);
+
+ int32_t read(char * b, int32_t s);
+
+ void readFully(char * b, int32_t s, int timeout);
+
+ int32_t readBigEndianInt32(int timeout);
+
+ int32_t readVarint32(int timeout);
+
+ bool poll(int timeout);
+
+private:
+ // For tests: start with the given bytes already buffered and unread.
+ BufferedSocketReaderImpl(Socket & s, const std::vector<char> & buffer) :
+ cursor(0), size(buffer.size()), sock(s), buffer(buffer) {
+ }
+
+private:
+ // Index of the next unread byte in buffer.
+ int32_t cursor;
+ // Number of valid bytes currently held in buffer.
+ int32_t size;
+ // The wrapped socket; held by reference, so it is not owned here.
+ Socket & sock;
+ // Storage for bytes read from the socket but not yet consumed.
+ std::vector<char> buffer;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_NETWORK_BUFFEREDSOCKET_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Socket.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Socket.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Socket.h
new file mode 100644
index 0000000..43968dc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Socket.h
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_NETWORK_SOCKET_H_
+#define _HDFS_LIBHDFS3_NETWORK_SOCKET_H_
+
+#include <netdb.h>
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Abstract interface of a stream socket: connect, blocking and timed
+ * read/write, readiness polling and a few socket options.
+ */
+class Socket {
+public:
+
+ virtual ~Socket() {
+ }
+
+ /**
+ * Read data from socket.
+ * If nothing can be read, the caller will be blocked.
+ * @param buffer The buffer to store the data.
+ * @param size The number of bytes to be read.
+ * @return The number of bytes actually read.
+ * @throw HdfsNetworkException
+ * @throw HdfsEndOfStream
+ */
+ virtual int32_t read(char * buffer, int32_t size) = 0;
+
+ /**
+ * Read data from socket until enough data has been received.
+ * If not enough data can be read, the caller will be blocked.
+ * @param buffer The buffer to store the data.
+ * @param size The number of bytes to be read.
+ * @param timeout The timeout interval of this read operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsEndOfStream
+ * @throw HdfsTimeout
+ */
+ virtual void readFully(char * buffer, int32_t size, int timeout) = 0;
+
+ /**
+ * Send data to socket.
+ * The caller will be blocked until the send operation finishes,
+ * but it is not guaranteed that all data has been sent.
+ * @param buffer The data to be sent.
+ * @param size The number of bytes to be sent.
+ * @return The number of bytes actually sent.
+ * @throw HdfsNetworkException
+ */
+ virtual int32_t write(const char * buffer, int32_t size) = 0;
+
+ /**
+ * Send all data to socket.
+ * The caller will be blocked until all data has been sent.
+ * @param buffer The data to be sent.
+ * @param size The number of bytes to be sent.
+ * @param timeout The timeout interval of this write operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ virtual void writeFully(const char * buffer, int32_t size, int timeout) = 0;
+
+ /**
+ * Connect to a tcp server.
+ * @param host The host of server.
+ * @param port The port of server, as a number.
+ * @param timeout The timeout interval of this connect operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ virtual void connect(const char * host, int port, int timeout) = 0;
+
+ /**
+ * Connect to a tcp server.
+ * @param host The host of server.
+ * @param port The port of server, as a string.
+ * @param timeout The timeout interval of this connect operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ virtual void connect(const char * host, const char * port, int timeout) = 0;
+
+ /**
+ * Connect to a tcp server using an already resolved address.
+ * @param paddr The address of server.
+ * @param host The host of server used in error message.
+ * @param port The port of server used in error message.
+ * @param timeout The timeout interval of this connect operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ virtual void connect(struct addrinfo * paddr, const char * host,
+ const char * port, int timeout) = 0;
+
+ /**
+ * Test if the socket can be read or written without blocking.
+ * @param read Test socket if it can be read.
+ * @param write Test socket if it can be written.
+ * @param timeout The timeout interval of this operation, negative means infinite.
+ * @return Return true if the socket can be read or written without blocking, false on timeout.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ virtual bool poll(bool read, bool write, int timeout) = 0;
+
+ /**
+ * Set socket no delay mode.
+ * @param enable If true, set socket into no delay mode, else delay mode.
+ * @throw HdfsNetworkException
+ */
+ virtual void setNoDelay(bool enable) = 0;
+
+ /**
+ * Set socket blocking mode.
+ * @param enable If true, set socket into blocking mode, else non-block mode.
+ * @throw HdfsNetworkException
+ */
+ virtual void setBlockMode(bool enable) = 0;
+
+ /**
+ * Set socket linger timeout
+ * @param timeout Linger timeout of the socket in millisecond, disable linger if it is less than 0.
+ * NOTE(review): TcpSocketImpl passes this value unchanged to the l_linger
+ * field of SO_LINGER -- confirm that the unit documented here matches.
+ * @throw HdfsNetworkException
+ */
+ virtual void setLingerTimeout(int timeout) = 0;
+
+ /**
+ * Shutdown and close the socket.
+ * @throw nothrow
+ */
+ virtual void close() = 0;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_NETWORK_SOCKET_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Syscall.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Syscall.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Syscall.h
new file mode 100644
index 0000000..5dab57c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/Syscall.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_NETWORK_SYSCALL_H_
+#define _HDFS_LIBHDFS3_NETWORK_SYSCALL_H_
+
+#include <fcntl.h>
+#include <netdb.h>
+#include <poll.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+/*
+ * Re-exports the global system call functions used by the network code.
+ * The "syscalls" alias defined below in this header points either at this
+ * namespace or, when MOCK is defined, at mock_systems.
+ */
+namespace real_syscalls {
+
+using ::recv;
+using ::send;
+using ::getaddrinfo;
+using ::freeaddrinfo;
+using ::socket;
+using ::connect;
+using ::getpeername;
+using ::fcntl;
+using ::setsockopt;
+using ::poll;
+using ::shutdown;
+using ::close;
+
+}
+
+#ifdef MOCK
+
+#include "MockSystem.h"
+namespace syscalls = mock_systems;
+
+#else
+
+namespace syscalls = real_syscalls;
+
+#endif
+
+#endif /* _HDFS_LIBHDFS3_NETWORK_SYSCALL_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.cc
new file mode 100644
index 0000000..de2db9d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.cc
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "platform.h"
+
+#include <arpa/inet.h>
+#include <cassert>
+#include <climits>
+#include <cstring>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <poll.h>
+#include <stdint.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <sstream>
+
+#include "DateTime.h"
+#include "Exception.h"
+#include "ExceptionInternal.h"
+#include "TcpSocket.h"
+#include "Syscall.h"
+
+// Linux defines a constant that you can use inside send() to prevent SIGPIPE
+// from being raised. When this constant is present, we want to use it. When
+// it is not present, we just pass 0 (no flag).
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0
+#endif
+
+namespace hdfs {
+namespace internal {
+
+// On MacOS and some other BSD-based operating systems, SO_NOSIGPIPE can be
+// set on a socket so that writes to it do not raise SIGPIPE. Where the
+// option is not defined this function does nothing.
+void TcpSocketImpl::setNoSigPipe() {
+#ifdef SO_NOSIGPIPE
+    int enabled = 1;
+    int rc = syscalls::setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE,
+                                  (char *) &enabled, sizeof(enabled));
+
+    if (0 != rc) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+#endif
+}
+
+/**
+ * Create an unconnected socket: no descriptor yet (-1) and no linger
+ * timeout configured (-1).
+ */
+TcpSocketImpl::TcpSocketImpl()
+    : sock(-1), lingerTimeout(-1) {
+}
+
+/** Close the socket, if it is still open, when the object goes away. */
+TcpSocketImpl::~TcpSocketImpl() {
+    this->close();
+}
+
+/**
+ * Receive up to size bytes into buffer, retrying when the call is
+ * interrupted by a signal.
+ * @return The number of bytes received, always greater than zero.
+ * @throw HdfsNetworkException if recv fails.
+ * @throw HdfsEndOfStream if recv reports that no more data will arrive.
+ */
+int32_t TcpSocketImpl::read(char * buffer, int32_t size) {
+    assert(-1 != sock);
+    assert(NULL != buffer && size > 0);
+    int32_t received;
+
+    for (;;) {
+        received = syscalls::recv(sock, buffer, size, 0);
+
+        /* Only an EINTR failure is retried, after the cancel check. */
+        if (-1 != received || EINTR != errno || CheckOperationCanceled()) {
+            break;
+        }
+    }
+
+    if (0 == received) {
+        THROW(HdfsEndOfStream, "Read %d bytes failed from %s: End of the stream", size, remoteAddr.c_str());
+    }
+
+    if (-1 == received) {
+        THROW(HdfsNetworkException, "Read %d bytes failed from %s: %s",
+              size, remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    return received;
+}
+
+/**
+ * Read exactly size bytes into buffer, polling for readability before each
+ * read and tracking how much of the timeout budget is left.
+ * @param timeout Budget for the whole operation; negative disables the
+ * timeout check.
+ * @throw HdfsTimeoutException if bytes are still missing once the budget
+ * is used up.
+ */
+void TcpSocketImpl::readFully(char * buffer, int32_t size, int timeout) {
+    assert(-1 != sock);
+    assert(NULL != buffer && size > 0);
+    int32_t remaining = size;
+    int budget = timeout;
+
+    while (remaining > 0) {
+        steady_clock::time_point begin = steady_clock::now();
+        CheckOperationCanceled();
+
+        if (poll(true, false, budget)) {
+            int32_t got = read(buffer + (size - remaining), remaining);
+            remaining -= got;
+        }
+
+        steady_clock::time_point end = steady_clock::now();
+
+        /* The budget only shrinks for a positive timeout. */
+        if (timeout > 0) {
+            budget -= ToMilliSeconds(begin, end);
+        }
+
+        bool expired = timeout >= 0 && budget <= 0;
+
+        if (remaining > 0 && expired) {
+            THROW(HdfsTimeoutException, "Read %d bytes timeout from %s", size, remoteAddr.c_str());
+        }
+    }
+}
+
+/**
+ * Send up to size bytes from buffer, retrying when the call is interrupted
+ * by a signal. MSG_NOSIGNAL is passed to send().
+ * @return The number of bytes accepted by send(), which may be less than size.
+ * @throw HdfsNetworkException if send fails.
+ */
+int32_t TcpSocketImpl::write(const char * buffer, int32_t size) {
+    assert(-1 != sock);
+    assert(NULL != buffer && size > 0);
+    int32_t sent;
+
+    for (;;) {
+        sent = syscalls::send(sock, buffer, size, MSG_NOSIGNAL);
+
+        /* Only an EINTR failure is retried, after the cancel check. */
+        if (-1 != sent || EINTR != errno || CheckOperationCanceled()) {
+            break;
+        }
+    }
+
+    if (-1 == sent) {
+        THROW(HdfsNetworkException, "Write %d bytes failed to %s: %s",
+              size, remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    return sent;
+}
+
+/**
+ * Send exactly size bytes from buffer, polling for writability before each
+ * write and tracking how much of the timeout budget is left.
+ * @param timeout Budget for the whole operation; negative disables the
+ * timeout check.
+ * @throw HdfsTimeoutException if bytes are still unsent once the budget
+ * is used up.
+ */
+void TcpSocketImpl::writeFully(const char * buffer, int32_t size, int timeout) {
+    assert(-1 != sock);
+    assert(NULL != buffer && size > 0);
+    int32_t remaining = size;
+    int budget = timeout;
+
+    while (remaining > 0) {
+        steady_clock::time_point begin = steady_clock::now();
+        CheckOperationCanceled();
+
+        if (poll(false, true, budget)) {
+            int32_t put = write(buffer + (size - remaining), remaining);
+            remaining -= put;
+        }
+
+        steady_clock::time_point end = steady_clock::now();
+
+        /* The budget only shrinks for a positive timeout. */
+        if (timeout > 0) {
+            budget -= ToMilliSeconds(begin, end);
+        }
+
+        bool expired = timeout >= 0 && budget <= 0;
+
+        if (remaining > 0 && expired) {
+            THROW(HdfsTimeoutException, "Write %d bytes timeout to %s",
+                  size, remoteAddr.c_str());
+        }
+    }
+}
+
+/**
+ * Connect to host:port, with the port given as a number. The port is
+ * formatted as text and handed to the string-port overload.
+ */
+void TcpSocketImpl::connect(const char * host, int port, int timeout) {
+    std::stringstream portText;
+    portText << port;
+    const std::string service = portText.str();
+    connect(host, service.c_str(), timeout);
+}
+
+/**
+ * Resolve host:port and try each returned address in turn until one
+ * connects.
+ *
+ * A connect or timeout failure on an address is swallowed as long as there
+ * is another address left to try; on the last address it is rethrown.
+ * The address list is released on every exit path.
+ *
+ * @param host The host of server.
+ * @param port The port of server, as a string.
+ * @param timeout Budget shared by all attempts; negative disables the
+ * timeout check.
+ * @throw HdfsNetworkConnectException if the address cannot be resolved or
+ * the last attempt fails to connect.
+ * @throw HdfsTimeoutException if the budget is used up.
+ */
+void TcpSocketImpl::connect(const char * host, const char * port, int timeout) {
+ assert(-1 == sock);
+ struct addrinfo hints, *addrs, *paddr;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ int retval = syscalls::getaddrinfo(host, port, &hints, &addrs);
+
+ if (0 != retval) {
+ THROW(HdfsNetworkConnectException, "Failed to resolve address \"%s:%s\" %s",
+ host, port, gai_strerror(retval));
+ }
+
+ int deadline = timeout;
+ // Remember the quoted "host:port" text; it is used in later error messages.
+ std::stringstream ss;
+ ss << "\"" << host << ":" << port << "\"";
+ remoteAddr = ss.str();
+
+ try {
+ for (paddr = addrs; NULL != paddr; paddr = paddr->ai_next) {
+ steady_clock::time_point s = steady_clock::now();
+ CheckOperationCanceled();
+
+ try {
+ connect(paddr, host, port, deadline);
+ } catch (HdfsNetworkConnectException & e) {
+ // Only give up when there is no further address to try.
+ if (NULL == paddr->ai_next) {
+ throw;
+ }
+ } catch (HdfsTimeoutException & e) {
+ if (NULL == paddr->ai_next) {
+ throw;
+ }
+ }
+
+ // A valid descriptor means this attempt succeeded.
+ if (-1 != sock) {
+ syscalls::freeaddrinfo(addrs);
+ return;
+ }
+
+ steady_clock::time_point e = steady_clock::now();
+
+ // The remaining budget is only reduced for a positive timeout.
+ if (timeout > 0) {
+ deadline -= ToMilliSeconds(s, e);
+ }
+
+ if (-1 == sock && timeout >= 0 && deadline <= 0) {
+ THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout", host, port);
+ }
+ }
+ } catch (...) {
+ // Release the address list before letting the exception propagate.
+ syscalls::freeaddrinfo(addrs);
+ throw;
+ }
+}
+
+/**
+ * Connect to a single, already resolved address.
+ *
+ * The socket is created, configured, switched to non-blocking mode for the
+ * connect itself and switched back to blocking mode on success. If any
+ * step after the socket has been created fails, the socket is closed
+ * before the exception propagates, so the object is left unconnected.
+ *
+ * @param paddr The address of server.
+ * @param host The host of server, used in error messages.
+ * @param port The port of server, used in error messages.
+ * @param timeout How long to wait for the connection to become writable.
+ * @throw HdfsNetworkException if the socket cannot be created or configured.
+ * @throw HdfsNetworkConnectException if the connection attempt fails.
+ * @throw HdfsTimeoutException if the connection attempt times out.
+ */
+void TcpSocketImpl::connect(struct addrinfo * paddr, const char * host,
+                            const char * port, int timeout) {
+    assert(-1 == sock);
+    sock = syscalls::socket(paddr->ai_family, paddr->ai_socktype,
+                            paddr->ai_protocol);
+
+    if (-1 == sock) {
+        THROW(HdfsNetworkException,
+              "Create socket failed when connect to %s: %s",
+              remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    try {
+        /*
+         * Socket options are applied inside the try block so that a
+         * failure here also closes the freshly created socket.
+         */
+        if (lingerTimeout >= 0) {
+            setLingerTimeoutInternal(lingerTimeout);
+        }
+
+#ifdef __linux__
+        /*
+         * on linux some kernel use SO_SNDTIMEO as connect timeout.
+         * It is OK to set a very large value here since the user has its own timeout mechanism.
+         */
+        setSendTimeout(3600000);
+#endif
+
+        setBlockMode(false);
+        setNoSigPipe();
+
+        int rc = 0;
+        do {
+            rc = syscalls::connect(sock, paddr->ai_addr, paddr->ai_addrlen);
+        } while (rc < 0 && EINTR == errno && !CheckOperationCanceled());
+
+        if (rc < 0) {
+            if (EINPROGRESS != errno && EWOULDBLOCK != errno) {
+                if (ETIMEDOUT == errno) {
+                    THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout",
+                          host, port);
+                } else {
+                    THROW(HdfsNetworkConnectException,
+                          "Connect to \"%s:%s\" failed: %s",
+                          host, port, GetSystemErrorInfo(errno));
+                }
+            }
+
+            /* The connect is in progress: wait until the socket is writable. */
+            if (!poll(false, true, timeout)) {
+                THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout", host, port);
+            }
+
+            struct sockaddr peer;
+
+            /* getpeername() takes a socklen_t length. */
+            socklen_t len = sizeof(peer);
+
+            memset(&peer, 0, sizeof(peer));
+
+            if (syscalls::getpeername(sock, &peer, &len)) {
+                /*
+                 * connect failed, find out the error info.
+                 */
+                char c;
+                rc = syscalls::recv(sock, &c, 1, 0);
+                assert(rc < 0);
+
+                if (ETIMEDOUT == errno) {
+                    THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout",
+                          host, port);
+                }
+
+                THROW(HdfsNetworkConnectException, "Connect to \"%s:%s\" failed: %s",
+                      host, port, GetSystemErrorInfo(errno));
+            }
+        }
+
+        setBlockMode(true);
+    } catch (...) {
+        close();
+        throw;
+    }
+}
+
+/**
+ * Switch the socket between blocking and non-blocking mode by clearing or
+ * setting O_NONBLOCK in its file status flags.
+ * @param enable True for blocking mode, false for non-blocking mode.
+ * @throw HdfsNetworkException if the flags cannot be read or written.
+ */
+void TcpSocketImpl::setBlockMode(bool enable) {
+    int current = syscalls::fcntl(sock, F_GETFL, 0);
+
+    if (-1 == current) {
+        THROW(HdfsNetworkException, "Get socket flag failed for remote node %s: %s",
+              remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    int updated;
+
+    if (enable) {
+        updated = current & ~O_NONBLOCK;
+    } else {
+        updated = current | O_NONBLOCK;
+    }
+
+    int rc = syscalls::fcntl(sock, F_SETFL, updated);
+
+    if (-1 == rc) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+}
+
+/**
+ * Wait until the socket is ready for the requested operations, retrying
+ * when the call is interrupted by a signal.
+ * @param read Wait for readability.
+ * @param write Wait for writability.
+ * @param timeout Passed to poll() unchanged.
+ * @return False when poll() reports no ready descriptor, true otherwise.
+ * @throw HdfsNetworkException if poll() fails.
+ */
+bool TcpSocketImpl::poll(bool read, bool write, int timeout) {
+    assert(-1 != sock);
+    int ready;
+    struct pollfd pfd;
+    short wanted = 0;
+
+    if (read) {
+        wanted |= POLLIN;
+    }
+
+    if (write) {
+        wanted |= POLLOUT;
+    }
+
+    do {
+        /* Start every attempt from a clean descriptor entry. */
+        memset(&pfd, 0, sizeof(pfd));
+        pfd.fd = sock;
+        pfd.events = wanted;
+        ready = syscalls::poll(&pfd, 1, timeout);
+    } while (-1 == ready && EINTR == errno && !CheckOperationCanceled());
+
+    if (-1 == ready) {
+        THROW(HdfsNetworkException, "Poll failed for remote node %s: %s",
+              remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+
+    return 0 != ready;
+}
+
+/**
+ * Turn the TCP_NODELAY option of the socket on or off.
+ * @param enable True to set the option to 1, false to set it to 0.
+ * @throw HdfsNetworkException if the option cannot be set.
+ */
+void TcpSocketImpl::setNoDelay(bool enable) {
+    assert(-1 != sock);
+    int value = 0;
+
+    if (enable) {
+        value = 1;
+    }
+
+    int rc = syscalls::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
+                                  (char *) &value, sizeof(value));
+
+    if (0 != rc) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+}
+
+/**
+ * Remember the linger timeout. It is only stored here; connect() applies
+ * it to the socket when the stored value is not negative.
+ */
+void TcpSocketImpl::setLingerTimeout(int timeout) {
+    this->lingerTimeout = timeout;
+}
+
+/**
+ * Apply SO_LINGER to the socket. Lingering is switched on with the given
+ * value when timeout is positive and switched off otherwise.
+ * NOTE(review): timeout is written to l_linger unchanged -- confirm that
+ * callers pass it in the unit SO_LINGER expects.
+ * @throw HdfsNetworkException if the option cannot be set.
+ */
+void TcpSocketImpl::setLingerTimeoutInternal(int timeout) {
+    assert(-1 != sock);
+    const bool lingering = timeout > 0;
+    struct linger option;
+    option.l_onoff = lingering ? 1 : 0;
+    option.l_linger = lingering ? timeout : 0;
+
+    int rc = syscalls::setsockopt(sock, SOL_SOCKET, SO_LINGER, &option,
+                                  sizeof(option));
+
+    if (0 != rc) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+}
+
+/**
+ * Set SO_SNDTIMEO on the socket.
+ * @param timeout The timeout in milliseconds; it is split into whole
+ * seconds and the remaining microseconds.
+ * @throw HdfsNetworkException if the option cannot be set.
+ */
+void TcpSocketImpl::setSendTimeout(int timeout) {
+    assert(-1 != sock);
+    const int wholeSeconds = timeout / 1000;
+    const int leftoverMillis = timeout % 1000;
+    struct timeval limit;
+    limit.tv_sec = wholeSeconds;
+    limit.tv_usec = leftoverMillis * 1000;
+
+    int rc = syscalls::setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO,
+                                  &limit, sizeof(limit));
+
+    if (0 != rc) {
+        THROW(HdfsNetworkException, "Set socket flag failed for remote "
+              "node %s: %s", remoteAddr.c_str(), GetSystemErrorInfo(errno));
+    }
+}
+
+/**
+ * Shut down and close the socket if it is open, then mark it as closed.
+ * Calling this on an already closed socket does nothing.
+ */
+void TcpSocketImpl::close() {
+    if (-1 == sock) {
+        return;
+    }
+
+    syscalls::shutdown(sock, SHUT_RDWR);
+    syscalls::close(sock);
+    sock = -1;
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.h
new file mode 100644
index 0000000..ff90b20
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/network/TcpSocket.h
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_NETWORK_TCPSOCKET_H_
+#define _HDFS_LIBHDFS3_NETWORK_TCPSOCKET_H_
+
+#include "Socket.h"
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * A TCP client socket, implemented on top of the Socket interface.
+ */
+class TcpSocketImpl: public Socket {
+public:
+ /**
+ * Construct a TcpSocketImpl object.
+ * @throw nothrow
+ */
+ TcpSocketImpl();
+
+ /**
+ * Destroy a TcpSocketImpl instance.
+ */
+ ~TcpSocketImpl();
+
+ /**
+ * Read data from socket.
+ * If there is nothing to read, the caller will be blocked.
+ * @param buffer The buffer to store the data.
+ * @param size The size of bytes to be read.
+ * @return The size of data already read.
+ * @throw HdfsNetworkException
+ * @throw HdfsEndOfStream
+ */
+ int32_t read(char * buffer, int32_t size);
+
+ /**
+ * Read data from socket until enough data has been received.
+ * If not enough data can be read yet, the caller will be blocked.
+ * @param buffer The buffer to store the data.
+ * @param size The size of bytes to be read.
+ * @param timeout The timeout interval of this read operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsEndOfStream
+ * @throw HdfsTimeout
+ */
+ void readFully(char * buffer, int32_t size, int timeout);
+
+ /**
+ * Send data to socket.
+ * The caller will be blocked until the send operation has finished,
+ * but there is no guarantee that all data has been sent.
+ * @param buffer The data to be sent.
+ * @param size The size of bytes to be sent.
+ * @return The size of data already sent.
+ * @throw HdfsNetworkException
+ */
+ int32_t write(const char * buffer, int32_t size);
+
+ /**
+ * Send all data to socket.
+ * The caller will be blocked until all data has been sent.
+ * @param buffer The data to be sent.
+ * @param size The size of bytes to be sent.
+ * @param timeout The timeout interval of this write operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ void writeFully(const char * buffer, int32_t size, int timeout);
+
+ /**
+ * Connect to a tcp server.
+ * @param host The host of server.
+ * @param port The port of server.
+ * @param timeout The timeout interval of this connect operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ void connect(const char * host, int port, int timeout);
+
+ /**
+ * Connect to a tcp server.
+ * @param host The host of server.
+ * @param port The port of server, given as a string.
+ * @param timeout The timeout interval of this connect operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ void connect(const char * host, const char * port, int timeout);
+
+ /**
+ * Connect to a tcp server.
+ * @param paddr The address of server.
+ * @param host The host of server used in error message.
+ * @param port The port of server used in error message.
+ * @param timeout The timeout interval of this connect operation, negative means infinite.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ void connect(struct addrinfo * paddr, const char * host, const char * port,
+ int timeout);
+
+ /**
+ * Test if the socket can be read or written without blocking.
+ * @param read Test socket if it can be read.
+ * @param write Test socket if it can be written.
+ * @param timeout The timeout interval of this operation, negative means infinite.
+ * @return Return true if the socket can be read or written without blocking, false on timeout.
+ * @throw HdfsNetworkException
+ * @throw HdfsTimeout
+ */
+ bool poll(bool read, bool write, int timeout);
+
+ /**
+ * Set socket no delay mode (the TCP_NODELAY option).
+ * @param enable If true, set socket into no delay mode, else delay mode.
+ * @throw HdfsNetworkException
+ */
+ void setNoDelay(bool enable);
+
+ /**
+ * Set socket blocking mode.
+ * @param enable If true, set socket into blocking mode, else non-block mode.
+ * @throw HdfsNetworkException
+ */
+ void setBlockMode(bool enable);
+
+ /**
+ * Set socket linger timeout.
+ * This call only records the value; the SO_LINGER option itself is set
+ * by setLingerTimeoutInternal().
+ * @param timeout Linger timeout of the socket in millisecond; linger is disabled if it is not greater than 0.
+ * NOTE(review): the value is assigned unchanged to linger::l_linger,
+ * which POSIX defines in seconds -- confirm the intended unit.
+ */
+ void setLingerTimeout(int timeout);
+
+ /**
+ * Shutdown and close the socket.
+ * @throw nothrow
+ */
+ void close();
+
+private:
+ // Implementation is not shown in this header; presumably keeps writes
+ // from raising SIGPIPE -- verify against TcpSocket.cc.
+ void setNoSigPipe();
+ // Apply SO_LINGER to the descriptor; linger is on only for timeout > 0.
+ void setLingerTimeoutInternal(int timeout);
+ // Apply SO_SNDTIMEO to the descriptor; timeout is in milliseconds.
+ void setSendTimeout(int timeout);
+
+private:
+ int sock; // socket descriptor, -1 when there is no open socket
+ int lingerTimeout; // value recorded by setLingerTimeout()
+ std::string remoteAddr; //used for error message
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_NETWORK_TCPSOCKET_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.cc b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.cc
new file mode 100644
index 0000000..3f63ff6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RpcAuth.h"
+
+#include "Exception.h"
+#include "ExceptionInternal.h"
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Translate an authentication method name into its AuthMethod value.
+ * The comparison ignores case; "SIMPLE", "KERBEROS" and "TOKEN" are accepted.
+ * @param str The name of the authentication method.
+ * @return The matching AuthMethod.
+ * @throw InvalidParameter if the name matches none of the known methods.
+ */
+AuthMethod RpcAuth::ParseMethod(const std::string &str) {
+    const char *name = str.c_str();
+
+    if (0 == strcasecmp(name, "SIMPLE")) {
+        return AuthMethod::SIMPLE;
+    }
+
+    if (0 == strcasecmp(name, "KERBEROS")) {
+        return AuthMethod::KERBEROS;
+    }
+
+    if (0 == strcasecmp(name, "TOKEN")) {
+        return AuthMethod::TOKEN;
+    }
+
+    THROW(InvalidParameter, "RpcAuth: Unknown auth mechanism type: %s",
+          name);
+}
+
+/**
+ * Compute a hash over the authentication method and the user.
+ * @return The combination of the method's hash and the user's hash.
+ */
+size_t RpcAuth::hash_value() const {
+    size_t parts[] = { Int32Hasher(method), user.hash_value() };
+    const size_t count = sizeof(parts) / sizeof(parts[0]);
+    return CombineHasher(parts, count);
+}
+
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.h
new file mode 100644
index 0000000..e82c28d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcAuth.h
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCAUTH_H_
+#define _HDFS_LIBHDFS3_RPC_RPCAUTH_H_
+
+#include "client/UserInfo.h"
+#include "Hash.h"
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Authentication methods, each with an explicit numeric code.
+ */
+enum AuthMethod {
+    SIMPLE = 80,
+    KERBEROS = 81, //"GSSAPI"
+    TOKEN = 82, //"DIGEST-MD5"
+    UNKNOWN = 255
+};
+
+/**
+ * Authentication protocols, each with an explicit numeric code.
+ */
+enum AuthProtocol {
+    NONE = 0,
+    SASL = -33
+};
+
+/**
+ * Pairs an authentication method with the user it is used for.
+ */
+class RpcAuth {
+public:
+    /**
+     * Create an RpcAuth using the SIMPLE method and a default user.
+     */
+    RpcAuth() : method(SIMPLE) {
+    }
+
+    /**
+     * Create an RpcAuth using the given method and a default user.
+     * @param mech The authentication method.
+     */
+    explicit RpcAuth(AuthMethod mech) : method(mech) {
+    }
+
+    /**
+     * Create an RpcAuth for the given user and method.
+     * @param ui The user, copied into this object.
+     * @param mech The authentication method.
+     */
+    RpcAuth(const UserInfo &ui, AuthMethod mech) : method(mech), user(ui) {
+    }
+
+    /**
+     * @return AuthProtocol::NONE for the SIMPLE method, AuthProtocol::SASL
+     *         for every other method.
+     */
+    AuthProtocol getProtocol() const {
+        if (SIMPLE == method) {
+            return AuthProtocol::NONE;
+        }
+
+        return AuthProtocol::SASL;
+    }
+
+    /**
+     * @return A read-only reference to the stored user.
+     */
+    const UserInfo &getUser() const {
+        return user;
+    }
+
+    /**
+     * @return A modifiable reference to the stored user.
+     */
+    UserInfo &getUser() {
+        return user;
+    }
+
+    /**
+     * Replace the stored user with a copy of the given one.
+     * @param user The new user.
+     */
+    void setUser(const UserInfo &user) {
+        this->user = user;
+    }
+
+    /**
+     * @return The authentication method.
+     */
+    AuthMethod getMethod() const {
+        return method;
+    }
+
+    /**
+     * @return A hash over the method and the user.
+     */
+    size_t hash_value() const;
+
+    /**
+     * Two RpcAuth objects are equal when both method and user are equal.
+     * The users are compared only if the methods already match.
+     */
+    bool operator ==(const RpcAuth &other) const {
+        const bool sameMethod = method == other.method;
+        return sameMethod && user == other.user;
+    }
+
+    /**
+     * Translate an authentication method name into an AuthMethod.
+     * @param str The name of the method.
+     */
+    static AuthMethod ParseMethod(const std::string &str);
+
+private:
+    AuthMethod method;
+    UserInfo user;
+};
+
+}
+}
+
+HDFS_HASH_DEFINE(::hdfs::internal::RpcAuth);
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCAUTH_H_ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b2cc72f/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcCall.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcCall.h b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcCall.h
new file mode 100644
index 0000000..7c6e316
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/libhdfs3/src/rpc/RpcCall.h
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _HDFS_LIBHDFS3_RPC_RPCCALL_H_
+#define _HDFS_LIBHDFS3_RPC_RPCCALL_H_
+
+#include "google/protobuf/message.h"
+
+#include <string>
+
+namespace hdfs {
+namespace internal {
+
+/**
+ * Describes a single RPC invocation: its name, whether it is idempotent,
+ * and the request and response messages.
+ * The message pointers are stored as given; this class never deletes them.
+ */
+class RpcCall {
+public:
+    /**
+     * Construct an RpcCall.
+     * @param idemp Whether the call is idempotent.
+     * @param n The name of the call. Taken by const reference so the
+     *          string is copied once, matching setName().
+     * @param req The request message.
+     * @param resp The message that receives the response.
+     */
+    RpcCall(bool idemp, const std::string &n, google::protobuf::Message *req,
+            google::protobuf::Message *resp) :
+        idempotent(idemp), name(n), request(req), response(resp) {
+    }
+
+    /**
+     * @return true if the call is marked idempotent.
+     */
+    bool isIdempotent() const {
+        return idempotent;
+    }
+
+    /**
+     * @return The name of the call as a C string. The pointer refers to
+     *         the internal string and is invalidated when the name is
+     *         changed or this object is destroyed.
+     */
+    const char *getName() const {
+        return name.c_str();
+    }
+
+    /**
+     * @param idempotent Whether the call is idempotent.
+     */
+    void setIdempotent(bool idempotent) {
+        this->idempotent = idempotent;
+    }
+
+    /**
+     * @param name The new name of the call, copied into this object.
+     */
+    void setName(const std::string &name) {
+        this->name = name;
+    }
+
+    /**
+     * @return The stored request message pointer.
+     */
+    google::protobuf::Message *getRequest() {
+        return request;
+    }
+
+    /**
+     * @param request The request message pointer to store.
+     */
+    void setRequest(google::protobuf::Message *request) {
+        this->request = request;
+    }
+
+    /**
+     * @return The stored response message pointer.
+     */
+    google::protobuf::Message *getResponse() {
+        return response;
+    }
+
+    /**
+     * @param response The response message pointer to store.
+     */
+    void setResponse(google::protobuf::Message *response) {
+        this->response = response;
+    }
+
+private:
+    bool idempotent;
+    std::string name;
+    google::protobuf::Message *request;
+    google::protobuf::Message *response;
+};
+
+}
+}
+
+#endif /* _HDFS_LIBHDFS3_RPC_RPCCALL_H_ */