Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2015/12/03 13:31:34 UTC
[1/2] hadoop git commit: HDFS-9144. Refactoring libhdfs++ into
stateful/ephemeral objects. Contributed by Bob Hansen.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-8707 d6d056d3b -> a06bc8e12
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
new file mode 100644
index 0000000..a4e21de
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "reader/block_reader.h"
+#include "reader/datatransfer.h"
+#include "common/continuation/continuation.h"
+#include "common/continuation/asio.h"
+
+#include <future>
+
+namespace hdfs {
+
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+ const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+ uint64_t offset) {
+ using namespace hadoop::hdfs;
+ using namespace hadoop::common;
+ BaseHeaderProto *base_h = new BaseHeaderProto();
+ base_h->set_allocated_block(new ExtendedBlockProto(*block));
+ if (token) {
+ base_h->set_allocated_token(new TokenProto(*token));
+ }
+ ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
+ h->set_clientname(client_name);
+ h->set_allocated_baseheader(base_h);
+
+ OpReadBlockProto p;
+ p.set_allocated_header(h);
+ p.set_offset(offset);
+ p.set_len(length);
+ p.set_sendchecksums(verify_checksum);
+ // TODO: p.set_allocated_cachingstrategy();
+ return p;
+}
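
A note on the memory management above: the raw `new` allocations are safe because protobuf's set_allocated_*() setters transfer ownership of a heap-allocated child message to its parent. A minimal sketch of that ownership chain, assuming the generated hdfs.pb.h/datatransfer.pb.h headers are on the include path:

    // Sketch only: ownership flows upward via set_allocated_*(), so the
    // returned OpReadBlockProto frees the whole header chain on destruction.
    #include "datatransfer.pb.h"  // generated; declares OpReadBlockProto et al.

    void OwnershipSketch(const hadoop::hdfs::ExtendedBlockProto &block) {
      using namespace hadoop::hdfs;
      auto *base = new BaseHeaderProto();
      base->set_allocated_block(new ExtendedBlockProto(block)); // base owns copy
      auto *header = new ClientOperationHeaderProto();
      header->set_allocated_baseheader(base);                   // header owns base
      OpReadBlockProto op;
      op.set_allocated_header(header);                          // op owns header
    }  // ~OpReadBlockProto releases header, base, and the block copy
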
+
+void BlockReaderImpl::AsyncRequestBlock(
+ const std::string &client_name,
+ const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+ uint64_t offset, const std::function<void(Status)> &handler) {
+ // The total number of bytes that we need to transfer from the DN is
+ // the amount that the user wants (bytesToRead), plus the padding at
+ // the beginning in order to chunk-align. Note that the DN may elect
+ // to send more than this amount if the read starts/ends mid-chunk.
+ bytes_to_read_ = length;
+
+ struct State {
+ std::string header;
+ hadoop::hdfs::OpReadBlockProto request;
+ hadoop::hdfs::BlockOpResponseProto response;
+ };
+
+ auto m = continuation::Pipeline<State>::Create();
+ State *s = &m->state();
+
+ s->header.insert(s->header.begin(),
+ {0, kDataTransferVersion, Operation::kReadBlock});
+ s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum,
+ dn_->token_.get(), block, length, offset));
+
+ auto read_pb_message =
+ new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>(
+ dn_, &s->response);
+
+ m->Push(asio_continuation::Write(dn_.get(), asio::buffer(s->header)))
+ .Push(asio_continuation::WriteDelimitedPBMessage(dn_, &s->request))
+ .Push(read_pb_message);
+
+  m->Run([this, handler, offset](const Status &status, const State &s) {
+    Status stat = status;
+ if (stat.ok()) {
+ const auto &resp = s.response;
+ if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
+ if (resp.has_readopchecksuminfo()) {
+ const auto &checksum_info = resp.readopchecksuminfo();
+ chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
+ }
+ state_ = kReadPacketHeader;
+ } else {
+ stat = Status::Error(s.response.message().c_str());
+ }
+ }
+ handler(stat);
+ });
+}
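
For context, the three header bytes written above are the DataTransferProtocol preamble: a two-byte big-endian protocol version (kDataTransferVersion) followed by a one-byte opcode (81 for READ_BLOCK, per the Operation enum in datatransfer.h), after which the OpReadBlockProto is written with a varint length prefix. A standalone sketch of the preamble encoding:

    #include <cstdint>
    #include <string>

    // Encode the fixed 3-byte data-transfer preamble: 2-byte big-endian
    // version, then a 1-byte opcode. Matches {0, kDataTransferVersion,
    // Operation::kReadBlock} above whenever the version fits in one byte.
    std::string Preamble(uint16_t version, uint8_t opcode) {
      std::string out;
      out.push_back(static_cast<char>(version >> 8));    // high byte (0)
      out.push_back(static_cast<char>(version & 0xff));  // low byte
      out.push_back(static_cast<char>(opcode));          // 81 == READ_BLOCK
      return out;
    }
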
+
+Status BlockReaderImpl::RequestBlock(
+ const std::string &client_name,
+ const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+ uint64_t offset) {
+ auto stat = std::make_shared<std::promise<Status>>();
+ std::future<Status> future(stat->get_future());
+ AsyncRequestBlock(client_name, block, length, offset,
+ [stat](const Status &status) { stat->set_value(status); });
+ return future.get();
+}
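
RequestBlock illustrates the sync-over-async bridge used throughout libhdfs++: capture a shared std::promise in the completion handler and block on its future. A generic sketch of the same pattern (names are illustrative, not part of the patch); note it assumes the asio event loop runs on a different thread than the caller, otherwise future.get() would deadlock:

    #include <functional>
    #include <future>
    #include <memory>

    template <typename Result>
    Result RunSynchronously(
        const std::function<void(std::function<void(Result)>)> &async_op) {
      // shared_ptr keeps the promise alive even if the handler outlives us
      auto promise = std::make_shared<std::promise<Result>>();
      std::future<Result> future = promise->get_future();
      async_op([promise](Result r) { promise->set_value(std::move(r)); });
      return future.get();  // blocks until the handler fires
    }
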
+
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+ const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+ uint64_t offset);
+
+struct BlockReaderImpl::ReadPacketHeader
+ : continuation::Continuation {
+ ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent) {}
+
+ virtual void Run(const Next &next) override {
+ parent_->packet_data_read_bytes_ = 0;
+ parent_->packet_len_ = 0;
+ auto handler = [next, this](const asio::error_code &ec, size_t) {
+ Status status;
+ if (ec) {
+ status = Status(ec.value(), ec.message().c_str());
+ } else {
+ parent_->packet_len_ = packet_length();
+ parent_->header_.Clear();
+ bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
+ header_length());
+ assert(v && "Failed to parse the header");
+ parent_->state_ = kReadChecksum;
+ }
+ next(status);
+ };
+
+ asio::async_read(*parent_->dn_, asio::buffer(buf_),
+ std::bind(&ReadPacketHeader::CompletionHandler, this,
+ std::placeholders::_1, std::placeholders::_2),
+ handler);
+ }
+
+private:
+ static const size_t kMaxHeaderSize = 512;
+ static const size_t kPayloadLenOffset = 0;
+ static const size_t kPayloadLenSize = sizeof(int32_t);
+ static const size_t kHeaderLenOffset = 4;
+ static const size_t kHeaderLenSize = sizeof(int16_t);
+ static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
+
+ BlockReaderImpl *parent_;
+ std::array<char, kMaxHeaderSize> buf_;
+
+ size_t packet_length() const {
+ return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
+ }
+
+ size_t header_length() const {
+ return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
+ }
+
+ size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+ if (ec) {
+ return 0;
+ } else if (transferred < kHeaderStart) {
+ return kHeaderStart - transferred;
+ } else {
+ return kHeaderStart + header_length() - transferred;
+ }
+ }
+};
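
The constants above describe the fixed prefix of every data-transfer packet: a 4-byte big-endian payload length, then a 2-byte big-endian length of the serialized PacketHeaderProto that follows; CompletionHandler keeps the read going until exactly that many header bytes have arrived. A standalone sketch of decoding the 6-byte prefix (`raw` is a hypothetical receive buffer):

    #include <arpa/inet.h>
    #include <cstdint>
    #include <cstring>

    struct PacketPrefix {
      uint32_t payload_len;  // length field + checksums + data
      uint16_t header_len;   // size of the serialized PacketHeaderProto
    };

    PacketPrefix DecodePrefix(const char *raw) {
      uint32_t be32;
      uint16_t be16;
      std::memcpy(&be32, raw, sizeof(be32));      // bytes 0..3
      std::memcpy(&be16, raw + 4, sizeof(be16));  // bytes 4..5
      return PacketPrefix{ntohl(be32), ntohs(be16)};
    }
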
+
+struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
+ ReadChecksum(BlockReaderImpl *parent) : parent_(parent) {}
+
+ virtual void Run(const Next &next) override {
+ auto parent = parent_;
+ if (parent->state_ != kReadChecksum) {
+ next(Status::OK());
+ return;
+ }
+
+ auto handler = [parent, next](const asio::error_code &ec, size_t) {
+ Status status;
+ if (ec) {
+ status = Status(ec.value(), ec.message().c_str());
+ } else {
+ parent->state_ =
+ parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
+ }
+ next(status);
+ };
+ parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
+ parent->header_.datalen());
+ asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler);
+ }
+
+private:
+ BlockReaderImpl *parent_;
+};
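
The resize arithmetic above relies on the payload length field counting its own 4 bytes, so the checksum region is packet_len minus the length field minus the data length. For example (assuming the default CRC32C over 512-byte chunks), a 64 KiB data packet carries 128 chunks x 4 bytes = 512 checksum bytes, giving packet_len = 4 + 512 + 65536. A one-liner capturing that:

    #include <cstdint>

    uint32_t ChecksumBytes(uint32_t packet_len, uint32_t data_len) {
      return packet_len - sizeof(int32_t) - data_len;
      // e.g. ChecksumBytes(4 + 512 + 65536, 65536) == 512
    }
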
+
+struct BlockReaderImpl::ReadData : continuation::Continuation {
+ ReadData(BlockReaderImpl *parent,
+ std::shared_ptr<size_t> bytes_transferred,
+ const asio::mutable_buffers_1 &buf)
+ : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {
+ buf_.begin();
+ }
+
+ ~ReadData() {
+ buf_.end();
+ }
+
+ virtual void Run(const Next &next) override {
+ auto handler =
+ [next, this](const asio::error_code &ec, size_t transferred) {
+ Status status;
+ if (ec) {
+ status = Status(ec.value(), ec.message().c_str());
+ }
+ *bytes_transferred_ += transferred;
+ parent_->bytes_to_read_ -= transferred;
+ parent_->packet_data_read_bytes_ += transferred;
+ if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
+ parent_->state_ = kReadPacketHeader;
+ }
+ next(status);
+ };
+
+ auto data_len =
+ parent_->header_.datalen() - parent_->packet_data_read_bytes_;
+ asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len),
+ handler);
+ }
+
+private:
+ BlockReaderImpl *parent_;
+ std::shared_ptr<size_t> bytes_transferred_;
+ const asio::mutable_buffers_1 buf_;
+};
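
ReadData leans on asio's transfer_exactly completion condition: async_read keeps issuing socket reads until precisely data_len bytes have arrived (or an error occurs), so one logical operation consumes the packet's remaining data even if the TCP stream delivers it in fragments. A standalone sketch against a hypothetical connected socket:

    #include <asio.hpp>
    #include <array>
    #include <memory>

    void ReadExactly(asio::ip::tcp::socket &sock, size_t n) {
      auto buf = std::make_shared<std::array<char, 65536>>();
      asio::async_read(
          sock, asio::buffer(buf->data(), n), asio::transfer_exactly(n),
          [buf](const asio::error_code &ec, size_t transferred) {
            // transferred == n unless ec is set; buf kept alive by capture
            (void)ec;
            (void)transferred;
          });
    }
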
+
+struct BlockReaderImpl::ReadPadding : continuation::Continuation {
+ ReadPadding(BlockReaderImpl *parent)
+ : parent_(parent), padding_(parent->chunk_padding_bytes_),
+ bytes_transferred_(std::make_shared<size_t>(0)),
+ read_data_(new ReadData(
+ parent, bytes_transferred_, asio::buffer(padding_))) {}
+
+ virtual void Run(const Next &next) override {
+ if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
+ next(Status::OK());
+ return;
+ }
+
+ auto h = [next, this](const Status &status) {
+ if (status.ok()) {
+ assert(reinterpret_cast<const int &>(*bytes_transferred_) ==
+ parent_->chunk_padding_bytes_);
+ parent_->chunk_padding_bytes_ = 0;
+ parent_->state_ = kReadData;
+ }
+ next(status);
+ };
+ read_data_->Run(h);
+ }
+
+private:
+ BlockReaderImpl *parent_;
+ std::vector<char> padding_;
+ std::shared_ptr<size_t> bytes_transferred_;
+ std::shared_ptr<continuation::Continuation> read_data_;
+ ReadPadding(const ReadPadding &) = delete;
+ ReadPadding &operator=(const ReadPadding &) = delete;
+};
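
The padding being skipped here exists because checksums are computed per chunk: the DataNode rewinds the read to the chunk boundary at or before the requested offset (the chunkoffset reported in ReadOpChecksumInfoProto) and the client discards the prefix. A small sketch of the arithmetic, assuming 512-byte chunks:

    #include <cstdint>

    uint64_t PaddingBytes(uint64_t requested_offset, uint64_t chunk_size) {
      uint64_t chunk_offset = (requested_offset / chunk_size) * chunk_size;
      return requested_offset - chunk_offset;
      // e.g. PaddingBytes(1000, 512) == 488: the DN starts sending at
      // offset 512 and the first 488 bytes delivered are thrown away.
    }
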
+
+
+struct BlockReaderImpl::AckRead : continuation::Continuation {
+ AckRead(BlockReaderImpl *parent) : parent_(parent) {}
+
+ virtual void Run(const Next &next) override {
+ if (parent_->bytes_to_read_ > 0) {
+ next(Status::OK());
+ return;
+ }
+
+ auto m =
+ continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create();
+ m->state().set_status(parent_->options_.verify_checksum
+ ? hadoop::hdfs::Status::CHECKSUM_OK
+ : hadoop::hdfs::Status::SUCCESS);
+
+ m->Push(
+ continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state()));
+
+ m->Run([this, next](const Status &status,
+ const hadoop::hdfs::ClientReadStatusProto &) {
+ if (status.ok()) {
+ parent_->state_ = BlockReaderImpl::kFinished;
+ }
+ next(status);
+ });
+ }
+
+private:
+ BlockReaderImpl *parent_;
+};
+
+void BlockReaderImpl::AsyncReadPacket(
+ const MutableBuffers &buffers,
+ const std::function<void(const Status &, size_t bytes_transferred)> &handler) {
+ assert(state_ != kOpen && "Not connected");
+
+ struct State {
+ std::shared_ptr<size_t> bytes_transferred;
+ };
+ auto m = continuation::Pipeline<State>::Create();
+ m->state().bytes_transferred = std::make_shared<size_t>(0);
+
+ m->Push(new ReadPacketHeader(this))
+ .Push(new ReadChecksum(this))
+ .Push(new ReadPadding(this))
+ .Push(new ReadData(
+ this, m->state().bytes_transferred, buffers))
+ .Push(new AckRead(this));
+
+ auto self = this->shared_from_this();
+ m->Run([self, handler](const Status &status, const State &state) {
+ handler(status, *state.bytes_transferred);
+ });
+}
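
AsyncReadPacket composes the five stages above into one pipeline; each stage inspects the shared state_ machine and passes straight through with Status::OK() when it does not apply, which is what lets the same fixed pipeline handle header, checksum, padding, data, and ack alike. A minimal sketch of that continuation-chaining idea (deliberately simpler than the library's Pipeline API):

    #include <cstddef>
    #include <functional>
    #include <memory>
    #include <vector>

    struct Stage {
      virtual ~Stage() {}
      // Do async work (or skip), then call next(ok) exactly once.
      virtual void Run(const std::function<void(bool ok)> &next) = 0;
    };

    using Stages = std::vector<std::unique_ptr<Stage>>;

    void RunAll(std::shared_ptr<Stages> stages, size_t i,
                const std::function<void(bool)> &done) {
      if (i == stages->size()) {
        done(true);
        return;
      }
      (*stages)[i]->Run([stages, i, done](bool ok) {
        if (!ok) {
          done(false);  // short-circuit on the first failed stage
          return;
        }
        RunAll(stages, i + 1, done);
      });
    }
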
+
+
+size_t
+BlockReaderImpl::ReadPacket(const MutableBuffers &buffers,
+ Status *status) {
+ size_t transferred = 0;
+ auto done = std::make_shared<std::promise<void>>();
+ auto future = done->get_future();
+ AsyncReadPacket(buffers,
+ [status, &transferred, done](const Status &stat, size_t t) {
+ *status = stat;
+ transferred = t;
+ done->set_value();
+ });
+ future.wait();
+ return transferred;
+}
+
+
+struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation {
+ RequestBlockContinuation(BlockReader *reader, const std::string &client_name,
+ const hadoop::hdfs::ExtendedBlockProto *block,
+ uint64_t length, uint64_t offset)
+ : reader_(reader), client_name_(client_name), length_(length),
+ offset_(offset) {
+ block_.CheckTypeAndMergeFrom(*block);
+ }
+
+ virtual void Run(const Next &next) override {
+ reader_->AsyncRequestBlock(client_name_, &block_, length_,
+ offset_, next);
+ }
+
+private:
+ BlockReader *reader_;
+ const std::string client_name_;
+ hadoop::hdfs::ExtendedBlockProto block_;
+ uint64_t length_;
+ uint64_t offset_;
+};
+
+struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation {
+ ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer,
+ size_t *transferred)
+ : reader_(reader), buffer_(buffer),
+ buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) {
+ }
+
+ virtual void Run(const Next &next) override {
+ *transferred_ = 0;
+ next_ = next;
+ OnReadData(Status::OK(), 0);
+ }
+
+private:
+ BlockReader *reader_;
+ const MutableBuffers buffer_;
+ const size_t buffer_size_;
+ size_t *transferred_;
+ std::function<void(const Status &)> next_;
+
+ void OnReadData(const Status &status, size_t transferred) {
+ using std::placeholders::_1;
+ using std::placeholders::_2;
+ *transferred_ += transferred;
+ if (!status.ok()) {
+ next_(status);
+ } else if (*transferred_ >= buffer_size_) {
+ next_(status);
+ } else {
+ reader_->AsyncReadPacket(
+ asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+ std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+ }
+ }
+};
+
+void BlockReaderImpl::AsyncReadBlock(
+ const std::string & client_name,
+ const hadoop::hdfs::LocatedBlockProto &block,
+ size_t offset,
+ const MutableBuffers &buffers,
+ const std::function<void(const Status &, size_t)> handler) {
+
+ auto m = continuation::Pipeline<size_t>::Create();
+ size_t * bytesTransferred = &m->state();
+
+ size_t size = asio::buffer_size(buffers);
+
+ m->Push(new RequestBlockContinuation(this, client_name,
+ &block.b(), size, offset))
+ .Push(new ReadBlockContinuation(this, buffers, bytesTransferred));
+
+ m->Run([handler] (const Status &status,
+ const size_t totalBytesTransferred) {
+ handler(status, totalBytesTransferred);
+ });
+}
+
+}
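
Putting the pieces together, a hypothetical caller of the refactored reader looks like the following sketch. It assumes a DataNodeConnection `dn` has already been connected and `located` came from the NameNode; the client name and buffer handling are illustrative only:

    #include "reader/block_reader.h"

    void ReadWholeBlock(std::shared_ptr<hdfs::DataNodeConnection> dn,
                        const hadoop::hdfs::LocatedBlockProto &located,
                        char *out, size_t len) {
      hdfs::BlockReaderOptions options;  // verify_checksum defaults to true
      // BlockReaderImpl uses shared_from_this(), so it must be shared-owned.
      auto reader = std::make_shared<hdfs::BlockReaderImpl>(options, dn);
      reader->AsyncReadBlock(
          "libhdfs++_example_client", located, /*offset=*/0,
          asio::buffer(out, len),
          [reader](const hdfs::Status &status, size_t transferred) {
            // Capturing `reader` keeps the ephemeral object alive until
            // the read completes; bytes are valid iff status.ok().
            (void)status;
            (void)transferred;
          });
    }
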
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
index 81636b9..140286b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -19,7 +19,9 @@
#define BLOCK_READER_H_
#include "libhdfspp/status.h"
+#include "common/async_stream.h"
#include "datatransfer.pb.h"
+#include "connection/datanodeconnection.h"
#include <memory>
@@ -55,38 +57,73 @@ struct BlockReaderOptions {
: verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {}
};
-template <class Stream>
-class RemoteBlockReader
- : public std::enable_shared_from_this<RemoteBlockReader<Stream>> {
+/**
+ * Handles the operational state of requesting and reading a block (or portion of
+ * a block) from a DataNode.
+ *
+ * Threading model: not thread-safe.
+ * Lifecycle: should be created, used for a single read, then freed.
+ */
+class BlockReader {
public:
- explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream)
- : stream_(stream), state_(kOpen), options_(options),
+ virtual void AsyncReadBlock(
+ const std::string & client_name,
+ const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+ const MutableBuffers &buffers,
+ const std::function<void(const Status &, size_t)> handler) = 0;
+
+ virtual void AsyncReadPacket(
+ const MutableBuffers &buffers,
+ const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0;
+
+ virtual void AsyncRequestBlock(
+ const std::string &client_name,
+ const hadoop::hdfs::ExtendedBlockProto *block,
+ uint64_t length,
+ uint64_t offset,
+ const std::function<void(Status)> &handler) = 0;
+};
+
+class BlockReaderImpl
+ : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> {
+public:
+ explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn)
+ : dn_(dn), state_(kOpen), options_(options),
chunk_padding_bytes_(0) {}
- template <class MutableBufferSequence, class ReadHandler>
- void async_read_some(const MutableBufferSequence &buffers,
- const ReadHandler &handler);
+ virtual void AsyncReadPacket(
+ const MutableBuffers &buffers,
+ const std::function<void(const Status &, size_t bytes_transferred)> &handler) override;
+
+ virtual void AsyncRequestBlock(
+ const std::string &client_name,
+ const hadoop::hdfs::ExtendedBlockProto *block,
+ uint64_t length,
+ uint64_t offset,
+ const std::function<void(Status)> &handler) override;
- template <class MutableBufferSequence>
- size_t read_some(const MutableBufferSequence &buffers, Status *status);
+ virtual void AsyncReadBlock(
+ const std::string & client_name,
+ const hadoop::hdfs::LocatedBlockProto &block, size_t offset,
+ const MutableBuffers &buffers,
+ const std::function<void(const Status &, size_t)> handler) override;
- Status connect(const std::string &client_name,
- const hadoop::common::TokenProto *token,
- const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
- uint64_t offset);
+ size_t ReadPacket(const MutableBuffers &buffers, Status *status);
- template <class ConnectHandler>
- void async_connect(const std::string &client_name,
- const hadoop::common::TokenProto *token,
- const hadoop::hdfs::ExtendedBlockProto *block,
- uint64_t length, uint64_t offset,
- const ConnectHandler &handler);
+ Status RequestBlock(
+ const std::string &client_name,
+ const hadoop::hdfs::ExtendedBlockProto *block,
+ uint64_t length,
+ uint64_t offset);
private:
+ struct RequestBlockContinuation;
+ struct ReadBlockContinuation;
+
struct ReadPacketHeader;
struct ReadChecksum;
struct ReadPadding;
- template <class MutableBufferSequence> struct ReadData;
+ struct ReadData;
struct AckRead;
enum State {
kOpen,
@@ -97,7 +134,7 @@ private:
kFinished,
};
- Stream *stream_;
+ std::shared_ptr<DataNodeConnection> dn_;
hadoop::hdfs::PacketHeaderProto header_;
State state_;
BlockReaderOptions options_;
@@ -109,6 +146,4 @@ private:
};
}
-#include "remote_block_reader_impl.h"
-
#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
index 511c2eb..8be9ef8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -19,6 +19,10 @@
#define LIB_READER_DATA_TRANSFER_H_
#include "common/sasl_authenticator.h"
+#include "common/async_stream.h"
+#include "connection/datanodeconnection.h"
+#include <memory>
+
namespace hdfs {
@@ -32,26 +36,32 @@ enum Operation {
kReadBlock = 81,
};
-template <class Stream> class DataTransferSaslStream {
+template <class Stream> class DataTransferSaslStream : public DataNodeConnection {
public:
- DataTransferSaslStream(Stream *stream, const std::string &username,
+ DataTransferSaslStream(std::shared_ptr<Stream> stream, const std::string &username,
const std::string &password)
: stream_(stream), authenticator_(username, password) {}
template <class Handler> void Handshake(const Handler &next);
- template <class MutableBufferSequence, class ReadHandler>
- void async_read_some(const MutableBufferSequence &buffers,
- ReadHandler &&handler);
+ void async_read_some(const MutableBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
+ stream_->async_read_some(buf, handler);
+ }
- template <class ConstBufferSequence, class WriteHandler>
- void async_write_some(const ConstBufferSequence &buffers,
- WriteHandler &&handler);
+ void async_write_some(const ConstBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
+ stream_->async_write_some(buf, handler);
+ }
+ void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override
+ {(void)handler; /*TODO: Handshaking goes here*/};
private:
DataTransferSaslStream(const DataTransferSaslStream &) = delete;
DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete;
- Stream *stream_;
+ std::shared_ptr<Stream> stream_;
DigestMD5Authenticator authenticator_;
struct ReadSaslMessage;
struct Authenticator;
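
With this change DataTransferSaslStream becomes a pass-through decorator: it satisfies the DataNodeConnection interface by forwarding the two stream primitives to the wrapped stream, leaving a single seam where SASL wrapping/unwrapping can later be spliced in. A generic sketch of the shape (hypothetical ByteStream interface, not the patch's types):

    #include <cstddef>
    #include <functional>
    #include <memory>

    struct ByteStream {
      virtual ~ByteStream() {}
      virtual void async_read_some(
          char *buf, size_t n,
          const std::function<void(int err, size_t got)> &handler) = 0;
    };

    // Decorator: same interface, delegates to the wrapped stream. SASL
    // unwrap of received bytes would hook into the forwarding call.
    struct PassThroughStream : ByteStream {
      explicit PassThroughStream(std::shared_ptr<ByteStream> inner)
          : inner_(inner) {}
      void async_read_some(
          char *buf, size_t n,
          const std::function<void(int, size_t)> &handler) override {
        inner_->async_read_some(buf, n, handler);
      }
      std::shared_ptr<ByteStream> inner_;
    };
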
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
index 088b86e..3ca16e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h
@@ -70,7 +70,7 @@ private:
template <class Stream>
struct DataTransferSaslStream<Stream>::ReadSaslMessage
: continuation::Continuation {
- ReadSaslMessage(Stream *stream, std::string *data)
+ ReadSaslMessage(std::shared_ptr<Stream> stream, std::string *data)
: stream_(stream), data_(data), read_pb_(stream, &resp_) {}
virtual void Run(const Next &next) override {
@@ -87,7 +87,7 @@ struct DataTransferSaslStream<Stream>::ReadSaslMessage
}
private:
- Stream *stream_;
+ std::shared_ptr<Stream> stream_;
std::string *data_;
hadoop::hdfs::DataTransferEncryptorMessageProto resp_;
continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_;
@@ -97,7 +97,7 @@ template <class Stream>
template <class Handler>
void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
- using ::hdfs::continuation::Write;
+ using ::hdfs::asio_continuation::Write;
using ::hdfs::continuation::WriteDelimitedPBMessage;
static const int kMagicNumber = htonl(kDataTransferSasl);
@@ -109,7 +109,7 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
std::string resp0;
DataTransferEncryptorMessageProto req1;
std::string resp1;
- Stream *stream;
+ std::shared_ptr<Stream> stream;
};
auto m = continuation::Pipeline<State>::Create();
State *s = &m->state();
@@ -117,7 +117,7 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0);
- m->Push(Write(stream_, kMagicNumberBuffer))
+ m->Push(Write(stream_.get(), kMagicNumberBuffer))
.Push(WriteDelimitedPBMessage(stream_, &s->req0))
.Push(new ReadSaslMessage(stream_, &s->resp0))
.Push(new Authenticator(&authenticator_, &s->resp0, &s->req1))
@@ -126,19 +126,6 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
m->Run([next](const Status &status, const State &) { next(status); });
}
-template <class Stream>
-template <class MutableBufferSequence, class ReadHandler>
-void DataTransferSaslStream<Stream>::async_read_some(
- const MutableBufferSequence &buffers, ReadHandler &&handler) {
- stream_->async_read_some(buffers, handler);
-}
-
-template <class Stream>
-template <typename ConstBufferSequence, typename WriteHandler>
-void DataTransferSaslStream<Stream>::async_write_some(
- const ConstBufferSequence &buffers, WriteHandler &&handler) {
- stream_->async_write_some(buffers, handler);
-}
}
#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
new file mode 100644
index 0000000..ad10165
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_READER_FILEINFO_H_
+#define LIB_READER_FILEINFO_H_
+
+#include "ClientNamenodeProtocol.pb.h"
+
+namespace hdfs {
+
+/**
+ * Information about a file that is assumed to be unchanging for the duration of
+ * the operations.
+ */
+struct FileInfo {
+ unsigned long long file_length_;
+ std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
deleted file mode 100644
index 68bc4ee..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "block_reader.h"
-
-namespace hdfs {
-
-hadoop::hdfs::OpReadBlockProto
-ReadBlockProto(const std::string &client_name, bool verify_checksum,
- const hadoop::common::TokenProto *token,
- const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
- uint64_t offset) {
- using namespace hadoop::hdfs;
- using namespace hadoop::common;
- BaseHeaderProto *base_h = new BaseHeaderProto();
- base_h->set_allocated_block(new ExtendedBlockProto(*block));
- if (token) {
- base_h->set_allocated_token(new TokenProto(*token));
- }
- ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
- h->set_clientname(client_name);
- h->set_allocated_baseheader(base_h);
-
- OpReadBlockProto p;
- p.set_allocated_header(h);
- p.set_offset(offset);
- p.set_len(length);
- p.set_sendchecksums(verify_checksum);
- // TODO: p.set_allocated_cachingstrategy();
- return p;
-}
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
deleted file mode 100644
index 35c2ce4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
+++ /dev/null
@@ -1,342 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
-#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
-
-#include "datatransfer.h"
-#include "common/continuation/asio.h"
-#include "common/continuation/protobuf.h"
-
-#include <asio/buffers_iterator.hpp>
-#include <asio/streambuf.hpp>
-#include <asio/write.hpp>
-
-#include <arpa/inet.h>
-
-#include <future>
-
-namespace hdfs {
-
-hadoop::hdfs::OpReadBlockProto
-ReadBlockProto(const std::string &client_name, bool verify_checksum,
- const hadoop::common::TokenProto *token,
- const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
- uint64_t offset);
-
-template <class Stream>
-template <class ConnectHandler>
-void RemoteBlockReader<Stream>::async_connect(
- const std::string &client_name, const hadoop::common::TokenProto *token,
- const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
- uint64_t offset, const ConnectHandler &handler) {
- // The total number of bytes that we need to transfer from the DN is
- // the amount that the user wants (bytesToRead), plus the padding at
- // the beginning in order to chunk-align. Note that the DN may elect
- // to send more than this amount if the read starts/ends mid-chunk.
- bytes_to_read_ = length;
-
- struct State {
- std::string header;
- hadoop::hdfs::OpReadBlockProto request;
- hadoop::hdfs::BlockOpResponseProto response;
- };
-
- auto m = continuation::Pipeline<State>::Create();
- State *s = &m->state();
-
- s->header.insert(s->header.begin(),
- {0, kDataTransferVersion, Operation::kReadBlock});
- s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum,
- token, block, length, offset));
-
- auto read_pb_message =
- new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>(
- stream_, &s->response);
-
- m->Push(continuation::Write(stream_, asio::buffer(s->header)))
- .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request))
- .Push(read_pb_message);
-
- m->Run([this, handler, offset](const Status &status, const State &s) {
- Status stat = status;
- if (stat.ok()) {
- const auto &resp = s.response;
- if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
- if (resp.has_readopchecksuminfo()) {
- const auto &checksum_info = resp.readopchecksuminfo();
- chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
- }
- state_ = kReadPacketHeader;
- } else {
- stat = Status::Error(s.response.message().c_str());
- }
- }
- handler(stat);
- });
-}
-
-template <class Stream>
-struct RemoteBlockReader<Stream>::ReadPacketHeader
- : continuation::Continuation {
- ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
-
- virtual void Run(const Next &next) override {
- parent_->packet_data_read_bytes_ = 0;
- parent_->packet_len_ = 0;
- auto handler = [next, this](const asio::error_code &ec, size_t) {
- Status status;
- if (ec) {
- status = Status(ec.value(), ec.message().c_str());
- } else {
- parent_->packet_len_ = packet_length();
- parent_->header_.Clear();
- bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
- header_length());
- assert(v && "Failed to parse the header");
- parent_->state_ = kReadChecksum;
- }
- next(status);
- };
-
- asio::async_read(*parent_->stream_, asio::buffer(buf_),
- std::bind(&ReadPacketHeader::CompletionHandler, this,
- std::placeholders::_1, std::placeholders::_2),
- handler);
- }
-
-private:
- static const size_t kMaxHeaderSize = 512;
- static const size_t kPayloadLenOffset = 0;
- static const size_t kPayloadLenSize = sizeof(int32_t);
- static const size_t kHeaderLenOffset = 4;
- static const size_t kHeaderLenSize = sizeof(int16_t);
- static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
-
- RemoteBlockReader<Stream> *parent_;
- std::array<char, kMaxHeaderSize> buf_;
-
- size_t packet_length() const {
- return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
- }
-
- size_t header_length() const {
- return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
- }
-
- size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
- if (ec) {
- return 0;
- } else if (transferred < kHeaderStart) {
- return kHeaderStart - transferred;
- } else {
- return kHeaderStart + header_length() - transferred;
- }
- }
-};
-
-template <class Stream>
-struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation {
- ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
-
- virtual void Run(const Next &next) override {
- auto parent = parent_;
- if (parent->state_ != kReadChecksum) {
- next(Status::OK());
- return;
- }
-
- auto handler = [parent, next](const asio::error_code &ec, size_t) {
- Status status;
- if (ec) {
- status = Status(ec.value(), ec.message().c_str());
- } else {
- parent->state_ =
- parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
- }
- next(status);
- };
- parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
- parent->header_.datalen());
- asio::async_read(*parent->stream_, asio::buffer(parent->checksum_),
- handler);
- }
-
-private:
- RemoteBlockReader<Stream> *parent_;
-};
-
-template <class Stream>
-struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation {
- ReadPadding(RemoteBlockReader<Stream> *parent)
- : parent_(parent), padding_(parent->chunk_padding_bytes_),
- bytes_transferred_(std::make_shared<size_t>(0)),
- read_data_(new ReadData<asio::mutable_buffers_1>(
- parent, bytes_transferred_, asio::buffer(padding_))) {}
-
- virtual void Run(const Next &next) override {
- if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
- next(Status::OK());
- return;
- }
-
- auto h = [next, this](const Status &status) {
- if (status.ok()) {
- assert(reinterpret_cast<const int &>(*bytes_transferred_) ==
- parent_->chunk_padding_bytes_);
- parent_->chunk_padding_bytes_ = 0;
- parent_->state_ = kReadData;
- }
- next(status);
- };
- read_data_->Run(h);
- }
-
-private:
- RemoteBlockReader<Stream> *parent_;
- std::vector<char> padding_;
- std::shared_ptr<size_t> bytes_transferred_;
- std::shared_ptr<continuation::Continuation> read_data_;
- ReadPadding(const ReadPadding &) = delete;
- ReadPadding &operator=(const ReadPadding &) = delete;
-};
-
-template <class Stream>
-template <class MutableBufferSequence>
-struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation {
- ReadData(RemoteBlockReader<Stream> *parent,
- std::shared_ptr<size_t> bytes_transferred,
- const MutableBufferSequence &buf)
- : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {}
-
- virtual void Run(const Next &next) override {
- auto handler =
- [next, this](const asio::error_code &ec, size_t transferred) {
- Status status;
- if (ec) {
- status = Status(ec.value(), ec.message().c_str());
- }
- *bytes_transferred_ += transferred;
- parent_->bytes_to_read_ -= transferred;
- parent_->packet_data_read_bytes_ += transferred;
- if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
- parent_->state_ = kReadPacketHeader;
- }
- next(status);
- };
-
- auto data_len =
- parent_->header_.datalen() - parent_->packet_data_read_bytes_;
- async_read(*parent_->stream_, buf_, asio::transfer_exactly(data_len),
- handler);
- }
-
-private:
- RemoteBlockReader<Stream> *parent_;
- std::shared_ptr<size_t> bytes_transferred_;
- MutableBufferSequence buf_;
-};
-
-template <class Stream>
-struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation {
- AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
-
- virtual void Run(const Next &next) override {
- if (parent_->bytes_to_read_ > 0) {
- next(Status::OK());
- return;
- }
-
- auto m =
- continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create();
- m->state().set_status(parent_->options_.verify_checksum
- ? hadoop::hdfs::Status::CHECKSUM_OK
- : hadoop::hdfs::Status::SUCCESS);
-
- m->Push(
- continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state()));
-
- m->Run([this, next](const Status &status,
- const hadoop::hdfs::ClientReadStatusProto &) {
- if (status.ok()) {
- parent_->state_ = RemoteBlockReader<Stream>::kFinished;
- }
- next(status);
- });
- }
-
-private:
- RemoteBlockReader<Stream> *parent_;
-};
-
-template <class Stream>
-template <class MutableBufferSequence, class ReadHandler>
-void RemoteBlockReader<Stream>::async_read_some(
- const MutableBufferSequence &buffers, const ReadHandler &handler) {
- assert(state_ != kOpen && "Not connected");
-
- struct State {
- std::shared_ptr<size_t> bytes_transferred;
- };
- auto m = continuation::Pipeline<State>::Create();
- m->state().bytes_transferred = std::make_shared<size_t>(0);
-
- m->Push(new ReadPacketHeader(this))
- .Push(new ReadChecksum(this))
- .Push(new ReadPadding(this))
- .Push(new ReadData<MutableBufferSequence>(
- this, m->state().bytes_transferred, buffers))
- .Push(new AckRead(this));
-
- auto self = this->shared_from_this();
- m->Run([self, handler](const Status &status, const State &state) {
- handler(status, *state.bytes_transferred);
- });
-}
-
-template <class Stream>
-template <class MutableBufferSequence>
-size_t
-RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers,
- Status *status) {
- size_t transferred = 0;
- auto done = std::make_shared<std::promise<void>>();
- auto future = done->get_future();
- async_read_some(buffers,
- [status, &transferred, done](const Status &stat, size_t t) {
- *status = stat;
- transferred = t;
- done->set_value();
- });
- future.wait();
- return transferred;
-}
-
-template <class Stream>
-Status RemoteBlockReader<Stream>::connect(
- const std::string &client_name, const hadoop::common::TokenProto *token,
- const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
- uint64_t offset) {
- auto stat = std::make_shared<std::promise<Status>>();
- std::future<Status> future(stat->get_future());
- async_connect(client_name, token, block, length, offset,
- [stat](const Status &status) { stat->set_value(status); });
- return future.get();
-}
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
index 83721a7..29d455f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc
@@ -19,9 +19,6 @@
#include "rpc_connection.h"
#include "common/util.h"
-#include <openssl/rand.h>
-
-#include <sstream>
#include <future>
namespace hdfs {
@@ -83,15 +80,4 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
return future.get();
}
-std::string RpcEngine::GetRandomClientName() {
- unsigned char buf[6] = {
- 0,
- };
- RAND_pseudo_bytes(buf, sizeof(buf));
-
- std::stringstream ss;
- ss << "libhdfs++_"
- << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
- return ss.str();
-}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index 7d06141..cc5ab01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -32,7 +32,7 @@ include_directories(
${LIBHDFS_SRC_DIR}
${OS_DIR}
)
-add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs_cpp.cc)
+add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
add_library(test_common OBJECT mock_connection.cc)
@@ -44,24 +44,20 @@ protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS
)
add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>)
-target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+target_link_libraries(remote_block_reader_test reader proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_test(remote_block_reader remote_block_reader_test)
add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_test(sasl_digest_md5 sasl_digest_md5_test)
-add_executable(inputstream_test inputstream_test.cc)
-target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
-add_test(inputstream inputstream_test)
-
include_directories(${CMAKE_CURRENT_BINARY_DIR})
add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $<TARGET_OBJECTS:test_common>)
target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_test(rpc_engine rpc_engine_test)
add_executable(bad_datanode_test bad_datanode_test.cc)
-target_link_libraries(bad_datanode_test rpc reader proto fs bindings_c rpc proto common reader ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
+target_link_libraries(bad_datanode_test rpc reader proto fs bindings_c rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_test(bad_datanode bad_datanode_test)
add_executable(node_exclusion_test node_exclusion_test.cc)
@@ -73,5 +69,5 @@ target_link_libraries(configuration_test common gmock_main ${CMAKE_THREAD_LIBS_I
add_test(configuration configuration_test)
build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c)
-link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
+link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
add_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
index cf1fdbb..0f69195 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
@@ -19,6 +19,8 @@
#include "fs/filesystem.h"
#include "fs/bad_datanode_tracker.h"
+#include "common/util.h"
+
#include <gmock/gmock.h>
using hadoop::common::TokenProto;
@@ -34,70 +36,140 @@ using ::testing::Return;
using namespace hdfs;
-class MockReader {
- public:
- virtual ~MockReader() {}
+class MockReader : public BlockReader {
+public:
MOCK_METHOD2(
- async_read_some,
+ AsyncReadPacket,
void(const asio::mutable_buffers_1 &,
const std::function<void(const Status &, size_t transferred)> &));
- MOCK_METHOD6(async_connect,
- void(const std::string &, TokenProto *, ExtendedBlockProto *,
- uint64_t, uint64_t,
- const std::function<void(const Status &)> &));
+ MOCK_METHOD5(AsyncRequestBlock,
+ void(const std::string &client_name,
+ const hadoop::hdfs::ExtendedBlockProto *block,
+ uint64_t length, uint64_t offset,
+ const std::function<void(Status)> &handler));
+
+ MOCK_METHOD5(AsyncReadBlock, void(
+ const std::string & client_name,
+ const hadoop::hdfs::LocatedBlockProto &block,
+ size_t offset,
+ const MutableBuffers &buffers,
+ const std::function<void(const Status &, size_t)> handler));
};
-template <class Trait>
-struct MockBlockReaderTrait {
- typedef MockReader Reader;
- struct State {
- MockReader reader_;
- size_t transferred_;
- Reader *reader() { return &reader_; }
- size_t *transferred() { return &transferred_; }
- const size_t *transferred() const { return &transferred_; }
- };
+class MockDNConnection : public DataNodeConnection, public std::enable_shared_from_this<MockDNConnection> {
+ void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override {
+ handler(Status::OK(), shared_from_this());
+ }
+
+ void async_read_some(const MutableBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
+ (void)buf;
+ handler(asio::error::fault, 0);
+ }
- static continuation::Pipeline<State> *CreatePipeline(
- ::asio::io_service *, const DatanodeInfoProto &) {
- auto m = continuation::Pipeline<State>::Create();
- *m->state().transferred() = 0;
- Trait::InitializeMockReader(m->state().reader());
- return m;
+ void async_write_some(const ConstBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
+ (void)buf;
+ handler(asio::error::fault, 0);
}
};
+
+class PartialMockFileHandle : public FileHandleImpl {
+ using FileHandleImpl::FileHandleImpl;
+public:
+ std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>();
+protected:
+ std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
+ std::shared_ptr<DataNodeConnection> dn) override
+ {
+ (void) options; (void) dn;
+ assert(mock_reader_);
+ return mock_reader_;
+ }
+ std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
+ ::asio::io_service *io_service,
+ const ::hadoop::hdfs::DatanodeInfoProto & dn,
+ const hadoop::common::TokenProto * token) override {
+ (void) io_service; (void) dn; (void) token;
+ return std::make_shared<MockDNConnection>();
+ }
+
+
+};
+
+TEST(BadDataNodeTest, TestNoNodes) {
+ auto file_info = std::make_shared<struct FileInfo>();
+ file_info->blocks_.push_back(LocatedBlockProto());
+ LocatedBlockProto & block = file_info->blocks_[0];
+ ExtendedBlockProto *b = block.mutable_b();
+ b->set_poolid("");
+ b->set_blockid(1);
+ b->set_generationstamp(1);
+ b->set_numbytes(4096);
+
+ // Set up the one block to have one datanode holding it
+ DatanodeInfoProto *di = block.add_locs();
+ DatanodeIDProto *dnid = di->mutable_id();
+ dnid->set_datanodeuuid("foo");
+
+ char buf[4096] = {
+ 0,
+ };
+ IoServiceImpl io_service;
+ auto bad_node_tracker = std::make_shared<BadDataNodeTracker>();
+ bad_node_tracker->AddBadNode("foo");
+
+ PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker);
+ Status stat;
+ size_t read = 0;
+
+ // Exclude the one datanode with the data
+ is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), nullptr,
+ [&stat, &read](const Status &status, const std::string &, size_t transferred) {
+ stat = status;
+ read = transferred;
+ });
+
+ // Should fail with no resource available
+ ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code());
+ ASSERT_EQ(0UL, read);
+}
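
The remaining tests drive the mock with gmock's InvokeArgument<N> action, which calls the N-th (zero-based) parameter of the mocked method with the supplied values; that is how an asynchronous completion handler gets fired synchronously from inside an expectation. A self-contained sketch of the mechanism:

    #include <functional>
    #include <gmock/gmock.h>
    #include <gtest/gtest.h>

    using ::testing::_;
    using ::testing::InvokeArgument;

    struct AsyncThing {
      MOCK_METHOD1(Start, void(const std::function<void(int)> &done));
    };

    TEST(InvokeArgumentSketch, FiresTheCallback) {
      AsyncThing thing;
      // Invoke argument 0 (the callback) with the value 42.
      EXPECT_CALL(thing, Start(_)).WillOnce(InvokeArgument<0>(42));
      int result = 0;
      thing.Start([&result](int v) { result = v; });
      ASSERT_EQ(42, result);
    }
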
+
TEST(BadDataNodeTest, RecoverableError) {
- LocatedBlocksProto blocks;
- LocatedBlockProto block;
- DatanodeInfoProto dn;
+ auto file_info = std::make_shared<struct FileInfo>();
+ file_info->blocks_.push_back(LocatedBlockProto());
+ LocatedBlockProto & block = file_info->blocks_[0];
+ ExtendedBlockProto *b = block.mutable_b();
+ b->set_poolid("");
+ b->set_blockid(1);
+ b->set_generationstamp(1);
+ b->set_numbytes(4096);
+
+ // Set up the one block to have one datanode holding it
+ DatanodeInfoProto *di = block.add_locs();
+ DatanodeIDProto *dnid = di->mutable_id();
+ dnid->set_datanodeuuid("foo");
+
char buf[4096] = {
0,
};
IoServiceImpl io_service;
- Options default_options;
- FileSystemImpl fs(&io_service, default_options);
auto tracker = std::make_shared<BadDataNodeTracker>();
- InputStreamImpl is(&fs, &blocks, tracker);
+ PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker);
Status stat;
size_t read = 0;
- struct Trait {
- static void InitializeMockReader(MockReader *reader) {
- EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
- .WillOnce(InvokeArgument<5>(Status::OK()));
-
- EXPECT_CALL(*reader, async_read_some(_, _))
- // resource unavailable error
- .WillOnce(InvokeArgument<1>(
- Status::ResourceUnavailable(
- "Unable to get some resource, try again later"),
- sizeof(buf)));
- }
- };
+ EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
+ // resource unavailable error
+ .WillOnce(InvokeArgument<4>(
+ Status::ResourceUnavailable("Unable to get some resource, try again later"), 0));
- is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
- "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+
+ is.AsyncPreadSome(
+ 0, asio::buffer(buf, sizeof(buf)), nullptr,
[&stat, &read](const Status &status, const std::string &,
size_t transferred) {
stat = status;
@@ -108,7 +180,7 @@ TEST(BadDataNodeTest, RecoverableError) {
std::string failing_dn = "id_of_bad_datanode";
if (!stat.ok()) {
- if (InputStream::ShouldExclude(stat)) {
+ if (FileHandle::ShouldExclude(stat)) {
tracker->AddBadNode(failing_dn);
}
}
@@ -117,35 +189,37 @@ TEST(BadDataNodeTest, RecoverableError) {
}
TEST(BadDataNodeTest, InternalError) {
- LocatedBlocksProto blocks;
- LocatedBlockProto block;
- DatanodeInfoProto dn;
+ auto file_info = std::make_shared<struct FileInfo>();
+ file_info->blocks_.push_back(LocatedBlockProto());
+ LocatedBlockProto & block = file_info->blocks_[0];
+ ExtendedBlockProto *b = block.mutable_b();
+ b->set_poolid("");
+ b->set_blockid(1);
+ b->set_generationstamp(1);
+ b->set_numbytes(4096);
+
+ // Set up the one block to have one datanode holding it
+ DatanodeInfoProto *di = block.add_locs();
+ DatanodeIDProto *dnid = di->mutable_id();
+ dnid->set_datanodeuuid("foo");
+
char buf[4096] = {
0,
};
IoServiceImpl io_service;
- Options default_options;
auto tracker = std::make_shared<BadDataNodeTracker>();
- FileSystemImpl fs(&io_service, default_options);
- InputStreamImpl is(&fs, &blocks, tracker);
+ PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker);
Status stat;
size_t read = 0;
- struct Trait {
- static void InitializeMockReader(MockReader *reader) {
- EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
- .WillOnce(InvokeArgument<5>(Status::OK()));
-
- EXPECT_CALL(*reader, async_read_some(_, _))
- // something bad happened on the DN, calling again isn't going to help
- .WillOnce(
- InvokeArgument<1>(Status::Exception("server_explosion_exception",
- "the server exploded"),
+ EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_))
+ // resource unavailable error
+ .WillOnce(InvokeArgument<4>(
+ Status::Exception("server_explosion_exception",
+ "the server exploded"),
sizeof(buf)));
- }
- };
- is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
- "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+ is.AsyncPreadSome(
+ 0, asio::buffer(buf, sizeof(buf)), nullptr,
[&stat, &read](const Status &status, const std::string &,
size_t transferred) {
stat = status;
@@ -156,7 +230,7 @@ TEST(BadDataNodeTest, InternalError) {
std::string failing_dn = "id_of_bad_datanode";
if (!stat.ok()) {
- if (InputStream::ShouldExclude(stat)) {
+ if (FileHandle::ShouldExclude(stat)) {
tracker->AddBadNode(failing_dn);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
deleted file mode 100644
index 786b846..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "fs/filesystem.h"
-#include "fs/bad_datanode_tracker.h"
-#include <gmock/gmock.h>
-
-using hadoop::common::TokenProto;
-using hadoop::hdfs::DatanodeInfoProto;
-using hadoop::hdfs::DatanodeIDProto;
-using hadoop::hdfs::ExtendedBlockProto;
-using hadoop::hdfs::LocatedBlockProto;
-using hadoop::hdfs::LocatedBlocksProto;
-
-using ::testing::_;
-using ::testing::InvokeArgument;
-using ::testing::Return;
-
-using namespace hdfs;
-
-namespace hdfs {
-
-class MockReader {
- public:
- virtual ~MockReader() {}
- MOCK_METHOD2(
- async_read_some,
- void(const asio::mutable_buffers_1 &,
- const std::function<void(const Status &, size_t transferred)> &));
-
- MOCK_METHOD6(async_connect,
- void(const std::string &, TokenProto *, ExtendedBlockProto *,
- uint64_t, uint64_t,
- const std::function<void(const Status &)> &));
-};
-
-template <class Trait>
-struct MockBlockReaderTrait {
- typedef MockReader Reader;
- struct State {
- MockReader reader_;
- size_t transferred_;
- Reader *reader() { return &reader_; }
- size_t *transferred() { return &transferred_; }
- const size_t *transferred() const { return &transferred_; }
- };
-
- static continuation::Pipeline<State> *CreatePipeline(
- ::asio::io_service *, const DatanodeInfoProto &) {
- auto m = continuation::Pipeline<State>::Create();
- *m->state().transferred() = 0;
- Trait::InitializeMockReader(m->state().reader());
- return m;
- }
-};
-}
-
-TEST(InputStreamTest, TestReadSingleTrunk) {
- LocatedBlocksProto blocks;
- LocatedBlockProto block;
- DatanodeInfoProto dn;
- char buf[4096] = {
- 0,
- };
- IoServiceImpl io_service;
- Options options;
- FileSystemImpl fs(&io_service, options);
- InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
- Status stat;
- size_t read = 0;
- struct Trait {
- static void InitializeMockReader(MockReader *reader) {
- EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
- .WillOnce(InvokeArgument<5>(Status::OK()));
-
- EXPECT_CALL(*reader, async_read_some(_, _))
- .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
- }
- };
-
- is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
- "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
- [&stat, &read](const Status &status, const std::string &,
- size_t transferred) {
- stat = status;
- read = transferred;
- });
- ASSERT_TRUE(stat.ok());
- ASSERT_EQ(sizeof(buf), read);
- read = 0;
-}
-
-TEST(InputStreamTest, TestReadMultipleTrunk) {
- LocatedBlocksProto blocks;
- LocatedBlockProto block;
- DatanodeInfoProto dn;
- char buf[4096] = {
- 0,
- };
- IoServiceImpl io_service;
- Options options;
- FileSystemImpl fs(&io_service, options);
- InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
- Status stat;
- size_t read = 0;
- struct Trait {
- static void InitializeMockReader(MockReader *reader) {
- EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
- .WillOnce(InvokeArgument<5>(Status::OK()));
-
- EXPECT_CALL(*reader, async_read_some(_, _))
- .Times(4)
- .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
- }
- };
-
- is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
- "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
- [&stat, &read](const Status &status, const std::string &,
- size_t transferred) {
- stat = status;
- read = transferred;
- });
- ASSERT_TRUE(stat.ok());
- ASSERT_EQ(sizeof(buf), read);
- read = 0;
-}
-
-TEST(InputStreamTest, TestReadError) {
- LocatedBlocksProto blocks;
- LocatedBlockProto block;
- DatanodeInfoProto dn;
- char buf[4096] = {
- 0,
- };
- IoServiceImpl io_service;
- Options options;
- FileSystemImpl fs(&io_service, options);
- InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
- Status stat;
- size_t read = 0;
- struct Trait {
- static void InitializeMockReader(MockReader *reader) {
- EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
- .WillOnce(InvokeArgument<5>(Status::OK()));
-
- EXPECT_CALL(*reader, async_read_some(_, _))
- .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
- .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
- .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
- .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
- }
- };
-
- is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
- "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
- [&stat, &read](const Status &status, const std::string &,
- size_t transferred) {
- stat = status;
- read = transferred;
- });
- ASSERT_FALSE(stat.ok());
- ASSERT_EQ(sizeof(buf) / 4 * 3, read);
- read = 0;
-}
-
-TEST(InputStreamTest, TestExcludeDataNode) {
- LocatedBlocksProto blocks;
- LocatedBlockProto *block = blocks.add_blocks();
- ExtendedBlockProto *b = block->mutable_b();
- b->set_poolid("");
- b->set_blockid(1);
- b->set_generationstamp(1);
- b->set_numbytes(4096);
-
- DatanodeInfoProto *di = block->add_locs();
- DatanodeIDProto *dnid = di->mutable_id();
- dnid->set_datanodeuuid("foo");
-
- char buf[4096] = {
- 0,
- };
- IoServiceImpl io_service;
- Options options;
- FileSystemImpl fs(&io_service, options);
- InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>());
- Status stat;
- size_t read = 0;
- struct Trait {
- static void InitializeMockReader(MockReader *reader) {
- EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
- .WillOnce(InvokeArgument<5>(Status::OK()));
-
- EXPECT_CALL(*reader, async_read_some(_, _))
- .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
- }
- };
-
- std::shared_ptr<NodeExclusionRule> exclude_set =
- std::make_shared<ExclusionSet>(std::set<std::string>({"foo"}));
- is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), exclude_set,
- [&stat, &read](const Status &status, const std::string &,
- size_t transferred) {
- stat = status;
- read = transferred;
- });
- ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again),
- stat.code());
- ASSERT_EQ(0UL, read);
-}
-
-int main(int argc, char *argv[]) {
- // The following line must be executed to initialize Google Mock
- // (and Google Test) before running the tests.
- ::testing::InitGoogleMock(&argc, argv);
- return RUN_ALL_TESTS();
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
index 8c0ef8c..4c15375 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
@@ -18,6 +18,8 @@
#ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_
#define LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+#include "common/async_stream.h"
+
#include <asio/error_code.hpp>
#include <asio/buffer.hpp>
#include <asio/streambuf.hpp>
@@ -27,13 +29,15 @@
namespace hdfs {
-class MockConnectionBase {
+class MockConnectionBase : public AsyncStream{
public:
MockConnectionBase(::asio::io_service *io_service);
virtual ~MockConnectionBase();
typedef std::pair<asio::error_code, std::string> ProducerResult;
- template <class MutableBufferSequence, class Handler>
- void async_read_some(const MutableBufferSequence &buf, Handler &&handler) {
+
+ void async_read_some(const MutableBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
if (produced_.size() == 0) {
ProducerResult r = Produce();
if (r.first) {
@@ -51,8 +55,9 @@ public:
io_service_->post(std::bind(handler, asio::error_code(), len));
}
- template <class ConstBufferSequence, class Handler>
- void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
+ void async_write_some(const ConstBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
// CompletionResult res = OnWrite(buf);
io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf)));
}
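The shift from template methods to AsyncStream virtuals means a test double no longer has to be visible at template-instantiation time. A minimal canned-data stream, sketched on the assumption that MutableBuffers/ConstBuffers are the typedefs from common/async_stream.h; completions fire inline for brevity, whereas a well-behaved double (like MockConnectionBase above) posts them to the io_service:

#include "common/async_stream.h"

#include <algorithm>
#include <string>

// Sketch: an AsyncStream that replays canned bytes to readers and
// swallows writes. Not part of the commit; for illustration only.
class CannedStream : public hdfs::AsyncStream {
public:
  explicit CannedStream(std::string data) : data_(std::move(data)) {}

  void async_read_some(const hdfs::MutableBuffers &buf,
      std::function<void(const asio::error_code &, std::size_t)> handler) override {
    std::size_t len = std::min(asio::buffer_size(buf), data_.size() - pos_);
    asio::buffer_copy(buf, asio::buffer(data_.data() + pos_, len));
    pos_ += len;
    handler(asio::error_code(), len);
  }

  void async_write_some(const hdfs::ConstBuffers &buf,
      std::function<void(const asio::error_code &, std::size_t)> handler) override {
    handler(asio::error_code(), asio::buffer_size(buf));
  }

private:
  std::string data_;
  std::size_t pos_ = 0;
};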
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
index f54b14f..6f3122e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -21,6 +21,8 @@
#include "datatransfer.pb.h"
#include "common/util.h"
#include "reader/block_reader.h"
+#include "reader/datatransfer.h"
+#include "reader/fileinfo.h"
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
@@ -36,10 +38,14 @@ using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
using ::hadoop::hdfs::ExtendedBlockProto;
using ::hadoop::hdfs::PacketHeaderProto;
using ::hadoop::hdfs::ReadOpChecksumInfoProto;
+using ::hadoop::hdfs::LocatedBlockProto;
+using ::hadoop::hdfs::LocatedBlocksProto;
using ::asio::buffer;
using ::asio::error_code;
using ::asio::mutable_buffers_1;
+using ::testing::_;
+using ::testing::InvokeArgument;
using ::testing::Return;
using std::make_pair;
using std::string;
@@ -49,12 +55,47 @@ namespace pbio = pb::io;
namespace hdfs {
-class MockDNConnection : public MockConnectionBase {
- public:
+class MockDNConnection : public MockConnectionBase, public DataNodeConnection{
+public:
MockDNConnection(::asio::io_service &io_service)
: MockConnectionBase(&io_service) {}
MOCK_METHOD0(Produce, ProducerResult());
+
+ MOCK_METHOD1(Connect, void(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)>));
+
+ void async_read_some(const MutableBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
+ this->MockConnectionBase::async_read_some(buf, handler);
+ }
+
+ void async_write_some(const ConstBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
+ this->MockConnectionBase::async_write_some(buf, handler);
+ }
+};
+
+// Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we
+// can test the logic of AsyncReadBlock
+class PartialMockReader : public BlockReaderImpl {
+public:
+ PartialMockReader() :
+ BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>()) {};
+
+ MOCK_METHOD2(
+ AsyncReadPacket,
+ void(const asio::mutable_buffers_1 &,
+ const std::function<void(const Status &, size_t transferred)> &));
+
+ MOCK_METHOD5(AsyncRequestBlock,
+ void(const std::string &client_name,
+ const hadoop::hdfs::ExtendedBlockProto *block,
+ uint64_t length, uint64_t offset,
+ const std::function<void(Status)> &handler));
};
+
+
}
static inline string ToDelimitedString(const pb::MessageLite *msg) {
@@ -94,20 +135,102 @@ static inline std::pair<error_code, string> ProducePacket(
return std::make_pair(error_code(), std::move(payload));
}
+TEST(RemoteBlockReaderTest, TestReadSingleTrunk) {
+ auto file_info = std::make_shared<struct FileInfo>();
+ LocatedBlocksProto blocks;
+ LocatedBlockProto block;
+ char buf[4096] = {
+ 0,
+ };
+
+ Status stat;
+ size_t read = 0;
+ PartialMockReader reader;
+ EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _))
+ .WillOnce(InvokeArgument<4>(Status::OK()));
+ EXPECT_CALL(reader, AsyncReadPacket(_, _))
+ .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
+
+ reader.AsyncReadBlock(
+ GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)),
+ [&stat, &read](const Status &status, size_t transferred) {
+ stat = status;
+ read = transferred;
+ });
+ ASSERT_TRUE(stat.ok());
+ ASSERT_EQ(sizeof(buf), read);
+ read = 0;
+}
+
+TEST(RemoteBlockReaderTest, TestReadMultipleTrunk) {
+ LocatedBlockProto block;
+ char buf[4096] = {
+ 0,
+ };
+ Status stat;
+ size_t read = 0;
+
+ PartialMockReader reader;
+ EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _))
+ .WillOnce(InvokeArgument<4>(Status::OK()));
+
+ EXPECT_CALL(reader, AsyncReadPacket(_, _))
+ .Times(4)
+ .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
+
+ reader.AsyncReadBlock(
+ GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)),
+ [&stat, &read](const Status &status, size_t transferred) {
+ stat = status;
+ read = transferred;
+ });
+ ASSERT_TRUE(stat.ok());
+ ASSERT_EQ(sizeof(buf), read);
+ read = 0;
+}
+
+TEST(RemoteBlockReaderTest, TestReadError) {
+ LocatedBlockProto block;
+ char buf[4096] = {
+ 0,
+ };
+ Status stat;
+ size_t read = 0;
+ PartialMockReader reader;
+ EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _))
+ .WillOnce(InvokeArgument<4>(Status::OK()));
+
+ EXPECT_CALL(reader, AsyncReadPacket(_, _))
+ .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+ .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+ .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+ .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
+
+ reader.AsyncReadBlock(
+ GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)),
+ [&stat, &read](const Status &status, size_t transferred) {
+ stat = status;
+ read = transferred;
+ });
+ ASSERT_FALSE(stat.ok());
+ ASSERT_EQ(sizeof(buf) / 4 * 3, read);
+ read = 0;
+}
+
template <class Stream = MockDNConnection, class Handler>
-static std::shared_ptr<RemoteBlockReader<Stream>> ReadContent(
- Stream *conn, TokenProto *token, const ExtendedBlockProto &block,
- uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
- const Handler &handler) {
+static std::shared_ptr<BlockReaderImpl>
+ReadContent(std::shared_ptr<Stream> conn, const ExtendedBlockProto &block,
+ uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
+ const Handler &handler) {
BlockReaderOptions options;
- auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn);
+ auto reader = std::make_shared<BlockReaderImpl>(options, conn);
Status result;
- reader->async_connect("libhdfs++", token, &block, length, offset,
+ reader->AsyncRequestBlock("libhdfs++", &block, length, offset,
[buf, reader, handler](const Status &stat) {
if (!stat.ok()) {
handler(stat, 0);
} else {
- reader->async_read_some(buf, handler);
+ reader->AsyncReadPacket(buf, handler);
}
});
return reader;
@@ -117,11 +240,11 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
static const size_t kChunkSize = 512;
static const string kChunkData(kChunkSize, 'a');
::asio::io_service io_service;
- MockDNConnection conn(io_service);
+ auto conn = std::make_shared<MockDNConnection>(io_service);
BlockOpResponseProto block_op_resp;
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
- EXPECT_CALL(conn, Produce())
+ EXPECT_CALL(*conn, Produce())
.WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
.WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
@@ -130,16 +253,19 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
block.set_blockid(0);
block.set_generationstamp(0);
+ bool done = false;
std::string data(kChunkSize, 0);
- ReadContent(&conn, nullptr, block, kChunkSize, 0,
+ ReadContent(conn, block, kChunkSize, 0,
buffer(const_cast<char *>(data.c_str()), data.size()),
- [&data, &io_service](const Status &stat, size_t transferred) {
+ [&data, &io_service, &done](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(kChunkSize, transferred);
ASSERT_EQ(kChunkData, data);
+ done = true;
io_service.stop();
});
io_service.run();
+ ASSERT_TRUE(done);
}
TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
@@ -149,7 +275,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
::asio::io_service io_service;
- MockDNConnection conn(io_service);
+ auto conn = std::make_shared<MockDNConnection>(io_service);
BlockOpResponseProto block_op_resp;
ReadOpChecksumInfoProto *checksum_info =
block_op_resp.mutable_readopchecksuminfo();
@@ -159,7 +285,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
checksum->set_bytesperchecksum(512);
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
- EXPECT_CALL(conn, Produce())
+ EXPECT_CALL(*conn, Produce())
.WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
.WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
@@ -168,16 +294,20 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
block.set_blockid(0);
block.set_generationstamp(0);
+ bool done = false;
+
string data(kLength, 0);
- ReadContent(&conn, nullptr, block, data.size(), kOffset,
+ ReadContent(conn, block, data.size(), kOffset,
buffer(const_cast<char *>(data.c_str()), data.size()),
- [&data, &io_service](const Status &stat, size_t transferred) {
+ [&data, &io_service,&done](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(kLength, transferred);
ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+ done = true;
io_service.stop();
});
io_service.run();
+ ASSERT_TRUE(done);
}
TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
@@ -185,11 +315,11 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
static const string kChunkData(kChunkSize, 'a');
::asio::io_service io_service;
- MockDNConnection conn(io_service);
+ auto conn = std::make_shared<MockDNConnection>(io_service);
BlockOpResponseProto block_op_resp;
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
- EXPECT_CALL(conn, Produce())
+ EXPECT_CALL(*conn, Produce())
.WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
.WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)))
.WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
@@ -202,25 +332,22 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
string data(kChunkSize, 0);
mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
BlockReaderOptions options;
- auto reader =
- std::make_shared<RemoteBlockReader<MockDNConnection>>(options, &conn);
+ auto reader = std::make_shared<BlockReaderImpl>(options, conn);
Status result;
- reader->async_connect(
- "libhdfs++", nullptr, &block, data.size(), 0,
+ reader->AsyncRequestBlock(
+ "libhdfs++", &block, data.size(), 0,
[buf, reader, &data, &io_service](const Status &stat) {
ASSERT_TRUE(stat.ok());
- reader->async_read_some(
- buf, [buf, reader, &data, &io_service](const Status &stat,
- size_t transferred) {
+ reader->AsyncReadPacket(
+ buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(kChunkSize, transferred);
ASSERT_EQ(kChunkData, data);
data.clear();
data.resize(kChunkSize);
transferred = 0;
- reader->async_read_some(
- buf,
- [&data, &io_service](const Status &stat, size_t transferred) {
+ reader->AsyncReadPacket(
+ buf, [&data,&io_service](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok());
ASSERT_EQ(kChunkSize, transferred);
ASSERT_EQ(kChunkData, data);
@@ -234,12 +361,11 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
TEST(RemoteBlockReaderTest, TestSaslConnection) {
static const size_t kChunkSize = 512;
static const string kChunkData(kChunkSize, 'a');
- static const string kAuthPayload =
- "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
- "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
- "charset=utf-8,algorithm=md5-sess";
+ static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
+ "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
+ "charset=utf-8,algorithm=md5-sess";
::asio::io_service io_service;
- MockDNConnection conn(io_service);
+ auto conn = std::make_shared<MockDNConnection>(io_service);
BlockOpResponseProto block_op_resp;
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
@@ -252,23 +378,23 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) {
::hadoop::hdfs::
DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
- EXPECT_CALL(conn, Produce())
+ EXPECT_CALL(*conn, Produce())
.WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0))))
.WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1))))
.WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
.WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
- DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar");
+ auto sasl_conn = std::make_shared<DataTransferSaslStream<MockDNConnection> >(conn, "foo", "bar");
ExtendedBlockProto block;
block.set_poolid("foo");
block.set_blockid(0);
block.set_generationstamp(0);
std::string data(kChunkSize, 0);
- sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service](
+ sasl_conn->Handshake([sasl_conn, &block, &data, &io_service](
const Status &s) {
ASSERT_TRUE(s.ok());
- ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
+ ReadContent(sasl_conn, block, kChunkSize, 0,
buffer(const_cast<char *>(data.c_str()), data.size()),
[&data, &io_service](const Status &stat, size_t transferred) {
ASSERT_TRUE(stat.ok());
[2/2] hadoop git commit: HDFS-9144. Refactoring libhdfs++ into
stateful/ephemeral objects. Contributed by Bob Hansen.
Posted by jh...@apache.org.
HDFS-9144. Refactoring libhdfs++ into stateful/ephemeral objects. Contributed by Bob Hansen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a06bc8e1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a06bc8e1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a06bc8e1
Branch: refs/heads/HDFS-8707
Commit: a06bc8e1232622aab0b7e4f26b547125de9ab300
Parents: d6d056d
Author: James <jh...@apache.org>
Authored: Thu Dec 3 07:30:22 2015 -0500
Committer: James <jh...@apache.org>
Committed: Thu Dec 3 07:30:22 2015 -0500
----------------------------------------------------------------------
.../native/libhdfspp/include/libhdfspp/hdfs.h | 38 +-
.../libhdfspp/include/libhdfspp/options.h | 11 +
.../main/native/libhdfspp/lib/CMakeLists.txt | 1 +
.../libhdfspp/lib/bindings/c/CMakeLists.txt | 2 +-
.../native/libhdfspp/lib/bindings/c/hdfs.cc | 37 +-
.../native/libhdfspp/lib/bindings/c/hdfs_cpp.cc | 216 ---------
.../native/libhdfspp/lib/bindings/c/hdfs_cpp.h | 105 -----
.../native/libhdfspp/lib/common/CMakeLists.txt | 19 +-
.../native/libhdfspp/lib/common/async_stream.h | 49 +++
.../libhdfspp/lib/common/continuation/asio.h | 4 +-
.../lib/common/continuation/protobuf.h | 21 +-
.../libhdfspp/lib/common/hdfs_public_api.cc | 16 -
.../main/native/libhdfspp/lib/common/options.cc | 3 +-
.../main/native/libhdfspp/lib/common/util.cc | 35 ++
.../src/main/native/libhdfspp/lib/common/util.h | 7 +
.../libhdfspp/lib/connection/CMakeLists.txt | 2 +
.../lib/connection/datanodeconnection.cc | 57 +++
.../lib/connection/datanodeconnection.h | 66 +++
.../main/native/libhdfspp/lib/fs/CMakeLists.txt | 2 +-
.../main/native/libhdfspp/lib/fs/filehandle.cc | 240 ++++++++++
.../main/native/libhdfspp/lib/fs/filehandle.h | 115 +++++
.../main/native/libhdfspp/lib/fs/filesystem.cc | 212 +++++++--
.../main/native/libhdfspp/lib/fs/filesystem.h | 137 +++---
.../main/native/libhdfspp/lib/fs/inputstream.cc | 48 --
.../native/libhdfspp/lib/fs/inputstream_impl.h | 207 ---------
.../native/libhdfspp/lib/reader/CMakeLists.txt | 2 +-
.../native/libhdfspp/lib/reader/block_reader.cc | 433 +++++++++++++++++++
.../native/libhdfspp/lib/reader/block_reader.h | 83 +++-
.../native/libhdfspp/lib/reader/datatransfer.h | 28 +-
.../libhdfspp/lib/reader/datatransfer_impl.h | 23 +-
.../main/native/libhdfspp/lib/reader/fileinfo.h | 36 ++
.../libhdfspp/lib/reader/remote_block_reader.cc | 46 --
.../lib/reader/remote_block_reader_impl.h | 342 ---------------
.../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 14 -
.../main/native/libhdfspp/tests/CMakeLists.txt | 12 +-
.../native/libhdfspp/tests/bad_datanode_test.cc | 208 ++++++---
.../native/libhdfspp/tests/inputstream_test.cc | 232 ----------
.../native/libhdfspp/tests/mock_connection.h | 15 +-
.../libhdfspp/tests/remote_block_reader_test.cc | 202 +++++++--
39 files changed, 1802 insertions(+), 1524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
index 06619fd..dfff20b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
@@ -24,6 +24,7 @@
#include <functional>
#include <memory>
#include <set>
+#include <iostream>
namespace hdfs {
@@ -68,10 +69,10 @@ class NodeExclusionRule {
};
/**
- * Applications opens an InputStream to read files in HDFS.
+ * Applications open a FileHandle to read files in HDFS.
**/
-class InputStream {
- public:
+class FileHandle {
+public:
/**
* Read data from a specific position. The current implementation
* stops at the block boundary.
@@ -83,10 +84,14 @@ class InputStream {
* The handler returns the datanode that serves the block and the number of
* bytes read.
**/
- virtual void PositionRead(
- void *buf, size_t nbyte, uint64_t offset,
- const std::function<void(const Status &, const std::string &, size_t)> &
- handler) = 0;
+ virtual void
+ PositionRead(void *buf, size_t nbyte, uint64_t offset,
+ const std::function<void(const Status &, size_t)> &handler) = 0;
+
+ virtual Status PositionRead(void *buf, size_t *nbyte, off_t offset) = 0;
+ virtual Status Read(void *buf, size_t *nbyte) = 0;
+ virtual Status Seek(off_t *offset, std::ios_base::seekdir whence) = 0;
+
/**
* Determine if a datanode should be excluded from future operations
* based on the returned Status.
@@ -97,7 +102,7 @@ class InputStream {
**/
static bool ShouldExclude(const Status &status);
- virtual ~InputStream();
+ virtual ~FileHandle();
};
/**
@@ -114,15 +119,24 @@ class FileSystem {
IoService *io_service, const Options &options, const std::string &server,
const std::string &service,
const std::function<void(const Status &, FileSystem *)> &handler);
+
+ /* Synchronous variant of New */
+ static FileSystem *
+ New(IoService *io_service, const Options &options, const std::string &server,
+ const std::string &service);
+
/**
* Open a file on HDFS. The call issues an RPC to the NameNode to
* gather the locations of all blocks in the file and to return a
* new instance of the @ref FileHandle object.
**/
- virtual void Open(
- const std::string &path,
- const std::function<void(const Status &, InputStream *)> &handler) = 0;
- virtual ~FileSystem();
+ virtual void
+ Open(const std::string &path,
+ const std::function<void(const Status &, FileHandle *)> &handler) = 0;
+ virtual Status Open(const std::string &path, FileHandle **handle) = 0;
+
+ virtual ~FileSystem() {};
+
};
}
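Taken together, the synchronous additions give the blocking call path the C bindings below ride on. A minimal sketch (io_service ownership and worker-thread startup are glossed over; the ownership caveat in hdfs.cc's hdfsConnect applies, and it assumes the synchronous FileSystem::New spins up its own worker as the bindings rely on):

#include "libhdfspp/hdfs.h"

// Sketch only: blocking read of one range using the new synchronous API.
hdfs::Status ReadAt(const std::string &server, const std::string &port,
                    const std::string &path, void *buf, size_t *nbyte,
                    off_t offset) {
  hdfs::IoService *io_service = hdfs::IoService::New();
  hdfs::FileSystem *fs =
      hdfs::FileSystem::New(io_service, hdfs::Options(), server, port);
  if (!fs)
    return hdfs::Status::Error("connect failed");

  hdfs::FileHandle *file = nullptr;
  hdfs::Status stat = fs->Open(path, &file);
  if (stat.ok()) {
    stat = file->PositionRead(buf, nbyte, offset);
    delete file;
  }
  delete fs;
  return stat;
}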
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
index 0abdfa0..79b9c54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h
@@ -31,6 +31,17 @@ struct Options {
int rpc_timeout;
/**
+ * Maximum number of retries for RPC operations
+ **/
+ const static int NO_RPC_RETRY = -1;
+ int max_rpc_retries;
+
+ /**
+ * Number of milliseconds to wait between retries of RPC operations
+ **/
+ int rpc_retry_delay_ms;
+
+ /**
* Exclusion time for failed datanodes in milliseconds.
* Default: 60000
**/
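For illustration, how the new knobs might be set next to the existing ones (the values here are made up; the authoritative defaults live in options.cc further down):

#include "libhdfspp/options.h"

// Sketch: configuring the new RPC retry behavior.
hdfs::Options MakeRetryingOptions() {
  hdfs::Options options;              // defaults from options.cc below
  options.max_rpc_retries = 2;        // retry each failed RPC twice
  options.rpc_retry_delay_ms = 5000;  // wait 5s between attempts
  // options.max_rpc_retries = hdfs::Options::NO_RPC_RETRY;  // presumably disables retries
  return options;
}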
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
index 434dc4e..c851597 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt
@@ -21,4 +21,5 @@ add_subdirectory(fs)
add_subdirectory(reader)
add_subdirectory(rpc)
add_subdirectory(proto)
+add_subdirectory(connection)
add_subdirectory(bindings)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
index e170370..664518a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt
@@ -16,5 +16,5 @@
# under the License.
-add_library(bindings_c hdfs.cc hdfs_cpp.cc)
+add_library(bindings_c hdfs.cc)
add_dependencies(bindings_c fs rpc reader proto common fs rpc reader proto common)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 853e9d3..802b3ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -16,8 +16,10 @@
* limitations under the License.
*/
-#include "hdfs_cpp.h"
+#include "fs/filesystem.h"
+#include <hdfs/hdfs.h>
+#include <string>
#include <cstring>
#include <iostream>
@@ -25,15 +27,15 @@ using namespace hdfs;
/* Separate the handles used by the C API from the C++ API */
struct hdfs_internal {
- hdfs_internal(HadoopFileSystem *p) : filesystem_(p) {}
- hdfs_internal(std::unique_ptr<HadoopFileSystem> p)
+ hdfs_internal(FileSystem *p) : filesystem_(p) {}
+ hdfs_internal(std::unique_ptr<FileSystem> p)
: filesystem_(std::move(p)) {}
virtual ~hdfs_internal(){};
- HadoopFileSystem *get_impl() { return filesystem_.get(); }
- const HadoopFileSystem *get_impl() const { return filesystem_.get(); }
+ FileSystem *get_impl() { return filesystem_.get(); }
+ const FileSystem *get_impl() const { return filesystem_.get(); }
private:
- std::unique_ptr<HadoopFileSystem> filesystem_;
+ std::unique_ptr<FileSystem> filesystem_;
};
struct hdfsFile_internal {
@@ -102,17 +104,23 @@ bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
int hdfsFileIsOpenForRead(hdfsFile file) {
/* files can only be open for reads at the moment, do a quick check */
if (file) {
- return file->get_impl()->IsOpenForRead();
+ return true; // Update implementation when we get file writing
}
return false;
}
hdfsFS hdfsConnect(const char *nn, tPort port) {
- HadoopFileSystem *fs = new HadoopFileSystem();
- Status stat = fs->Connect(nn, port);
- if (!stat.ok()) {
+ std::string port_as_string = std::to_string(port);
+ IoService * io_service = IoService::New();
+ FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string);
+ if (!fs) {
ReportError(ENODEV, "Unable to connect to NameNode.");
- delete fs;
+
+ // FileSystem's ctor might take ownership of the io_service; if it does,
+ // it will null out the pointer
+ if (io_service)
+ delete io_service;
+
return nullptr;
}
return new hdfs_internal(fs);
@@ -139,7 +147,7 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
return nullptr;
}
FileHandle *f = nullptr;
- Status stat = fs->get_impl()->OpenFileForRead(path, &f);
+ Status stat = fs->get_impl()->Open(path, &f);
if (!stat.ok()) {
return nullptr;
}
@@ -150,7 +158,6 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
if (!CheckSystemAndHandle(fs, file)) {
return -1;
}
-
delete file;
return 0;
}
@@ -162,8 +169,8 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
}
size_t len = length;
- Status stat = file->get_impl()->Pread(buffer, &len, position);
- if (!stat.ok()) {
+ Status stat = file->get_impl()->PositionRead(buffer, &len, position);
+ if(!stat.ok()) {
return Error(stat);
}
return (tSize)len;
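End to end, the public C surface is unchanged by this patch; only the plumbing underneath moved from hdfs_cpp.cc to FileSystem/FileHandle. A hypothetical caller (host, port, and path are placeholders; teardown beyond the file close is elided):

#include <hdfs/hdfs.h>
#include <fcntl.h>
#include <stdio.h>

/* Sketch: read the first 4 KB of a file through the C bindings. */
int main(void) {
  hdfsFS fs = hdfsConnect("localhost", 8020);
  if (!fs) return 1;

  hdfsFile file = hdfsOpenFile(fs, "/tmp/example.txt", O_RDONLY, 0, 0, 0);
  if (!file) return 1;

  char buf[4096];
  tSize n = hdfsPread(fs, file, 0, buf, sizeof(buf));
  if (n >= 0) fwrite(buf, 1, (size_t)n, stdout);

  hdfsCloseFile(fs, file);
  return 0;
}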
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
deleted file mode 100644
index 8872b1d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "hdfs_cpp.h"
-
-#include <cstdint>
-#include <cerrno>
-#include <string>
-#include <future>
-#include <memory>
-#include <thread>
-#include <vector>
-#include <set>
-#include <tuple>
-
-#include <hdfs/hdfs.h>
-#include "libhdfspp/hdfs.h"
-#include "libhdfspp/status.h"
-#include "fs/filesystem.h"
-#include "common/hdfs_public_api.h"
-
-namespace hdfs {
-
-FileHandle::FileHandle(InputStream *is) : input_stream_(is), offset_(0){}
-
-Status FileHandle::Pread(void *buf, size_t *nbyte, off_t offset) {
- auto callstate = std::make_shared<std::promise<std::tuple<Status, std::string, size_t>>>();
- std::future<std::tuple<Status, std::string, size_t>> future(callstate->get_future());
-
- /* wrap async call with promise/future to make it blocking */
- auto callback = [callstate](
- const Status &s, const std::string &dn, size_t bytes) {
- callstate->set_value(std::make_tuple(s, dn, bytes));
- };
-
- input_stream_->PositionRead(buf, *nbyte, offset, callback);
-
- /* wait for async to finish */
- auto returnstate = future.get();
- auto stat = std::get<0>(returnstate);
-
- if (!stat.ok()) {
- /* determine if DN gets marked bad */
- if (InputStream::ShouldExclude(stat)) {
- InputStreamImpl *impl =
- static_cast<InputStreamImpl *>(input_stream_.get());
- impl->bad_node_tracker_->AddBadNode(std::get<1>(returnstate));
- }
-
- return stat;
- }
- *nbyte = std::get<2>(returnstate);
- return Status::OK();
-}
-
-Status FileHandle::Read(void *buf, size_t *nbyte) {
- Status stat = Pread(buf, nbyte, offset_);
- if (!stat.ok()) {
- return stat;
- }
-
- offset_ += *nbyte;
- return Status::OK();
-}
-
-Status FileHandle::Seek(off_t *offset, std::ios_base::seekdir whence) {
- off_t new_offset = -1;
-
- switch (whence) {
- case std::ios_base::beg:
- new_offset = *offset;
- break;
- case std::ios_base::cur:
- new_offset = offset_ + *offset;
- break;
- case std::ios_base::end:
- new_offset = static_cast<InputStreamImpl *>(input_stream_.get())
- ->get_file_length() +
- *offset;
- break;
- default:
- /* unsupported */
- return Status::InvalidArgument("Invalid Seek whence argument");
- }
-
- if (!CheckSeekBounds(new_offset)) {
- return Status::InvalidArgument("Seek offset out of bounds");
- }
- offset_ = new_offset;
-
- *offset = offset_;
- return Status::OK();
-}
-
-/* return false if seek will be out of bounds */
-bool FileHandle::CheckSeekBounds(ssize_t desired_position) {
- ssize_t file_length =
- static_cast<InputStreamImpl *>(input_stream_.get())->get_file_length();
-
- if (desired_position < 0 || desired_position >= file_length) {
- return false;
- }
-
- return true;
-}
-
-bool FileHandle::IsOpenForRead() {
- /* for now just check if InputStream exists */
- if (!input_stream_) {
- return false;
- }
- return true;
-}
-
-HadoopFileSystem::~HadoopFileSystem() {
- /**
- * Note: IoService must be stopped before getting rid of worker threads.
- * Once worker threads are joined and deleted the service can be deleted.
- **/
-
- file_system_.reset(nullptr);
- service_->Stop();
- worker_threads_.clear();
- service_.reset(nullptr);
-}
-
-Status HadoopFileSystem::Connect(const char *nn, tPort port,
- unsigned int threads) {
- /* IoService::New can return nullptr */
- if (!service_) {
- return Status::Error("Null IoService");
- }
- /* spawn background threads for asio delegation */
- for (unsigned int i = 0; i < threads; i++) {
- AddWorkerThread();
- }
- /* synchronized */
- auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem*>>>();
- std::future<std::tuple<Status, FileSystem*>> future(callstate->get_future());
-
- auto callback = [callstate](const Status &s, FileSystem *f) {
- callstate->set_value(std::make_tuple(s,f));
- };
-
- /* dummy options object until this is hooked up to HDFS-9117 */
- Options options_object;
- FileSystem::New(service_.get(), options_object, nn, std::to_string(port),
- callback);
-
- /* block until promise is set */
- auto returnstate = future.get();
- Status stat = std::get<0>(returnstate);
- FileSystem *fs = std::get<1>(returnstate);
-
- /* check and see if it worked */
- if (!stat.ok() || !fs) {
- service_->Stop();
- worker_threads_.clear();
- return stat;
- }
-
- file_system_ = std::unique_ptr<FileSystem>(fs);
- return stat;
-}
-
-int HadoopFileSystem::AddWorkerThread() {
- auto service_task = [](IoService *service) { service->Run(); };
- worker_threads_.push_back(
- WorkerPtr(new std::thread(service_task, service_.get())));
- return worker_threads_.size();
-}
-
-Status HadoopFileSystem::OpenFileForRead(const std::string &path,
- FileHandle **handle) {
- auto callstate = std::make_shared<std::promise<std::tuple<Status, InputStream*>>>();
- std::future<std::tuple<Status, InputStream*>> future(callstate->get_future());
-
- /* wrap async FileSystem::Open with promise to make it a blocking call */
- auto h = [callstate](const Status &s, InputStream *is) {
- callstate->set_value(std::make_tuple(s, is));
- };
-
- file_system_->Open(path, h);
-
- /* block until promise is set */
- auto returnstate = future.get();
- Status stat = std::get<0>(returnstate);
- InputStream *input_stream = std::get<1>(returnstate);
-
- if (!stat.ok()) {
- delete input_stream;
- return stat;
- }
- if (!input_stream) {
- return stat;
- }
-
- *handle = new FileHandle(input_stream);
- return stat;
-}
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
deleted file mode 100644
index b36ee8f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIBHDFSPP_BINDINGS_HDFSCPP_H
-#define LIBHDFSPP_BINDINGS_HDFSCPP_H
-
-#include <cstdint>
-#include <thread>
-#include <vector>
-#include <mutex>
-#include <chrono>
-#include <iostream>
-
-#include "libhdfspp/hdfs.h"
-#include "fs/bad_datanode_tracker.h"
-#include <hdfs/hdfs.h>
-
-namespace hdfs {
-
-/**
- * Implement a very simple 'it just works' interface in C++
- * that provides posix-like file operations + extra stuff for hadoop.
- * Then provide very thin C wrappers over each method.
- */
-
-class HadoopFileSystem;
-
-class FileHandle {
- public:
- virtual ~FileHandle(){};
- /**
- * Note: The nbyte argument for Read and Pread as well as the
- * offset argument for Seek are in/out parameters.
- *
- * For Read and Pread the value referenced by nbyte should
- * be set to the number of bytes to read. Before returning
- * the value referenced will be set by the callee to the number
- * of bytes that was successfully read.
- *
- * For Seek the value referenced by offset should be the number
- * of bytes to shift from the specified whence position. The
- * referenced value will be set to the new offset before returning.
- **/
- Status Pread(void *buf, size_t *nbyte, off_t offset);
- Status Read(void *buf, size_t *nbyte);
- Status Seek(off_t *offset, std::ios_base::seekdir whence);
- bool IsOpenForRead();
-
- private:
- /* handle should only be created by fs */
- friend class HadoopFileSystem;
- FileHandle(InputStream *is);
- bool CheckSeekBounds(ssize_t desired_position);
- std::unique_ptr<InputStream> input_stream_;
- off_t offset_;
-};
-
-class HadoopFileSystem {
- public:
- HadoopFileSystem() : service_(IoService::New()) {}
- virtual ~HadoopFileSystem();
-
- /* attempt to connect to namenode, return false on failure */
- Status Connect(const char *nn, tPort port, unsigned int threads = 1);
-
- /* how many worker threads are servicing asio requests */
- int WorkerThreadCount() { return worker_threads_.size(); }
-
- /* add a new thread to handle asio requests, return number of threads in pool
- */
- int AddWorkerThread();
-
- Status OpenFileForRead(const std::string &path, FileHandle **handle);
-
- private:
- std::unique_ptr<IoService> service_;
- /* std::thread needs to join before deletion */
- struct WorkerDeleter {
- void operator()(std::thread *t) {
- t->join();
- delete t;
- }
- };
- typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
- std::vector<WorkerPtr> worker_threads_;
- std::unique_ptr<FileSystem> file_system_;
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
index 8dbcd03..0d3752a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -1 +1,18 @@
-add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc)
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc util.cc)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
new file mode 100644
index 0000000..575904c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_COMMON_ASYNC_STREAM_H_
+#define LIB_COMMON_ASYNC_STREAM_H_
+
+#include <asio.hpp>
+
+namespace hdfs {
+
+typedef asio::mutable_buffers_1 MutableBuffers;
+typedef asio::const_buffers_1 ConstBuffers;
+
+/*
+ * asio-compatible stream implementation.
+ *
+ * Lifecycle: should be managed using std::shared_ptr so the object can be
+ * handed from consumer to consumer
+ * Threading model: async_read_some and async_write_some are not thread-safe.
+ */
+class AsyncStream {
+public:
+ virtual void async_read_some(const MutableBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) = 0;
+
+ virtual void async_write_some(const ConstBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) = 0;
+};
+
+}
+
+#endif
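The typedefs and pure virtuals above are all a transport needs to implement. A stripped-down socket adapter, sketched to show the forwarding plus the shared_ptr lifecycle the header comment calls for (DataNodeConnectionImpl below plays this role for real):

#include "common/async_stream.h"
#include <asio/ip/tcp.hpp>
#include <memory>

// Sketch: forward AsyncStream calls to a real socket. Construct through
// std::make_shared so shared_from_this is valid.
class SocketStream : public hdfs::AsyncStream,
                     public std::enable_shared_from_this<SocketStream> {
public:
  explicit SocketStream(asio::io_service *io) : socket_(*io) {}

  void async_read_some(const hdfs::MutableBuffers &buf,
      std::function<void(const asio::error_code &, std::size_t)> handler) override {
    auto self = shared_from_this();  // keep alive until the callback fires
    socket_.async_read_some(buf,
        [self, handler](const asio::error_code &ec, std::size_t n) { handler(ec, n); });
  }

  void async_write_some(const hdfs::ConstBuffers &buf,
      std::function<void(const asio::error_code &, std::size_t)> handler) override {
    auto self = shared_from_this();
    socket_.async_write_some(buf,
        [self, handler](const asio::error_code &ec, std::size_t n) { handler(ec, n); });
  }

private:
  asio::ip::tcp::socket socket_;
};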
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
index 5630934..a5a0446 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
@@ -29,7 +29,9 @@
#include <asio/ip/tcp.hpp>
namespace hdfs {
-namespace continuation {
+namespace asio_continuation {
+
+using namespace continuation;
template <class Stream, class MutableBufferSequence>
class ReadContinuation : public Continuation {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
index 08caf0d..54caeed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
@@ -33,7 +33,7 @@ namespace continuation {
template <class Stream, size_t MaxMessageSize = 512>
struct ReadDelimitedPBMessageContinuation : public Continuation {
- ReadDelimitedPBMessageContinuation(Stream *stream,
+ ReadDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream,
::google::protobuf::MessageLite *msg)
: stream_(stream), msg_(msg) {}
@@ -56,8 +56,8 @@ struct ReadDelimitedPBMessageContinuation : public Continuation {
}
next(status);
};
- asio::async_read(
- *stream_, asio::buffer(buf_),
+ asio::async_read(*stream_,
+ asio::buffer(buf_),
std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
std::placeholders::_1, std::placeholders::_2),
handler);
@@ -82,14 +82,14 @@ private:
return offset ? len + offset - transferred : 1;
}
- Stream *stream_;
+ std::shared_ptr<Stream> stream_;
::google::protobuf::MessageLite *msg_;
std::array<char, MaxMessageSize> buf_;
};
template <class Stream>
struct WriteDelimitedPBMessageContinuation : Continuation {
- WriteDelimitedPBMessageContinuation(Stream *stream,
+ WriteDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream,
const google::protobuf::MessageLite *msg)
: stream_(stream), msg_(msg) {}
@@ -101,28 +101,25 @@ struct WriteDelimitedPBMessageContinuation : Continuation {
pbio::CodedOutputStream os(&ss);
os.WriteVarint32(size);
msg_->SerializeToCodedStream(&os);
- write_coroutine_ =
- std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
- write_coroutine_->Run([next](const Status &stat) { next(stat); });
+ asio::async_write(*stream_, asio::buffer(buf_), [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); } );
}
private:
- Stream *stream_;
+ std::shared_ptr<Stream> stream_;
const google::protobuf::MessageLite *msg_;
std::string buf_;
- std::shared_ptr<Continuation> write_coroutine_;
};
template <class Stream, size_t MaxMessageSize = 512>
static inline Continuation *
-ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+ReadDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) {
return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream,
msg);
}
template <class Stream>
static inline Continuation *
-WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+WriteDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) {
return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
}
}
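The write path now frames and sends in one step instead of delegating to a Write continuation. The framing itself is ordinary varint-delimited protobuf encoding, the same shape ToDelimitedString builds in the tests above; a standalone sketch:

#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <string>

// Sketch: length-prefix a message the way
// WriteDelimitedPBMessageContinuation does (varint size, then the body).
std::string ToDelimited(const google::protobuf::MessageLite &msg) {
  std::string buf;
  {
    google::protobuf::io::StringOutputStream ss(&buf);
    google::protobuf::io::CodedOutputStream os(&ss);
    os.WriteVarint32(msg.ByteSize());
    msg.SerializeToCodedStream(&os);
  }  // CodedOutputStream flushes as it goes out of scope
  return buf;
}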
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
index 37408de..0251ce5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc
@@ -24,20 +24,4 @@ IoService::~IoService() {}
IoService *IoService::New() { return new IoServiceImpl(); }
-bool InputStream::ShouldExclude(const Status &s) {
- if (s.ok()) {
- return false;
- }
-
- switch (s.code()) {
- /* client side resource exhaustion */
- case Status::kResourceUnavailable:
- return false;
- case Status::kInvalidArgument:
- case Status::kUnimplemented:
- case Status::kException:
- default:
- return true;
- }
-}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
index e71b6b3..9e1acbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc
@@ -20,5 +20,6 @@
namespace hdfs {
-Options::Options() : rpc_timeout(30000), host_exclusion_duration(600000) {}
+Options::Options() : rpc_timeout(30000), max_rpc_retries(0),
+ rpc_retry_delay_ms(10000), host_exclusion_duration(600000) {}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
new file mode 100644
index 0000000..eaef2d0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/util.h"
+
+namespace hdfs {
+
+std::string GetRandomClientName() {
+ unsigned char buf[6] = {
+ 0,
+ };
+ RAND_pseudo_bytes(buf, sizeof(buf));
+
+ std::stringstream ss;
+ ss << "libhdfs++_"
+ << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf)));
+ return ss.str();
+}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
index ff9f36c..a6acc4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h
@@ -20,7 +20,10 @@
#include "libhdfspp/status.h"
+#include <sstream>
+
#include <asio/error_code.hpp>
+#include <openssl/rand.h>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h>
@@ -53,6 +56,10 @@ static inline void ReadDelimitedPBMessage(
std::string Base64Encode(const std::string &src);
+/*
+ * Returns a new high-entropy client name
+ */
+std::string GetRandomClientName();
}
#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
new file mode 100644
index 0000000..54ba96c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt
@@ -0,0 +1,2 @@
+add_library(connection datanodeconnection.cc)
+add_dependencies(connection proto)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
new file mode 100644
index 0000000..b053e7f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "datanodeconnection.h"
+#include "common/util.h"
+
+namespace hdfs {
+
+DataNodeConnection::~DataNodeConnection(){}
+DataNodeConnectionImpl::~DataNodeConnectionImpl(){}
+
+DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service,
+ const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+ const hadoop::common::TokenProto *token)
+{
+ using namespace ::asio::ip;
+
+ conn_.reset(new tcp::socket(*io_service));
+ auto datanode_addr = dn_proto.id();
+ endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()),
+ datanode_addr.xferport());
+ uuid_ = dn_proto.id().datanodeuuid();
+
+ if (token) {
+ token_.reset(new hadoop::common::TokenProto());
+ token_->CheckTypeAndMergeFrom(*token);
+ }
+}
+
+
+void DataNodeConnectionImpl::Connect(
+ std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) {
+ // Keep the DN from being freed until we're done
+ auto shared_this = shared_from_this();
+ asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(),
+ [shared_this, handler](const asio::error_code &ec, std::array<asio::ip::tcp::endpoint, 1>::iterator it) {
+ (void)it;
+ handler(ToStatus(ec), shared_this); });
+}
+
+
+}
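
A sketch of how a caller drives the new connection class (illustrative
only; assumes a running io_service and a populated
::hadoop::hdfs::DatanodeInfoProto named info). Because Connect captures
shared_from_this(), the connection stays alive until the handler fires,
even if the caller drops its own reference right away:

    auto dn = std::make_shared<hdfs::DataNodeConnectionImpl>(
        &io_service, info, /*token=*/nullptr);
    dn->Connect([](hdfs::Status status,
                   std::shared_ptr<hdfs::DataNodeConnection> conn) {
      if (status.ok()) {
        // conn is connected to the DataNode's transfer port and is
        // ready for block read traffic.
      }
    });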
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
new file mode 100644
index 0000000..d5bbb92
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+#define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
+
+#include "common/hdfs_public_api.h"
+#include "common/async_stream.h"
+#include "ClientNamenodeProtocol.pb.h"
+
+#include "asio.hpp"
+
+namespace hdfs {
+
+class DataNodeConnection : public AsyncStream {
+public:
+ std::string uuid_;
+ std::unique_ptr<hadoop::common::TokenProto> token_;
+
+ virtual ~DataNodeConnection();
+ virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) = 0;
+};
+
+
+class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl>{
+public:
+ std::unique_ptr<asio::ip::tcp::socket> conn_;
+ std::array<asio::ip::tcp::endpoint, 1> endpoints_;
+ // uuid_ and token_ are inherited from DataNodeConnection
+
+ virtual ~DataNodeConnectionImpl();
+ DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto,
+ const hadoop::common::TokenProto *token);
+
+ void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override;
+
+ void async_read_some(const MutableBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
+ conn_->async_read_some(buf, handler);
+ }
+
+ void async_write_some(const ConstBuffers &buf,
+ std::function<void (const asio::error_code & error,
+ std::size_t bytes_transferred) > handler) override {
+ conn_->async_write_some(buf, handler);
+ }
+};
+
+}
+
+#endif
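
Since DataNodeConnectionImpl only forwards async_read_some and
async_write_some to the underlying socket, code written against
AsyncStream (such as the block reader) can be exercised with a mock
transport in tests. A hedged sketch of a consumer, assuming
MutableBuffers is the asio-style buffer alias from common/async_stream.h:

    void ReadSome(hdfs::AsyncStream &stream, const hdfs::MutableBuffers &buf) {
      stream.async_read_some(buf, [](const asio::error_code &ec,
                                     std::size_t bytes_transferred) {
        // On success, consume bytes_transferred bytes; otherwise inspect ec.
        (void)ec; (void)bytes_transferred;
      });
    }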
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
index 0870fb3..ae56b3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -1,2 +1,2 @@
-add_library(fs filesystem.cc inputstream.cc bad_datanode_tracker.cc)
+add_library(fs filesystem.cc filehandle.cc bad_datanode_tracker.cc)
add_dependencies(fs proto)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
new file mode 100644
index 0000000..8cf41ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filehandle.h"
+#include "common/continuation/continuation.h"
+#include "connection/datanodeconnection.h"
+#include "reader/block_reader.h"
+
+#include <future>
+#include <tuple>
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+
+FileHandle::~FileHandle() {}
+
+FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
+ const std::shared_ptr<const struct FileInfo> file_info,
+ std::shared_ptr<BadDataNodeTracker> bad_data_nodes)
+ : io_service_(io_service), client_name_(client_name), file_info_(file_info),
+ bad_node_tracker_(bad_data_nodes), offset_(0) {
+}
+
+void FileHandleImpl::PositionRead(
+ void *buf, size_t nbyte, uint64_t offset,
+ const std::function<void(const Status &, size_t)>
+ &handler) {
+
+ auto callback = [this, handler](const Status &status,
+ const std::string &contacted_datanode,
+ size_t bytes_read) {
+ /* determine if DN gets marked bad */
+ if (ShouldExclude(status)) {
+ bad_node_tracker_->AddBadNode(contacted_datanode);
+ }
+
+ handler(status, bytes_read);
+ };
+
+ AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, callback);
+}
+
+Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
+ auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
+ std::future<std::tuple<Status, size_t>> future(callstate->get_future());
+
+ /* wrap async call with promise/future to make it blocking */
+ auto callback = [callstate](const Status &s, size_t bytes) {
+ callstate->set_value(std::make_tuple(s,bytes));
+ };
+
+ PositionRead(buf, *nbyte, offset, callback);
+
+ /* wait for async to finish */
+ auto returnstate = future.get();
+ auto stat = std::get<0>(returnstate);
+
+ if (!stat.ok()) {
+ return stat;
+ }
+
+ *nbyte = std::get<1>(returnstate);
+ return stat;
+}
+
+Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
+ Status stat = PositionRead(buf, nbyte, offset_);
+ if(!stat.ok()) {
+ return stat;
+ }
+
+ offset_ += *nbyte;
+ return Status::OK();
+}
+
+Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
+ off_t new_offset = -1;
+
+ switch (whence) {
+ case std::ios_base::beg:
+ new_offset = *offset;
+ break;
+ case std::ios_base::cur:
+ new_offset = offset_ + *offset;
+ break;
+ case std::ios_base::end:
+ new_offset = file_info_->file_length_ + *offset;
+ break;
+ default:
+ /* unsupported */
+ return Status::InvalidArgument("Invalid Seek whence argument");
+ }
+
+ if(!CheckSeekBounds(new_offset)) {
+ return Status::InvalidArgument("Seek offset out of bounds");
+ }
+ offset_ = new_offset;
+
+ *offset = offset_;
+ return Status::OK();
+}
+
+/* return false if seek will be out of bounds */
+bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) {
+ ssize_t file_length = file_info_->file_length_;
+
+ if (desired_position < 0 || desired_position >= file_length) {
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * Note that this method must be thread-safe w.r.t. the unsafe operations occurring
+ * on the FileHandle
+ */
+void FileHandleImpl::AsyncPreadSome(
+ size_t offset, const MutableBuffers &buffers,
+ std::shared_ptr<NodeExclusionRule> excluded_nodes,
+ const std::function<void(const Status &, const std::string &, size_t)> handler) {
+ using ::hadoop::hdfs::DatanodeInfoProto;
+ using ::hadoop::hdfs::LocatedBlockProto;
+
+ /**
+ * Note: block and chosen_dn will end up pointing to things inside
+ * the blocks_ vector. They shouldn't be directly deleted.
+ **/
+ auto block = std::find_if(
+ file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) {
+ return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+ });
+
+ if (block == file_info_->blocks_.end()) {
+ handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
+ return;
+ }
+
+ /**
+ * If user supplies a rule use it, otherwise use the tracker.
+ * User is responsible for making sure one of them isn't null.
+ **/
+ std::shared_ptr<NodeExclusionRule> rule =
+ excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
+
+ auto datanodes = block->locs();
+ auto it = std::find_if(datanodes.begin(), datanodes.end(),
+ [rule](const DatanodeInfoProto &dn) {
+ return !rule->IsBadNode(dn.id().datanodeuuid());
+ });
+
+ if (it == datanodes.end()) {
+ handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
+ return;
+ }
+
+ DatanodeInfoProto &chosen_dn = *it;
+
+ uint64_t offset_within_block = offset - block->offset();
+ uint64_t size_within_block = std::min<uint64_t>(
+ block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+
+ // This is where we will put the logic for re-using a DN connection; we can
+ // steal the FileHandle's dn and put it back when we're done
+ std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, nullptr /*token*/);
+ std::string dn_id = dn->uuid_;
+ std::string client_name = client_name_;
+
+ // Wrap the DN in a block reader to handle the state and logic of the
+ // block request protocol
+ std::shared_ptr<BlockReader> reader;
+ reader = CreateBlockReader(BlockReaderOptions(), dn);
+
+
+ auto read_handler = [reader, dn_id, handler](const Status & status, size_t transferred) {
+ handler(status, dn_id, transferred);
+ };
+
+ dn->Connect([handler,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name]
+ (Status status, std::shared_ptr<DataNodeConnection> dn) {
+ (void)dn;
+ if (status.ok()) {
+ reader->AsyncReadBlock(
+ client_name, *block, offset_within_block,
+ asio::buffer(buffers, size_within_block), read_handler);
+ } else {
+ handler(status, dn_id, 0);
+ }
+ });
+
+ return;
+}
+
+std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options,
+ std::shared_ptr<DataNodeConnection> dn)
+{
+ return std::make_shared<BlockReaderImpl>(options, dn);
+}
+
+std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
+ ::asio::io_service * io_service,
+ const ::hadoop::hdfs::DatanodeInfoProto & dn,
+ const hadoop::common::TokenProto * token) {
+ return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token);
+}
+
+bool FileHandle::ShouldExclude(const Status &s) {
+ if (s.ok()) {
+ return false;
+ }
+
+ switch (s.code()) {
+ /* client side resource exhaustion */
+ case Status::kResourceUnavailable:
+ return false;
+ case Status::kInvalidArgument:
+ case Status::kUnimplemented:
+ case Status::kException:
+ default:
+ return true;
+ }
+}
+
+}
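
The synchronous PositionRead above illustrates the pattern this patch
uses throughout to derive blocking calls from async ones: park the
result in a std::promise, then block on the matching future. A distilled
generic sketch of the idiom (illustrative, not part of the patch):

    #include <future>
    #include <memory>

    template <typename AsyncOp>
    hdfs::Status BlockOnStatus(AsyncOp &&start_op) {
      auto state = std::make_shared<std::promise<hdfs::Status>>();
      std::future<hdfs::Status> result = state->get_future();
      // start_op must invoke its callback exactly once.
      start_op([state](const hdfs::Status &s) { state->set_value(s); });
      return result.get();  // blocks until the async handler runs
    }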
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
new file mode 100644
index 0000000..95b5869
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_
+#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_
+
+#include "common/hdfs_public_api.h"
+#include "common/async_stream.h"
+#include "reader/fileinfo.h"
+
+#include "asio.hpp"
+#include "bad_datanode_tracker.h"
+#include "ClientNamenodeProtocol.pb.h"
+
+#include <mutex>
+#include <iostream>
+
+namespace hdfs {
+
+class BlockReader;
+class BlockReaderOptions;
+class DataNodeConnection;
+
+/*
+ * FileHandle: coordinates operations on a particular file in HDFS
+ *
+ * Threading model: not thread-safe; consumers and io_service should not call
+ * concurrently. PositionRead is the exception; it can be
+ * called concurrently and repeatedly.
+ * Lifetime: pointer returned to consumer by FileSystem::Open. Consumer is
+ * responsible for freeing the object.
+ */
+class FileHandleImpl : public FileHandle {
+public:
+ FileHandleImpl(::asio::io_service *io_service, const std::string &client_name,
+ const std::shared_ptr<const struct FileInfo> file_info,
+ std::shared_ptr<BadDataNodeTracker> bad_data_nodes);
+
+ /*
+ * [Some day reliably] Reads nbyte bytes from the file at the given offset.
+ * On error, bytes_read reports the number of bytes successfully read; on
+ * success, bytes_read will equal nbyte
+ */
+ void PositionRead(
+ void *buf,
+ size_t nbyte,
+ uint64_t offset,
+ const std::function<void(const Status &status, size_t bytes_read)> &handler
+ ) override;
+
+ /**
+ * Note: The nbyte argument for Read and Pread as well as the
+ * offset argument for Seek are in/out parameters.
+ *
+ * For Read and Pread the value referenced by nbyte should
+ * be set to the number of bytes to read. Before returning
+ * the value referenced will be set by the callee to the number
+ * of bytes that was successfully read.
+ *
+ * For Seek the value referenced by offset should be the number
+ * of bytes to shift from the specified whence position. The
+ * referenced value will be set to the new offset before returning.
+ **/
+ Status PositionRead(void *buf, size_t *bytes_read, off_t offset) override;
+ Status Read(void *buf, size_t *nbyte) override;
+ Status Seek(off_t *offset, std::ios_base::seekdir whence) override;
+
+
+ /*
+ * Reads some amount of data into the buffer. Picks the first datanode
+ * hosting the block that is not excluded by the rule and reads from it.
+ *
+ * If an error occurs during connection or transfer, the callback will be
+ * called with bytes_read equal to the number of bytes successfully transferred.
+ * If no data nodes can be found, status will be Status::ResourceUnavailable.
+ *
+ */
+ void AsyncPreadSome(size_t offset, const MutableBuffers &buffers,
+ std::shared_ptr<NodeExclusionRule> excluded_nodes,
+ const std::function<void(const Status &status,
+ const std::string &dn_id, size_t bytes_read)> handler);
+
+protected:
+ virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options,
+ std::shared_ptr<DataNodeConnection> dn);
+ virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection(
+ ::asio::io_service *io_service,
+ const ::hadoop::hdfs::DatanodeInfoProto & dn,
+ const hadoop::common::TokenProto * token);
+private:
+ ::asio::io_service * const io_service_;
+ const std::string client_name_;
+ const std::shared_ptr<const struct FileInfo> file_info_;
+ std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
+ bool CheckSeekBounds(ssize_t desired_position);
+ off_t offset_;
+};
+
+}
+
+#endif
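
To make the in/out parameter convention concrete, a short usage sketch
(error handling trimmed; file is a FileHandle* obtained from
FileSystem::Open):

    char buf[4096];
    size_t nbyte = sizeof(buf);        // in: capacity to read
    hdfs::Status stat = file->Read(buf, &nbyte);
    // out: nbyte holds the bytes actually read; the handle's offset advanced

    off_t offset = -1024;              // in: shift relative to whence
    stat = file->Seek(&offset, std::ios_base::end);
    // out: offset holds the new absolute position in the file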
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index e0c27ac..7404606 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -22,7 +22,10 @@
#include <asio/ip/tcp.hpp>
+#include <functional>
#include <limits>
+#include <future>
+#include <tuple>
namespace hdfs {
@@ -32,38 +35,17 @@ static const int kNamenodeProtocolVersion = 1;
using ::asio::ip::tcp;
-FileSystem::~FileSystem() {}
+/*****************************************************************************
+ * NAMENODE OPERATIONS
+ ****************************************************************************/
-void FileSystem::New(
- IoService *io_service, const Options &options, const std::string &server,
- const std::string &service,
- const std::function<void(const Status &, FileSystem *)> &handler) {
- FileSystemImpl *impl = new FileSystemImpl(io_service, options);
- impl->Connect(server, service, [impl, handler](const Status &stat) {
- if (stat.ok()) {
- handler(stat, impl);
- } else {
- delete impl;
- handler(stat, nullptr);
- }
- });
-}
-
-FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options)
- : io_service_(static_cast<IoServiceImpl *>(io_service)),
- engine_(&io_service_->io_service(), options,
- RpcEngine::GetRandomClientName(), kNamenodeProtocol,
- kNamenodeProtocolVersion),
- namenode_(&engine_),
- bad_node_tracker_(std::make_shared<BadDataNodeTracker>()) {}
-
-void FileSystemImpl::Connect(const std::string &server,
+void NameNodeOperations::Connect(const std::string &server,
const std::string &service,
- std::function<void(const Status &)> &&handler) {
- using namespace continuation;
+ std::function<void(const Status &)> &handler) {
+ using namespace asio_continuation;
typedef std::vector<tcp::endpoint> State;
auto m = Pipeline<State>::Create();
- m->Push(Resolve(&io_service_->io_service(), server, service,
+ m->Push(Resolve(io_service_, server, service,
std::back_inserter(m->state())))
.Push(Bind([this, m](const Continuation::Next &next) {
engine_.Connect(m->state().front(), next);
@@ -76,9 +58,9 @@ void FileSystemImpl::Connect(const std::string &server,
});
}
-void FileSystemImpl::Open(
- const std::string &path,
- const std::function<void(const Status &, InputStream *)> &handler) {
+void NameNodeOperations::GetBlockLocations(const std::string & path,
+ std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
+{
using ::hadoop::hdfs::GetBlockLocationsRequestProto;
using ::hadoop::hdfs::GetBlockLocationsResponseProto;
@@ -99,10 +81,174 @@ void FileSystemImpl::Open(
[this, s](const continuation::Continuation::Next &next) {
namenode_.GetBlockLocations(&s->req, s->resp, next);
}));
+
m->Run([this, handler](const Status &stat, const State &s) {
- handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations(),
- bad_node_tracker_)
+ if (stat.ok()) {
+ auto file_info = std::make_shared<struct FileInfo>();
+ auto locations = s.resp->locations();
+
+ file_info->file_length_ = locations.filelength();
+
+ for (const auto &block : locations.blocks()) {
+ file_info->blocks_.push_back(block);
+ }
+
+ if (locations.has_lastblock() && locations.lastblock().b().numbytes()) {
+ file_info->blocks_.push_back(locations.lastblock());
+ }
+
+ handler(stat, file_info);
+ } else {
+ handler(stat, nullptr);
+ }
+ });
+}
+
+
+/*****************************************************************************
+ * FILESYSTEM BASE CLASS
+ ****************************************************************************/
+
+void FileSystem::New(
+ IoService *io_service, const Options &options, const std::string &server,
+ const std::string &service,
+ const std::function<void(const Status &, FileSystem *)> &handler) {
+ FileSystemImpl *impl = new FileSystemImpl(io_service, options);
+ impl->Connect(server, service, [impl, handler](const Status &stat) {
+ if (stat.ok()) {
+ handler(stat, impl);
+ } else {
+ delete impl;
+ handler(stat, nullptr);
+ }
+ });
+}
+
+FileSystem * FileSystem::New(
+ IoService *io_service, const Options &options, const std::string &server,
+ const std::string &service) {
+ auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem *>>>();
+ std::future<std::tuple<Status, FileSystem *>> future(callstate->get_future());
+
+ auto callback = [callstate](const Status &s, FileSystem * fs) {
+ callstate->set_value(std::make_tuple(s, fs));
+ };
+
+ New(io_service, options, server, service, callback);
+
+ /* block until promise is set */
+ auto returnstate = future.get();
+
+ if (std::get<0>(returnstate).ok()) {
+ return std::get<1>(returnstate);
+ } else {
+ return nullptr;
+ }
+}
+
+/*****************************************************************************
+ * FILESYSTEM IMPLEMENTATION
+ ****************************************************************************/
+
+FileSystemImpl::FileSystemImpl(IoService *&io_service, const Options &options)
+ : io_service_(static_cast<IoServiceImpl *>(io_service)),
+ nn_(&io_service_->io_service(), options,
+ GetRandomClientName(), kNamenodeProtocol,
+ kNamenodeProtocolVersion),
+ client_name_(GetRandomClientName())
+{
+ // Poor man's move
+ io_service = nullptr;
+
+ /* spawn background threads for asio delegation */
+ unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */;
+ for (unsigned int i = 0; i < threads; i++) {
+ AddWorkerThread();
+ }
+}
+
+FileSystemImpl::~FileSystemImpl() {
+ /**
+ * Note: IoService must be stopped before getting rid of worker threads.
+ * Once worker threads are joined and deleted the service can be deleted.
+ **/
+ io_service_->Stop();
+ worker_threads_.clear();
+ io_service_.reset(nullptr);
+}
+
+void FileSystemImpl::Connect(const std::string &server,
+ const std::string &service,
+ std::function<void(const Status &)> &&handler) {
+ /* IoService::New can return nullptr */
+ if (!io_service_) {
+ handler(Status::Error("Null IoService"));
+ return;
+ }
+ nn_.Connect(server, service, handler);
+}
+
+Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
+ /* synchronized */
+ auto stat = std::make_shared<std::promise<Status>>();
+ std::future<Status> future = stat->get_future();
+
+ auto callback = [stat](const Status &s) {
+ stat->set_value(s);
+ };
+
+ Connect(server, service, callback);
+
+ /* block until promise is set */
+ auto s = future.get();
+
+ return s;
+}
+
+
+int FileSystemImpl::AddWorkerThread() {
+ auto service_task = [](IoService *service) { service->Run(); };
+ worker_threads_.push_back(
+ WorkerPtr(new std::thread(service_task, io_service_.get())));
+ return worker_threads_.size();
+}
+
+void FileSystemImpl::Open(
+ const std::string &path,
+ const std::function<void(const Status &, FileHandle *)> &handler) {
+
+ nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
+ handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_)
: nullptr);
});
}
+
+Status FileSystemImpl::Open(const std::string &path,
+ FileHandle **handle) {
+ auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
+ std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
+
+ /* wrap async FileSystem::Open with promise to make it a blocking call */
+ auto h = [callstate](const Status &s, FileHandle *is) {
+ callstate->set_value(std::make_tuple(s, is));
+ };
+
+ Open(path, h);
+
+ /* block until promise is set */
+ auto returnstate = future.get();
+ Status stat = std::get<0>(returnstate);
+ FileHandle *file_handle = std::get<1>(returnstate);
+
+ if (!stat.ok()) {
+ delete file_handle;
+ return stat;
+ }
+ if (!file_handle) {
+ return stat;
+ }
+
+ *handle = file_handle;
+ return stat;
+}
+
}
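
Putting the new blocking entry points together, an end-to-end consumer
sketch (host, port, and path are placeholders; assumes the IoService::New
factory from the public API; error handling trimmed):

    hdfs::IoService *io_service = hdfs::IoService::New();
    hdfs::FileSystem *fs = hdfs::FileSystem::New(io_service, hdfs::Options(),
                                                 "localhost", "8020");
    if (fs) {
      hdfs::FileHandle *file = nullptr;
      if (fs->Open("/tmp/example.txt", &file).ok()) {
        char buf[1024];
        size_t len = sizeof(buf);
        file->Read(buf, &len);   // blocks; len holds bytes read
        delete file;             // consumer owns the handle
      }
      delete fs;                 // joins worker threads, stops the io_service
    }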
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index dfe8b0c..cc8a8e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -18,75 +18,110 @@
#ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_
#define LIBHDFSPP_LIB_FS_FILESYSTEM_H_
+#include "filehandle.h"
#include "common/hdfs_public_api.h"
+#include "common/async_stream.h"
#include "libhdfspp/hdfs.h"
#include "fs/bad_datanode_tracker.h"
#include "rpc/rpc_engine.h"
+#include "reader/block_reader.h"
+#include "reader/fileinfo.h"
#include "ClientNamenodeProtocol.pb.h"
#include "ClientNamenodeProtocol.hrpc.inl"
+#include "asio.hpp"
+
+#include <thread>
+
namespace hdfs {
-class FileHandle;
-class HadoopFileSystem;
+/**
+ * NameNodeOperations: abstracts the details of communicating with a NameNode
+ * and the implementation of the communications protocol.
+ *
+ * Will eventually handle retry and failover.
+ *
+ * Threading model: thread-safe; all operations can be called concurrently
+ * Lifetime: owned by a FileSystemImpl
+ */
+class NameNodeOperations {
+public:
+ NameNodeOperations(::asio::io_service *io_service, const Options &options,
+ const std::string &client_name, const char *protocol_name,
+ int protocol_version) :
+ io_service_(io_service),
+ engine_(io_service, options, client_name, protocol_name, protocol_version),
+ namenode_(& engine_) {}
+
+ void Connect(const std::string &server,
+ const std::string &service,
+ std::function<void(const Status &)> &handler);
+
+ void GetBlockLocations(const std::string & path,
+ std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
+private:
+ ::asio::io_service * io_service_;
+ RpcEngine engine_;
+ ClientNamenodeProtocol namenode_;
+};
+
+/*
+ * FileSystem: The consumer's main point of interaction with the cluster as
+ * a whole.
+ *
+ * Initially constructed in a disconnected state; call Connect before operating
+ * on the FileSystem.
+ *
+ * All open files must be closed before the FileSystem is destroyed.
+ *
+ * Threading model: thread-safe for all operations
+ * Lifetime: pointer created for consumer who is responsible for deleting it
+ */
class FileSystemImpl : public FileSystem {
- public:
- FileSystemImpl(IoService *io_service, const Options &options);
+public:
+ FileSystemImpl(IoService *&io_service, const Options &options);
+ ~FileSystemImpl() override;
+
+ /* attempt to connect to namenode, return bad status on failure */
void Connect(const std::string &server, const std::string &service,
std::function<void(const Status &)> &&handler);
+ /* attempt to connect to namenode, return bad status on failure */
+ Status Connect(const std::string &server, const std::string &service);
+
+
virtual void Open(const std::string &path,
- const std::function<void(const Status &, InputStream *)> &
- handler) override;
- RpcEngine &rpc_engine() { return engine_; }
+ const std::function<void(const Status &, FileHandle *)>
+ &handler) override;
+ Status Open(const std::string &path, FileHandle **handle) override;
- private:
- IoServiceImpl *io_service_;
- RpcEngine engine_;
- ClientNamenodeProtocol namenode_;
- std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
-};
-class InputStreamImpl : public InputStream {
- public:
- InputStreamImpl(FileSystemImpl *fs,
- const ::hadoop::hdfs::LocatedBlocksProto *blocks,
- std::shared_ptr<BadDataNodeTracker> tracker);
- virtual void PositionRead(
- void *buf, size_t nbyte, uint64_t offset,
- const std::function<void(const Status &, const std::string &, size_t)> &
- handler) override;
- /**
- * If optional_rule_override is null then use the bad_datanode_tracker. If
- * non-null use the provided NodeExclusionRule to determine eligible
- * datanodes.
- **/
- template <class MutableBufferSequence, class Handler>
- void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
- std::shared_ptr<NodeExclusionRule> excluded_nodes,
- const Handler &handler);
-
- template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
- void AsyncReadBlock(const std::string &client_name,
- const hadoop::hdfs::LocatedBlockProto &block,
- const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
- const MutableBufferSequence &buffers,
- const Handler &handler);
- uint64_t get_file_length() const;
- private:
- FileSystemImpl *fs_;
- unsigned long long file_length_;
- std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
- template <class Reader>
- struct HandshakeContinuation;
- template <class Reader, class MutableBufferSequence>
- struct ReadBlockContinuation;
- struct RemoteBlockReaderTrait;
- friend class FileHandle;
+ /* add a new thread to handle asio requests, return number of threads in pool
+ */
+ int AddWorkerThread();
+
+ /* how many worker threads are servicing asio requests */
+ int WorkerThreadCount() { return worker_threads_.size(); }
+
+
+private:
+ std::unique_ptr<IoServiceImpl> io_service_;
+ NameNodeOperations nn_;
+ const std::string client_name_;
std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
+
+ struct WorkerDeleter {
+ void operator()(std::thread *t) {
+ t->join();
+ delete t;
+ }
+ };
+ typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
+ std::vector<WorkerPtr> worker_threads_;
+
};
-}
-#include "inputstream_impl.h"
+
+}
#endif
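
The WorkerDeleter/WorkerPtr pairing above is a compact
join-on-destruction idiom: clearing worker_threads_ joins each thread
before freeing it, which is what lets ~FileSystemImpl tear down safely.
The same idiom in isolation:

    #include <memory>
    #include <thread>
    #include <vector>

    struct JoinDeleter {
      void operator()(std::thread *t) {
        t->join();  // wait for the thread function to return
        delete t;
      }
    };
    using WorkerPtr = std::unique_ptr<std::thread, JoinDeleter>;

    std::vector<WorkerPtr> workers;
    // workers.clear() now joins and deletes every thread, in order.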
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
deleted file mode 100644
index 0b78c93..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "filesystem.h"
-
-namespace hdfs {
-
-using ::hadoop::hdfs::LocatedBlocksProto;
-
-InputStream::~InputStream() {}
-
-InputStreamImpl::InputStreamImpl(FileSystemImpl *fs,
- const LocatedBlocksProto *blocks,
- std::shared_ptr<BadDataNodeTracker> tracker)
- : fs_(fs), file_length_(blocks->filelength()), bad_node_tracker_(tracker) {
- for (const auto &block : blocks->blocks()) {
- blocks_.push_back(block);
- }
-
- if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) {
- blocks_.push_back(blocks->lastblock());
- }
-}
-
-void InputStreamImpl::PositionRead(
- void *buf, size_t nbyte, uint64_t offset,
- const std::function<void(const Status &, const std::string &, size_t)> &
- handler) {
- AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, handler);
-}
-
-uint64_t InputStreamImpl::get_file_length() const { return file_length_; }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
deleted file mode 100644
index 2f64d39..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef FS_INPUTSTREAM_IMPL_H_
-#define FS_INPUTSTREAM_IMPL_H_
-
-#include "reader/block_reader.h"
-
-#include "common/continuation/asio.h"
-#include "common/continuation/protobuf.h"
-
-#include <functional>
-#include <future>
-#include <type_traits>
-#include <algorithm>
-
-namespace hdfs {
-
-struct InputStreamImpl::RemoteBlockReaderTrait {
- typedef RemoteBlockReader<asio::ip::tcp::socket> Reader;
- struct State {
- std::unique_ptr<asio::ip::tcp::socket> conn_;
- std::shared_ptr<Reader> reader_;
- std::array<asio::ip::tcp::endpoint, 1> endpoints_;
- size_t transferred_;
- Reader *reader() { return reader_.get(); }
- size_t *transferred() { return &transferred_; }
- const size_t *transferred() const { return &transferred_; }
- };
- static continuation::Pipeline<State> *CreatePipeline(
- ::asio::io_service *io_service,
- const ::hadoop::hdfs::DatanodeInfoProto &dn) {
- using namespace ::asio::ip;
- auto m = continuation::Pipeline<State>::Create();
- auto &s = m->state();
- s.conn_.reset(new tcp::socket(*io_service));
- s.reader_ = std::make_shared<Reader>(BlockReaderOptions(), s.conn_.get());
- auto datanode = dn.id();
- s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()),
- datanode.xferport());
-
- m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(),
- s.endpoints_.end()));
- return m;
- }
-};
-
-template <class Reader>
-struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
- HandshakeContinuation(Reader *reader, const std::string &client_name,
- const hadoop::common::TokenProto *token,
- const hadoop::hdfs::ExtendedBlockProto *block,
- uint64_t length, uint64_t offset)
- : reader_(reader),
- client_name_(client_name),
- length_(length),
- offset_(offset) {
- if (token) {
- token_.reset(new hadoop::common::TokenProto());
- token_->CheckTypeAndMergeFrom(*token);
- }
- block_.CheckTypeAndMergeFrom(*block);
- }
-
- virtual void Run(const Next &next) override {
- reader_->async_connect(client_name_, token_.get(), &block_, length_,
- offset_, next);
- }
-
- private:
- Reader *reader_;
- const std::string client_name_;
- std::unique_ptr<hadoop::common::TokenProto> token_;
- hadoop::hdfs::ExtendedBlockProto block_;
- uint64_t length_;
- uint64_t offset_;
-};
-
-template <class Reader, class MutableBufferSequence>
-struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
- ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
- size_t *transferred)
- : reader_(reader),
- buffer_(buffer),
- buffer_size_(asio::buffer_size(buffer)),
- transferred_(transferred) {
- static_assert(!std::is_reference<MutableBufferSequence>::value,
- "Buffer must not be a reference type");
- }
-
- virtual void Run(const Next &next) override {
- *transferred_ = 0;
- next_ = next;
- OnReadData(Status::OK(), 0);
- }
-
- private:
- Reader *reader_;
- const MutableBufferSequence buffer_;
- const size_t buffer_size_;
- size_t *transferred_;
- std::function<void(const Status &)> next_;
-
- void OnReadData(const Status &status, size_t transferred) {
- using std::placeholders::_1;
- using std::placeholders::_2;
- *transferred_ += transferred;
- if (!status.ok()) {
- next_(status);
- } else if (*transferred_ >= buffer_size_) {
- next_(status);
- } else {
- reader_->async_read_some(
- asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
- std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
- }
- }
-};
-
-template <class MutableBufferSequence, class Handler>
-void InputStreamImpl::AsyncPreadSome(
- size_t offset, const MutableBufferSequence &buffers,
- std::shared_ptr<NodeExclusionRule> excluded_nodes, const Handler &handler) {
- using ::hadoop::hdfs::DatanodeInfoProto;
- using ::hadoop::hdfs::LocatedBlockProto;
-
- /**
- * Note: block and chosen_dn will end up pointing to things inside
- * the blocks_ vector. They shouldn't be directly deleted.
- **/
- auto block = std::find_if(
- blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) {
- return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
- });
-
- if (block == blocks_.end()) {
- handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
- return;
- }
-
- /**
- * If user supplies a rule use it, otherwise use the tracker.
- * User is responsible for making sure one of them isn't null.
- **/
- std::shared_ptr<NodeExclusionRule> rule =
- excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_;
-
- auto datanodes = block->locs();
- auto it = std::find_if(datanodes.begin(), datanodes.end(),
- [rule](const DatanodeInfoProto &dn) {
- return !rule->IsBadNode(dn.id().datanodeuuid());
- });
-
- if (it == datanodes.end()) {
- handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
- return;
- }
-
- DatanodeInfoProto *chosen_dn = &*it;
-
- uint64_t offset_within_block = offset - block->offset();
- uint64_t size_within_block = std::min<uint64_t>(
- block->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
-
- AsyncReadBlock<RemoteBlockReaderTrait>(
- fs_->rpc_engine().client_name(), *block, *chosen_dn, offset_within_block,
- asio::buffer(buffers, size_within_block), handler);
-}
-
-template <class BlockReaderTrait, class MutableBufferSequence, class Handler>
-void InputStreamImpl::AsyncReadBlock(
- const std::string &client_name,
- const hadoop::hdfs::LocatedBlockProto &block,
- const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
- const MutableBufferSequence &buffers, const Handler &handler) {
- typedef typename BlockReaderTrait::Reader Reader;
- auto m =
- BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn);
- auto &s = m->state();
- size_t size = asio::buffer_size(buffers);
- m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr,
- &block.b(), size, offset))
- .Push(new ReadBlockContinuation<Reader, MutableBufferSequence>(
- s.reader(), buffers, s.transferred()));
- const std::string &dnid = dn.id().datanodeuuid();
- m->Run([handler, dnid](const Status &status,
- const typename BlockReaderTrait::State &state) {
- handler(status, dnid, *state.transferred());
- });
-}
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
index 71e28ac..0dcae29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
@@ -16,5 +16,5 @@
# limitations under the License.
#
-add_library(reader remote_block_reader.cc datatransfer.cc)
+add_library(reader block_reader.cc datatransfer.cc)
add_dependencies(reader proto)