Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/07/14 21:48:16 UTC
[1/5] hadoop git commit: HDFS-8758. Implement the continuation
library in libhdfspp. Contributed by Haohui Mai.
Repository: hadoop
Updated Branches:
refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1 [created] 08e12b0cb
HDFS-8758. Implement the continuation library in libhdfspp. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b02d962
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b02d962
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b02d962
Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: 8b02d962b291afe4b08c47f0398c1db0709419a1
Parents: ac60c6a
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Jul 9 14:02:55 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Jul 10 17:09:01 2015 -0700
----------------------------------------------------------------------
.../src/main/native/libhdfspp/doc/Doxyfile.in | 1 +
.../libhdfspp/lib/common/continuation/asio.h | 112 ++++++++++++++++
.../lib/common/continuation/continuation.h | 125 ++++++++++++++++++
.../lib/common/continuation/protobuf.h | 128 +++++++++++++++++++
4 files changed, 366 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
index 773990f..ac1d0fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
@@ -8,6 +8,7 @@ BUILTIN_STL_SUPPORT = YES
INPUT = @PROJECT_SOURCE_DIR@/doc/mainpage.dox \
@PROJECT_SOURCE_DIR@/include/libhdfspp \
+ @PROJECT_SOURCE_DIR@/lib/common/continuation \
INPUT_ENCODING = UTF-8
RECURSIVE = NO
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
new file mode 100644
index 0000000..f7d76e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_CONTINUATION_ASIO_H_
+#define LIB_COMMON_CONTINUATION_ASIO_H_
+
+#include "continuation.h"
+#include "common/util.h"
+
+#include "libhdfspp/status.h"
+
+#include <asio/connect.hpp>
+#include <asio/read.hpp>
+#include <asio/write.hpp>
+#include <asio/ip/tcp.hpp>
+
+namespace hdfs {
+namespace continuation {
+
+template <class Stream, class MutableBufferSequence>
+class ReadContinuation : public Continuation {
+public:
+ ReadContinuation(Stream *stream, const MutableBufferSequence &buffer)
+ : stream_(stream), buffer_(buffer) {}
+ virtual void Run(const Next &next) override {
+ auto handler =
+ [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
+ asio::async_read(*stream_, buffer_, handler);
+ }
+
+private:
+ Stream *stream_;
+ MutableBufferSequence buffer_;
+};
+
+template <class Stream, class ConstBufferSequence>
+class WriteContinuation : public Continuation {
+public:
+ WriteContinuation(Stream *stream, const ConstBufferSequence &buffer)
+ : stream_(stream), buffer_(buffer) {}
+
+ virtual void Run(const Next &next) override {
+ auto handler =
+ [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
+ asio::async_write(*stream_, buffer_, handler);
+ }
+
+private:
+ Stream *stream_;
+ ConstBufferSequence buffer_;
+};
+
+template <class Socket, class Iterator>
+class ConnectContinuation : public Continuation {
+public:
+ ConnectContinuation(Socket *socket, Iterator begin, Iterator end,
+ Iterator *connected_endpoint)
+ : socket_(socket), begin_(begin), end_(end),
+ connected_endpoint_(connected_endpoint) {}
+
+ virtual void Run(const Next &next) override {
+ auto handler = [this, next](const asio::error_code &ec, Iterator it) {
+ if (connected_endpoint_) {
+ *connected_endpoint_ = it;
+ }
+ next(ToStatus(ec));
+ };
+ asio::async_connect(*socket_, begin_, end_, handler);
+ }
+
+private:
+ Socket *socket_;
+ Iterator begin_;
+ Iterator end_;
+ Iterator *connected_endpoint_;
+};
+
+template <class Stream, class ConstBufferSequence>
+static inline Continuation *Write(Stream *stream,
+ const ConstBufferSequence &buffer) {
+ return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer);
+}
+
+template <class Stream, class MutableBufferSequence>
+static inline Continuation *Read(Stream *stream,
+ const MutableBufferSequence &buffer) {
+ return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer);
+}
+
+template <class Socket, class Iterator>
+static inline Continuation *Connect(Socket *socket, Iterator begin,
+ Iterator end) {
+ return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr);
+}
+}
+}
+
+#endif
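
The helpers above are meant to be chained through the Pipeline class
declared in continuation.h. Below is a minimal usage sketch, not part
of this patch: it assumes a caller-owned asio::ip::tcp::socket and a
resolved endpoint range; EchoState and the "ping" payload are purely
illustrative.

#include "common/continuation/asio.h"

#include <asio/ip/tcp.hpp>

#include <array>

using asio::ip::tcp;

struct EchoState {
  std::array<char, 16> buf;
};

void StartEcho(tcp::socket *socket, tcp::resolver::iterator endpoints) {
  namespace c = hdfs::continuation;
  auto pipeline = c::Pipeline<EchoState>::Create();
  EchoState *state = &pipeline->state();
  // Each stage reports its Status to Pipeline::Schedule(), which then
  // runs the next stage.
  pipeline->Push(c::Connect(socket, endpoints, tcp::resolver::iterator()))
      .Push(c::Write(socket, asio::buffer("ping", 4)))
      .Push(c::Read(socket, asio::buffer(state->buf)));
  pipeline->Run([](const hdfs::Status &status, const EchoState &) {
    // Called after the last stage finishes; the pipeline then deletes
    // itself along with its stages and state.
  });
}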
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
new file mode 100644
index 0000000..9576c2f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_CONTINUATION_CONTINUATION_H_
+#define LIB_COMMON_CONTINUATION_CONTINUATION_H_
+
+#include "libhdfspp/status.h"
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace hdfs {
+namespace continuation {
+
+class PipelineBase;
+
+/**
+ * A continuation is a fragment of runnable code whose execution will
+ * be scheduled by a \link Pipeline \endlink.
+ *
+ * The Continuation class is a building block for implementing
+ * Continuation Passing Style (CPS) in libhdfs++. In CPS, the
+ * upper-level user specifies the control flow by explicitly chaining
+ * a sequence of continuations through the \link Run() \endlink method,
+ * whereas in traditional imperative programming the control flow is
+ * implicit in the sequence of statements.
+ *
+ * See http://en.wikipedia.org/wiki/Continuation for more details.
+ **/
+class Continuation {
+public:
+ typedef std::function<void(const Status &)> Next;
+ virtual ~Continuation() = default;
+ virtual void Run(const Next &next) = 0;
+ Continuation(const Continuation &) = delete;
+ Continuation &operator=(const Continuation &) = delete;
+
+protected:
+ Continuation() = default;
+};
+
+/**
+ * A pipeline schedules the execution of a chain of \link Continuation
+ * \endlink objects in the order they were pushed into the pipeline.
+ * The next parameter passed to each continuation points to the \link
+ * Schedule() \endlink method, so that finishing one continuation
+ * triggers the execution of the next one in the sequence.
+ *
+ * The typical use case of a pipeline is executing continuations
+ * asynchronously. Note that a continuation calls the next
+ * continuation when it is finished. If the continuation is posted
+ * into an asynchronous event loop, invoking the next continuation
+ * can be done in the callback handler in the asynchronous event loop.
+ *
+ * Memory is managed as follows. A pipeline is always allocated on
+ * the heap. It owns all of its continuations as well as the state
+ * specified by the user, and both share the life cycle of the
+ * pipeline. This design simplifies the problem of ensuring that the
+ * handlers running in the asynchronous event loop always hold valid
+ * pointers with respect to the pipeline. The pipeline automatically
+ * deallocates itself right after it invokes the callback specified
+ * by the user.
+ **/
+template <class State> class Pipeline {
+public:
+ typedef std::function<void(const Status &, const State &)> UserHandler;
+ static Pipeline *Create() { return new Pipeline(); }
+ Pipeline &Push(Continuation *stage);
+ void Run(UserHandler &&handler);
+ State &state() { return state_; }
+
+private:
+ State state_;
+ std::vector<std::unique_ptr<Continuation>> routines_;
+ size_t stage_;
+ std::function<void(const Status &, const State &)> handler_;
+
+ Pipeline() : stage_(0) {}
+ ~Pipeline() = default;
+ void Schedule(const Status &status);
+};
+
+template <class State>
+inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) {
+ routines_.emplace_back(std::unique_ptr<Continuation>(stage));
+ return *this;
+}
+
+template <class State>
+inline void Pipeline<State>::Schedule(const Status &status) {
+ if (stage_ >= routines_.size()) {
+ handler_(status, state_);
+ routines_.clear();
+ delete this;
+ } else {
+ auto next = routines_[stage_].get();
+ ++stage_;
+ next->Run(std::bind(&Pipeline::Schedule, this, std::placeholders::_1));
+ }
+}
+
+template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) {
+ handler_ = std::move(handler);
+ Schedule(Status::OK());
+}
+}
+}
+
+#endif
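
A self-contained sketch of the contract described above, not part of
this patch. AppendStage is a hypothetical synchronous stage; an
asynchronous stage would instead capture next and invoke it from an
event-loop callback once its I/O completes.

#include "common/continuation/continuation.h"

#include <string>

using hdfs::Status;
using hdfs::continuation::Continuation;
using hdfs::continuation::Pipeline;

struct AppendStage : Continuation {
  AppendStage(std::string *log, char c) : log_(log), c_(c) {}
  virtual void Run(const Next &next) override {
    log_->push_back(c_);
    next(Status::OK()); // hand control back to Pipeline::Schedule()
  }

private:
  std::string *log_;
  char c_;
};

int main() {
  auto pipeline = Pipeline<std::string>::Create();
  std::string *log = &pipeline->state();
  pipeline->Push(new AppendStage(log, 'a')).Push(new AppendStage(log, 'b'));
  pipeline->Run([](const Status &, const std::string &result) {
    // result == "ab"; the pipeline frees its stages and itself after
    // this callback returns.
  });
  return 0;
}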
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
new file mode 100644
index 0000000..3e4b535
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
+#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
+
+#include "asio.h"
+#include "common/util.h"
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <array>
+#include <cassert>
+
+namespace hdfs {
+namespace continuation {
+
+template <class Stream, size_t MaxMessageSize = 512>
+struct ReadDelimitedPBMessageContinuation : public Continuation {
+ ReadDelimitedPBMessageContinuation(Stream *stream,
+ ::google::protobuf::MessageLite *msg)
+ : stream_(stream), msg_(msg) {}
+
+ virtual void Run(const Next &next) override {
+ namespace pbio = google::protobuf::io;
+ auto handler = [this, next](const asio::error_code &ec, size_t) {
+ Status status;
+ if (ec) {
+ status = ToStatus(ec);
+ } else {
+ pbio::ArrayInputStream as(&buf_[0], buf_.size());
+ pbio::CodedInputStream is(&as);
+ uint32_t size = 0;
+ bool v = is.ReadVarint32(&size);
+ assert(v);
+ is.PushLimit(size);
+ msg_->Clear();
+ v = msg_->MergeFromCodedStream(&is);
+ assert(v);
+ }
+ next(status);
+ };
+ asio::async_read(
+ *stream_, asio::buffer(buf_),
+ std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
+ std::placeholders::_1, std::placeholders::_2),
+ handler);
+ }
+
+private:
+ size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+ if (ec) {
+ return 0;
+ }
+
+ size_t offset = 0, len = 0;
+ for (size_t i = 0; i + 1 < transferred && i < sizeof(int); ++i) {
+ len = (len << 7) | (buf_[i] & 0x7f);
+ if ((uint8_t)buf_.at(i) < 0x80) {
+ offset = i + 1;
+ break;
+ }
+ }
+
+ assert(offset + len < buf_.size() && "Message is too big");
+ return offset ? len + offset - transferred : 1;
+ }
+
+ Stream *stream_;
+ ::google::protobuf::MessageLite *msg_;
+ std::array<char, MaxMessageSize> buf_;
+};
+
+template <class Stream>
+struct WriteDelimitedPBMessageContinuation : Continuation {
+ WriteDelimitedPBMessageContinuation(Stream *stream,
+ const google::protobuf::MessageLite *msg)
+ : stream_(stream), msg_(msg) {}
+
+ virtual void Run(const Next &next) override {
+ namespace pbio = google::protobuf::io;
+ int size = msg_->ByteSize();
+ buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
+ pbio::StringOutputStream ss(&buf_);
+ pbio::CodedOutputStream os(&ss);
+ os.WriteVarint32(size);
+ msg_->SerializeToCodedStream(&os);
+ write_coroutine_ =
+ std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
+ write_coroutine_->Run([next](const Status &stat) { next(stat); });
+ }
+
+private:
+ Stream *stream_;
+ const google::protobuf::MessageLite *msg_;
+ std::string buf_;
+ std::shared_ptr<Continuation> write_coroutine_;
+};
+
+template <class Stream, size_t MaxMessageSize = 512>
+static inline Continuation *
+ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+ return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream,
+ msg);
+}
+
+template <class Stream>
+static inline Continuation *
+WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+ return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
+}
+}
+}
+#endif
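
A hedged sketch of a request/response round trip built from these
continuations, not part of this patch. The caller supplies any two
::google::protobuf::MessageLite instances, which must outlive the
pipeline; the io_service driving the stream is assumed to be running
elsewhere.

#include "common/continuation/protobuf.h"

#include <functional>

template <class Stream>
void ExchangeMessage(Stream *stream,
                     ::google::protobuf::MessageLite *request,
                     ::google::protobuf::MessageLite *response,
                     std::function<void(const hdfs::Status &)> done) {
  namespace c = hdfs::continuation;
  struct State {};
  auto pipeline = c::Pipeline<State>::Create();
  // Write the varint-delimited request, then parse the varint-delimited
  // response into *response.
  pipeline->Push(c::WriteDelimitedPBMessage(stream, request))
      .Push(c::ReadDelimitedPBMessage(stream, response));
  pipeline->Run(
      [done](const hdfs::Status &status, const State &) { done(status); });
}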
[3/5] hadoop git commit: HDFS-8759. Implement remote block reader in
libhdfspp. Contributed by Haohui Mai.
Posted by wh...@apache.org.
HDFS-8759. Implement remote block reader in libhdfspp. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6416b38c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6416b38c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6416b38c
Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: 6416b38cade36a1c251ef96d5a0f9b80b785e58b
Parents: 8b02d96
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Jul 10 16:50:45 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:00 2015 -0700
----------------------------------------------------------------------
.../native/libhdfspp/include/libhdfspp/status.h | 4 +-
.../main/native/libhdfspp/lib/CMakeLists.txt | 20 ++
.../native/libhdfspp/lib/common/CMakeLists.txt | 1 +
.../main/native/libhdfspp/lib/common/base64.cc | 71 ++++
.../main/native/libhdfspp/lib/common/status.cc | 66 ++++
.../native/libhdfspp/lib/reader/CMakeLists.txt | 20 ++
.../native/libhdfspp/lib/reader/block_reader.h | 114 +++++++
.../native/libhdfspp/lib/reader/datatransfer.h | 35 ++
.../libhdfspp/lib/reader/remote_block_reader.cc | 46 +++
.../lib/reader/remote_block_reader_impl.h | 342 +++++++++++++++++++
10 files changed, 718 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
index 9436c8b..d2ef005 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
@@ -45,6 +45,8 @@ class Status {
{ return Status(kUnimplemented, ""); }
static Status Exception(const char *exception_class_name, const char *error_message)
{ return Status(kException, exception_class_name, error_message); }
+ static Status Error(const char *error_message)
+ { return Exception("Exception", error_message); }
// Returns true iff the status indicates success.
bool ok() const { return (state_ == NULL); }
@@ -71,7 +73,7 @@ class Status {
kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument),
kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again),
kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
- kException = 256,
+ kException = 255,
};
explicit Status(int code, const char *msg1, const char *msg2);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
index 7458453..e77942b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
@@ -1,2 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_subdirectory(common)
+add_subdirectory(reader)
add_subdirectory(rpc)
add_subdirectory(proto)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
new file mode 100644
index 0000000..570d0ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -0,0 +1 @@
+add_library(common base64.cc status.cc)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc
new file mode 100644
index 0000000..f98fec5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util.h"
+
+#include <array>
+#include <functional>
+#include <algorithm>
+
+namespace hdfs {
+
+std::string Base64Encode(const std::string &src) {
+ static const char kDictionary[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz"
+ "0123456789+/";
+
+ int encoded_size = (src.size() + 2) / 3 * 4;
+ std::string dst;
+ dst.reserve(encoded_size);
+
+ size_t i = 0;
+ while (i + 3 <= src.length()) {
+ const unsigned char *s = reinterpret_cast<const unsigned char *>(&src[i]);
+ const int r[4] = {s[0] >> 2, ((s[0] << 4) | (s[1] >> 4)) & 0x3f,
+ ((s[1] << 2) | (s[2] >> 6)) & 0x3f, s[2] & 0x3f};
+
+ std::transform(r, r + sizeof(r) / sizeof(int), std::back_inserter(dst),
+ [](unsigned char v) { return kDictionary[v]; });
+ i += 3;
+ }
+
+ size_t remained = src.length() - i;
+ const unsigned char *s = reinterpret_cast<const unsigned char *>(&src[i]);
+
+ switch (remained) {
+ case 0:
+ break;
+ case 1: {
+ char padding[4] = {kDictionary[s[0] >> 2], kDictionary[(s[0] << 4) & 0x3f],
+ '=', '='};
+ dst.append(padding, sizeof(padding));
+ } break;
+ case 2: {
+ char padding[4] = {kDictionary[s[0] >> 2],
+ kDictionary[((s[0] << 4) | (s[1] >> 4)) & 0x3f],
+ kDictionary[(s[1] << 2) & 0x3f], '='};
+ dst.append(padding, sizeof(padding));
+ } break;
+ default:
+ assert(false && "Unreachable");
+ break;
+ }
+ return dst;
+}
+
+}
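
The switch above implements the standard RFC 4648 padding rules. A
quick self-check sketch, assuming the declaration of Base64Encode
lives in the included util.h:

#include <cassert>
#include <string>

namespace hdfs {
std::string Base64Encode(const std::string &src);
}

int main() {
  assert(hdfs::Base64Encode("Man") == "TWFu"); // full 3-byte group
  assert(hdfs::Base64Encode("Ma") == "TWE=");  // 2 trailing bytes, one '='
  assert(hdfs::Base64Encode("M") == "TQ==");   // 1 trailing byte, two '='
  return 0;
}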
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
new file mode 100644
index 0000000..66cfa1c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfspp/status.h"
+
+#include <cassert>
+#include <cstring>
+
+namespace hdfs {
+
+Status::Status(int code, const char *msg1)
+ : state_(ConstructState(code, msg1, nullptr)) {}
+
+Status::Status(int code, const char *msg1, const char *msg2)
+ : state_(ConstructState(code, msg1, msg2)) {}
+
+const char *Status::ConstructState(int code, const char *msg1,
+ const char *msg2) {
+ assert(code != kOk);
+ const uint32_t len1 = strlen(msg1);
+ const uint32_t len2 = msg2 ? strlen(msg2) : 0;
+ const uint32_t size = len1 + (len2 ? (2 + len2) : 0);
+ char *result = new char[size + 8 + 2];
+ *reinterpret_cast<uint32_t *>(result) = size;
+ *reinterpret_cast<uint32_t *>(result + 4) = code;
+ memcpy(result + 8, msg1, len1);
+ if (len2) {
+ result[8 + len1] = ':';
+ result[9 + len1] = ' ';
+ memcpy(result + 10 + len1, msg2, len2);
+ }
+ return result;
+}
+
+std::string Status::ToString() const {
+ if (!state_) {
+ return "OK";
+ } else {
+ uint32_t length = *reinterpret_cast<const uint32_t *>(state_);
+ return std::string(state_ + 8, length);
+ }
+}
+
+const char *Status::CopyState(const char *state) {
+ uint32_t size;
+ memcpy(&size, state, sizeof(size));
+ char *result = new char[size + 8];
+ memcpy(result, state, size + 8);
+ return result;
+}
+}
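
ConstructState packs everything into one heap allocation: a 4-byte
message length, a 4-byte code, then "msg1" or "msg1: msg2". A short
sketch of the observable behavior, not part of this patch:

#include "libhdfspp/status.h"

#include <cassert>

int main() {
  assert(hdfs::Status::OK().ToString() == "OK"); // state_ is NULL
  hdfs::Status err = hdfs::Status::Error("disk failed");
  assert(!err.ok());
  // Status::Error routes through Exception("Exception", ...), so the
  // message region holds "Exception: disk failed".
  assert(err.ToString() == "Exception: disk failed");
  return 0;
}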
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
new file mode 100644
index 0000000..65ec108
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(reader remote_block_reader.cc)
+add_dependencies(reader proto)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h
new file mode 100644
index 0000000..81636b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCK_READER_H_
+#define BLOCK_READER_H_
+
+#include "libhdfspp/status.h"
+#include "datatransfer.pb.h"
+
+#include <memory>
+
+namespace hdfs {
+
+struct CacheStrategy {
+ bool drop_behind_specified;
+ bool drop_behind;
+ bool read_ahead_specified;
+ unsigned long long read_ahead;
+ CacheStrategy()
+ : drop_behind_specified(false), drop_behind(false),
+ read_ahead_specified(false), read_ahead(0) {}
+};
+
+enum DropBehindStrategy {
+ kUnspecified = 0,
+ kEnableDropBehind = 1,
+ kDisableDropBehind = 2,
+};
+
+enum EncryptionScheme {
+ kNone = 0,
+ kAESCTRNoPadding = 1,
+};
+
+struct BlockReaderOptions {
+ bool verify_checksum;
+ CacheStrategy cache_strategy;
+ EncryptionScheme encryption_scheme;
+
+ BlockReaderOptions()
+ : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {}
+};
+
+template <class Stream>
+class RemoteBlockReader
+ : public std::enable_shared_from_this<RemoteBlockReader<Stream>> {
+public:
+ explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream)
+ : stream_(stream), state_(kOpen), options_(options),
+ chunk_padding_bytes_(0) {}
+
+ template <class MutableBufferSequence, class ReadHandler>
+ void async_read_some(const MutableBufferSequence &buffers,
+ const ReadHandler &handler);
+
+ template <class MutableBufferSequence>
+ size_t read_some(const MutableBufferSequence &buffers, Status *status);
+
+ Status connect(const std::string &client_name,
+ const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+ uint64_t offset);
+
+ template <class ConnectHandler>
+ void async_connect(const std::string &client_name,
+ const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block,
+ uint64_t length, uint64_t offset,
+ const ConnectHandler &handler);
+
+private:
+ struct ReadPacketHeader;
+ struct ReadChecksum;
+ struct ReadPadding;
+ template <class MutableBufferSequence> struct ReadData;
+ struct AckRead;
+ enum State {
+ kOpen,
+ kReadPacketHeader,
+ kReadChecksum,
+ kReadPadding,
+ kReadData,
+ kFinished,
+ };
+
+ Stream *stream_;
+ hadoop::hdfs::PacketHeaderProto header_;
+ State state_;
+ BlockReaderOptions options_;
+ size_t packet_len_;
+ int packet_data_read_bytes_;
+ int chunk_padding_bytes_;
+ long long bytes_to_read_;
+ std::vector<char> checksum_;
+};
+}
+
+#include "remote_block_reader_impl.h"
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
new file mode 100644
index 0000000..d22f5e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef COMMON_DATA_TRANSFER_H_
+#define COMMON_DATA_TRANSFER_H_
+
+namespace hdfs {
+
+enum {
+ kDataTransferVersion = 28,
+ kDataTransferSasl = 0xdeadbeef,
+};
+
+enum Operation {
+ kWriteBlock = 80,
+ kReadBlock = 81,
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
new file mode 100644
index 0000000..68bc4ee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "block_reader.h"
+
+namespace hdfs {
+
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+ const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+ uint64_t offset) {
+ using namespace hadoop::hdfs;
+ using namespace hadoop::common;
+ BaseHeaderProto *base_h = new BaseHeaderProto();
+ base_h->set_allocated_block(new ExtendedBlockProto(*block));
+ if (token) {
+ base_h->set_allocated_token(new TokenProto(*token));
+ }
+ ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
+ h->set_clientname(client_name);
+ h->set_allocated_baseheader(base_h);
+
+ OpReadBlockProto p;
+ p.set_allocated_header(h);
+ p.set_offset(offset);
+ p.set_len(length);
+ p.set_sendchecksums(verify_checksum);
+ // TODO: p.set_allocated_cachingstrategy();
+ return p;
+}
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
new file mode 100644
index 0000000..68ea6ad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
+#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
+
+#include "datatransfer.h"
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <asio/buffers_iterator.hpp>
+#include <asio/streambuf.hpp>
+#include <asio/write.hpp>
+
+#include <arpa/inet.h>
+
+#include <future>
+
+namespace hdfs {
+
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+ const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+ uint64_t offset);
+
+template <class Stream>
+template <class ConnectHandler>
+void RemoteBlockReader<Stream>::async_connect(
+ const std::string &client_name, const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+ uint64_t offset, const ConnectHandler &handler) {
+ // The total number of bytes that we need to transfer from the DN is
+ // the amount that the user wants (bytesToRead), plus the padding at
+ // the beginning in order to chunk-align. Note that the DN may elect
+ // to send more than this amount if the read starts/ends mid-chunk.
+ bytes_to_read_ = length;
+
+ struct State {
+ std::string header;
+ hadoop::hdfs::OpReadBlockProto request;
+ hadoop::hdfs::BlockOpResponseProto response;
+ };
+
+ auto m = continuation::Pipeline<State>::Create();
+ State *s = &m->state();
+
+ s->header.insert(s->header.begin(),
+ {0, kDataTransferVersion, Operation::kReadBlock});
+ s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum,
+ token, block, length, offset));
+
+ auto read_pb_message =
+ new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>(
+ stream_, &s->response);
+
+ m->Push(continuation::Write(stream_, asio::buffer(s->header)))
+ .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request))
+ .Push(read_pb_message);
+
+ m->Run([this, handler, offset](const Status &status, const State &s) {
+ Status stat = status;
+ if (stat.ok()) {
+ const auto &resp = s.response;
+ if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
+ if (resp.has_readopchecksuminfo()) {
+ const auto &checksum_info = resp.readopchecksuminfo();
+ chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
+ }
+ state_ = kReadPacketHeader;
+ } else {
+ stat = Status::Error(s.response.message().c_str());
+ }
+ }
+ handler(stat);
+ });
+}
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadPacketHeader
+ : continuation::Continuation {
+ ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+ virtual void Run(const Next &next) override {
+ parent_->packet_data_read_bytes_ = 0;
+ parent_->packet_len_ = 0;
+ auto handler = [next, this](const asio::error_code &ec, size_t) {
+ Status status;
+ if (ec) {
+ status = Status(ec.value(), ec.message().c_str());
+ } else {
+ parent_->packet_len_ = packet_length();
+ parent_->header_.Clear();
+ bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
+ header_length());
+ assert(v && "Failed to parse the header");
+ parent_->state_ = kReadChecksum;
+ }
+ next(status);
+ };
+
+ asio::async_read(*parent_->stream_, asio::buffer(buf_),
+ std::bind(&ReadPacketHeader::CompletionHandler, this,
+ std::placeholders::_1, std::placeholders::_2),
+ handler);
+ }
+
+private:
+ static const size_t kMaxHeaderSize = 512;
+ static const size_t kPayloadLenOffset = 0;
+ static const size_t kPayloadLenSize = sizeof(int);
+ static const size_t kHeaderLenOffset = 4;
+ static const size_t kHeaderLenSize = sizeof(short);
+ static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
+
+ RemoteBlockReader<Stream> *parent_;
+ std::array<char, kMaxHeaderSize> buf_;
+
+ size_t packet_length() const {
+ return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
+ }
+
+ size_t header_length() const {
+ return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
+ }
+
+ size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+ if (ec) {
+ return 0;
+ } else if (transferred < kHeaderStart) {
+ return kHeaderStart - transferred;
+ } else {
+ return kHeaderStart + header_length() - transferred;
+ }
+ }
+};
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation {
+ ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+ virtual void Run(const Next &next) override {
+ auto parent = parent_;
+ if (parent->state_ != kReadChecksum) {
+ next(Status::OK());
+ return;
+ }
+
+ auto handler = [parent, next](const asio::error_code &ec, size_t) {
+ Status status;
+ if (ec) {
+ status = Status(ec.value(), ec.message().c_str());
+ } else {
+ parent->state_ =
+ parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
+ }
+ next(status);
+ };
+ parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
+ parent->header_.datalen());
+ asio::async_read(*parent->stream_, asio::buffer(parent->checksum_),
+ handler);
+ }
+
+private:
+ RemoteBlockReader<Stream> *parent_;
+};
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation {
+ ReadPadding(RemoteBlockReader<Stream> *parent)
+ : parent_(parent), padding_(parent->chunk_padding_bytes_),
+ bytes_transferred_(std::make_shared<size_t>(0)),
+ read_data_(new ReadData<asio::mutable_buffers_1>(
+ parent, bytes_transferred_, asio::buffer(padding_))) {}
+
+ virtual void Run(const Next &next) override {
+ if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
+ next(Status::OK());
+ return;
+ }
+
+ auto h = [next, this](const Status &status) {
+ if (status.ok()) {
+ assert(static_cast<int>(*bytes_transferred_) ==
+ parent_->chunk_padding_bytes_);
+ parent_->chunk_padding_bytes_ = 0;
+ parent_->state_ = kReadData;
+ }
+ next(status);
+ };
+ read_data_->Run(h);
+ }
+
+private:
+ RemoteBlockReader<Stream> *parent_;
+ std::vector<char> padding_;
+ std::shared_ptr<size_t> bytes_transferred_;
+ std::shared_ptr<continuation::Continuation> read_data_;
+ ReadPadding(const ReadPadding &) = delete;
+ ReadPadding &operator=(const ReadPadding &) = delete;
+};
+
+template <class Stream>
+template <class MutableBufferSequence>
+struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation {
+ ReadData(RemoteBlockReader<Stream> *parent,
+ std::shared_ptr<size_t> bytes_transferred,
+ const MutableBufferSequence &buf)
+ : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {}
+
+ virtual void Run(const Next &next) override {
+ auto handler =
+ [next, this](const asio::error_code &ec, size_t transferred) {
+ Status status;
+ if (ec) {
+ status = Status(ec.value(), ec.message().c_str());
+ }
+ *bytes_transferred_ += transferred;
+ parent_->bytes_to_read_ -= transferred;
+ parent_->packet_data_read_bytes_ += transferred;
+ if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
+ parent_->state_ = kReadPacketHeader;
+ }
+ next(status);
+ };
+
+ auto data_len =
+ parent_->header_.datalen() - parent_->packet_data_read_bytes_;
+ async_read(*parent_->stream_, buf_, asio::transfer_exactly(data_len),
+ handler);
+ }
+
+private:
+ RemoteBlockReader<Stream> *parent_;
+ std::shared_ptr<size_t> bytes_transferred_;
+ MutableBufferSequence buf_;
+};
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation {
+ AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+ virtual void Run(const Next &next) override {
+ if (parent_->bytes_to_read_ > 0) {
+ next(Status::OK());
+ return;
+ }
+
+ auto m =
+ continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create();
+ m->state().set_status(parent_->options_.verify_checksum
+ ? hadoop::hdfs::Status::CHECKSUM_OK
+ : hadoop::hdfs::Status::SUCCESS);
+
+ m->Push(
+ continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state()));
+
+ m->Run([this, next](const Status &status,
+ const hadoop::hdfs::ClientReadStatusProto &) {
+ if (status.ok()) {
+ parent_->state_ = RemoteBlockReader<Stream>::kFinished;
+ }
+ next(status);
+ });
+ }
+
+private:
+ RemoteBlockReader<Stream> *parent_;
+};
+
+template <class Stream>
+template <class MutableBufferSequence, class ReadHandler>
+void RemoteBlockReader<Stream>::async_read_some(
+ const MutableBufferSequence &buffers, const ReadHandler &handler) {
+ assert(state_ != kOpen && "Not connected");
+
+ struct State {
+ std::shared_ptr<size_t> bytes_transferred;
+ };
+ auto m = continuation::Pipeline<State>::Create();
+ m->state().bytes_transferred = std::make_shared<size_t>(0);
+
+ m->Push(new ReadPacketHeader(this))
+ .Push(new ReadChecksum(this))
+ .Push(new ReadPadding(this))
+ .Push(new ReadData<MutableBufferSequence>(
+ this, m->state().bytes_transferred, buffers))
+ .Push(new AckRead(this));
+
+ auto self = this->shared_from_this();
+ m->Run([self, handler](const Status &status, const State &state) {
+ handler(status, *state.bytes_transferred);
+ });
+}
+
+template <class Stream>
+template <class MutableBufferSequence>
+size_t
+RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers,
+ Status *status) {
+ size_t transferred = 0;
+ auto done = std::make_shared<std::promise<void>>();
+ auto future = done->get_future();
+ async_read_some(buffers,
+ [status, &transferred, done](const Status &stat, size_t t) {
+ *status = stat;
+ transferred = t;
+ done->set_value();
+ });
+ future.wait();
+ return transferred;
+}
+
+template <class Stream>
+Status RemoteBlockReader<Stream>::connect(
+ const std::string &client_name, const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+ uint64_t offset) {
+ auto stat = std::make_shared<std::promise<Status>>();
+ std::future<Status> future(stat->get_future());
+ async_connect(client_name, token, block, length, offset,
+ [stat](const Status &status) { stat->set_value(status); });
+ return future.get();
+}
+}
+
+#endif
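
A hedged sketch of driving the reader synchronously, not part of this
patch. It assumes the asio io_service is running on another thread
(connect() and read_some() block on futures completed by the
asynchronous pipelines) and that the ExtendedBlockProto comes from
elsewhere, e.g. a NameNode block-locations response.

#include "reader/block_reader.h"

#include <asio/ip/tcp.hpp>

#include <memory>
#include <vector>

using asio::ip::tcp;

hdfs::Status ReadBlock(tcp::socket *socket,
                       const hadoop::hdfs::ExtendedBlockProto *block,
                       uint64_t length, std::vector<char> *out) {
  hdfs::BlockReaderOptions options;
  // The reader must be owned by a shared_ptr; async_read_some keeps
  // itself alive through shared_from_this().
  auto reader = std::make_shared<hdfs::RemoteBlockReader<tcp::socket>>(
      options, socket);
  hdfs::Status status = reader->connect("libhdfspp-sketch", /*token=*/nullptr,
                                        block, length, /*offset=*/0);
  out->resize(length);
  size_t total = 0;
  while (status.ok() && total < length) {
    total += reader->read_some(
        asio::buffer(out->data() + total, length - total), &status);
  }
  return status;
}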
[5/5] hadoop git commit: f
Posted by wh...@apache.org.
f
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08e12b0c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08e12b0c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08e12b0c
Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: 08e12b0cb0457a06e0888f07496dbd84e7b5fb0c
Parents: e6ca03c
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Jul 14 12:47:51 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:51 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08e12b0c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
index cae786c..e51483a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
@@ -19,6 +19,7 @@
project (libhdfspp)
find_package(Doxygen)
+find_package(OpenSSL REQUIRED)
find_package(Protobuf REQUIRED)
find_package(Threads)
[2/5] hadoop git commit: HDFS-8764. Generate Hadoop RPC stubs from
protobuf definitions. Contributed by Haohui Mai.
Posted by wh...@apache.org.
HDFS-8764. Generate Hadoop RPC stubs from protobuf definitions. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f34bda72
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f34bda72
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f34bda72
Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: f34bda720184cc2a7e3ac48916c693519387539b
Parents: 6416b38
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Jul 13 16:53:13 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:00 2015 -0700
----------------------------------------------------------------------
.../native/libhdfspp/lib/proto/CMakeLists.txt | 46 +++++++++-
.../native/libhdfspp/lib/proto/cpp_helpers.h | 82 +++++++++++++++++
.../libhdfspp/lib/proto/protoc_gen_hrpc.cc | 94 ++++++++++++++++++++
3 files changed, 221 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f34bda72/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
index 156a7f4..3f703b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
@@ -18,4 +18,48 @@ protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS
${COMMON_PROTO_DIR}/Security.proto
)
-add_library(proto ${PROTO_SRCS} ${PROTO_HDRS})
+add_executable(protoc-gen-hrpc protoc_gen_hrpc.cc)
+target_link_libraries(protoc-gen-hrpc ${PROTOBUF_PROTOC_LIBRARY} ${PROTOBUF_LIBRARY})
+
+function(GEN_HRPC SRCS)
+ if(NOT ARGN)
+ message(SEND_ERROR "Error: GEN_HRPC() called without any proto files")
+ return()
+ endif()
+
+ if(DEFINED PROTOBUF_IMPORT_DIRS)
+ foreach(DIR ${PROTOBUF_IMPORT_DIRS})
+ get_filename_component(ABS_PATH ${DIR} ABSOLUTE)
+ list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
+ if(${_contains_already} EQUAL -1)
+ list(APPEND _protobuf_include_path -I ${ABS_PATH})
+ endif()
+ endforeach()
+ endif()
+
+ set(${SRCS})
+
+ foreach(FIL ${ARGN})
+ get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+ get_filename_component(FIL_WE ${FIL} NAME_WE)
+
+ list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl")
+
+ add_custom_command(
+ OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl"
+ COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
+ ARGS --plugin=protoc-gen-hrpc=${CMAKE_CURRENT_BINARY_DIR}/protoc-gen-hrpc --hrpc_out=${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL}
+ DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE} protoc-gen-hrpc
+ COMMENT "Running HRPC protocol buffer compiler on ${FIL}"
+ VERBATIM )
+ endforeach()
+
+ set_source_files_properties(${${SRCS}} PROPERTIES GENERATED TRUE)
+ set(${SRCS} ${${SRCS}} PARENT_SCOPE)
+endfunction()
+
+gen_hrpc(HRPC_SRCS
+ ${CLIENT_PROTO_DIR}/ClientNamenodeProtocol.proto
+)
+
+add_library(proto ${PROTO_SRCS} ${PROTO_HDRS} ${HRPC_SRCS})
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f34bda72/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h
new file mode 100644
index 0000000..6f380ad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h
@@ -0,0 +1,82 @@
+// Protocol Buffers - Google's data interchange format
+// Copyright 2008 Google Inc. All rights reserved.
+// https://developers.google.com/protocol-buffers/
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// Author: kenton@google.com (Kenton Varda)
+// Based on original Protocol Buffers design by
+// Sanjay Ghemawat, Jeff Dean, and others.
+
+#ifndef LIBHDFSPP_PROTO_CPP_HELPERS_H_
+#define LIBHDFSPP_PROTO_CPP_HELPERS_H_
+
+#include <string>
+
+/**
+ * The functions in this file are derived from the original implementation of
+ *the protobuf library from Google.
+ **/
+
+static inline std::string StripProto(const std::string &str) {
+ static const std::string kExtension = ".proto";
+ if (str.size() >= kExtension.size() &&
+ str.compare(str.size() - kExtension.size(), kExtension.size(),
+ kExtension) == 0) {
+ return str.substr(0, str.size() - kExtension.size());
+ } else {
+ return str;
+ }
+}
+
+static inline std::string ToCamelCase(const std::string &input) {
+ bool cap_next_letter = true;
+ std::string result;
+ // Note: I distrust ctype.h due to locales.
+ for (size_t i = 0; i < input.size(); i++) {
+ if ('a' <= input[i] && input[i] <= 'z') {
+ if (cap_next_letter) {
+ result += input[i] + ('A' - 'a');
+ } else {
+ result += input[i];
+ }
+ cap_next_letter = false;
+ } else if ('A' <= input[i] && input[i] <= 'Z') {
+ // Capital letters are left as-is.
+ result += input[i];
+ cap_next_letter = false;
+ } else if ('0' <= input[i] && input[i] <= '9') {
+ result += input[i];
+ cap_next_letter = true;
+ } else {
+ cap_next_letter = true;
+ }
+ }
+ return result;
+}
+
+#endif
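
A quick illustration of the two helpers above:

#include "cpp_helpers.h"

#include <cassert>

int main() {
  assert(StripProto("ClientNamenodeProtocol.proto") ==
         "ClientNamenodeProtocol");
  assert(StripProto("no_extension") == "no_extension");
  assert(ToCamelCase("getBlockLocations") == "GetBlockLocations");
  assert(ToCamelCase("foo_bar9baz") == "FooBar9Baz");
  return 0;
}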
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f34bda72/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
new file mode 100644
index 0000000..f384e36
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cpp_helpers.h"
+
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/stubs/common.h>
+
+using ::google::protobuf::FileDescriptor;
+using ::google::protobuf::MethodDescriptor;
+using ::google::protobuf::ServiceDescriptor;
+using ::google::protobuf::compiler::CodeGenerator;
+using ::google::protobuf::compiler::GeneratorContext;
+using ::google::protobuf::io::Printer;
+using ::google::protobuf::io::ZeroCopyOutputStream;
+
+class StubGenerator : public CodeGenerator {
+public:
+ virtual bool Generate(const FileDescriptor *file, const std::string &,
+ GeneratorContext *ctx,
+ std::string *error) const override;
+
+private:
+ void EmitService(const ServiceDescriptor *service, Printer *out) const;
+ void EmitMethod(const MethodDescriptor *method, Printer *out) const;
+};
+
+bool StubGenerator::Generate(const FileDescriptor *file, const std::string &,
+ GeneratorContext *ctx, std::string *) const {
+ namespace pb = ::google::protobuf;
+ std::unique_ptr<ZeroCopyOutputStream> os(
+ ctx->Open(StripProto(file->name()) + ".hrpc.inl"));
+ Printer out(os.get(), '$');
+ for (int i = 0; i < file->service_count(); ++i) {
+ const ServiceDescriptor *service = file->service(i);
+ EmitService(service, &out);
+ }
+ return true;
+}
+
+void StubGenerator::EmitService(const ServiceDescriptor *service,
+ Printer *out) const {
+ out->Print("\n// GENERATED AUTOMATICALLY. DO NOT MODIFY.\n"
+ "class $service$ {\n"
+ "private:\n"
+ " ::hdfs::RpcEngine *const engine_;\n"
+ "public:\n"
+ " typedef std::function<void(const ::hdfs::Status &)> Callback;\n"
+ " typedef ::google::protobuf::MessageLite Message;\n"
+ " inline $service$(::hdfs::RpcEngine *engine)\n"
+ " : engine_(engine) {}\n",
+ "service", service->name());
+ for (int i = 0; i < service->method_count(); ++i) {
+ const MethodDescriptor *method = service->method(i);
+ EmitMethod(method, out);
+ }
+ out->Print("};\n");
+}
+
+void StubGenerator::EmitMethod(const MethodDescriptor *method,
+ Printer *out) const {
+ out->Print("\n inline void $camel_method$(const Message *req, "
+ "const std::shared_ptr<Message> &resp, "
+ "Callback &&handler) {\n"
+ " engine_->AsyncRpc(\"$method$\", req, resp, std::move(handler));\n"
+ " }\n",
+ "camel_method", ToCamelCase(method->name()),
+ "method", method->name()
+ );
+}
+
+int main(int argc, char *argv[]) {
+ StubGenerator generator;
+ return google::protobuf::compiler::PluginMain(argc, argv, &generator);
+}
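For a service named ClientNamenodeProtocol declaring an rpc getBlockLocations, the generator above emits a stub of roughly the following shape into ClientNamenodeProtocol.hrpc.inl (reconstructed from the Print templates; whitespace is approximate):

    // GENERATED AUTOMATICALLY. DO NOT MODIFY.
    class ClientNamenodeProtocol {
    private:
      ::hdfs::RpcEngine *const engine_;
    public:
      typedef std::function<void(const ::hdfs::Status &)> Callback;
      typedef ::google::protobuf::MessageLite Message;
      inline ClientNamenodeProtocol(::hdfs::RpcEngine *engine)
        : engine_(engine) {}

      inline void GetBlockLocations(const Message *req,
                                    const std::shared_ptr<Message> &resp,
                                    Callback &&handler) {
        engine_->AsyncRpc("getBlockLocations", req, resp, std::move(handler));
      }
    };

The FileSystemImpl in the next commit relies on this generated stub when it calls namenode_.GetBlockLocations with a completion callback.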
[4/5] hadoop git commit: fs
Posted by wh...@apache.org.
fs
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6ca03c0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6ca03c0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6ca03c0
Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: e6ca03c0357b4b9061349e9903a2064ee0c51715
Parents: f34bda7
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Jul 14 12:43:09 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:00 2015 -0700
----------------------------------------------------------------------
.../native/libhdfspp/include/libhdfspp/hdfs.h | 50 ++++++
.../main/native/libhdfspp/lib/common/hdfs.cc | 29 ++++
.../main/native/libhdfspp/lib/common/wrapper.h | 42 +++++
.../main/native/libhdfspp/lib/fs/CMakeLists.txt | 4 +
.../main/native/libhdfspp/lib/fs/filesystem.cc | 95 +++++++++++
.../main/native/libhdfspp/lib/fs/filesystem.h | 61 +++++++
.../main/native/libhdfspp/lib/fs/inputstream.cc | 53 ++++++
.../native/libhdfspp/lib/fs/inputstream_impl.h | 160 +++++++++++++++++++
.../native/libhdfspp/lib/fs/inputstream_test.cc | 82 ++++++++++
.../native/libhdfspp/lib/fs/namenode_protocol.h | 42 +++++
10 files changed, 618 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
new file mode 100644
index 0000000..d12f20e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_HDFS_H_
+#define LIBHDFSPP_HDFS_H_
+
+#include "libhdfspp/status.h"
+
+namespace hdfs {
+
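+// Wraps an event loop for asynchronous I/O; Run() blocks the calling
+// thread to dispatch completion handlers until Stop() is invoked.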
+class IoService {
+ public:
+ static IoService *New();
+ virtual void Run() = 0;
+ virtual void Stop() = 0;
+ virtual ~IoService();
+};
+
+
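+// A handle to an open file; PositionRead() blocks until the requested
+// range has been read from a datanode or an error occurs.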
+class InputStream {
+ public:
+ virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) = 0;
+ virtual ~InputStream();
+};
+
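+// Entry point of the client API; New() connects to the namenode at
+// server:port and hands back a concrete FileSystem on success.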
+class FileSystem {
+ public:
+ static Status New(IoService *io_service, const char *server,
+ unsigned short port, FileSystem **fsptr);
+ virtual Status Open(const char *path, InputStream **isptr) = 0;
+ virtual ~FileSystem();
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
new file mode 100644
index 0000000..e7a5d6c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "wrapper.h"
+
+namespace hdfs {
+
+IoService::~IoService() {}
+
+IoService *IoService::New() {
+ return new IoServiceImpl();
+}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
new file mode 100644
index 0000000..39d26cc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_WRAPPER_H_
+#define COMMON_WRAPPER_H_
+
+#include "libhdfspp/hdfs.h"
+
+#include <asio/io_service.hpp>
+
+namespace hdfs {
+
+class IoServiceImpl : public IoService {
+ public:
+ virtual void Run() override {
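+ // The work guard keeps io_service::run() from returning when the
+ // handler queue is momentarily empty, so Run() blocks until Stop().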
+ asio::io_service::work work(io_service_);
+ io_service_.run();
+ }
+ virtual void Stop() override { io_service_.stop(); }
+ ::asio::io_service &io_service() { return io_service_; }
+ private:
+ ::asio::io_service io_service_;
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
new file mode 100644
index 0000000..bd649ff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -0,0 +1,4 @@
+add_library(fs filesystem.cc inputstream.cc)
+add_dependencies(fs proto)
+add_executable(inputstream_test inputstream_test.cc)
+target_link_libraries(inputstream_test common fs rpc reader proto ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
new file mode 100644
index 0000000..ab322c6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+#include "common/util.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <limits>
+
+namespace hdfs {
+
+static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+static const int kNamenodeProtocolVersion = 1;
+
+using ::asio::ip::tcp;
+
+FileSystem::~FileSystem()
+{}
+
+Status FileSystem::New(IoService *io_service, const char *server,
+ unsigned short port, FileSystem **fsptr) {
+ std::unique_ptr<FileSystemImpl> impl(new FileSystemImpl(io_service));
+ Status stat = impl->Connect(server, port);
+ if (stat.ok()) {
+ *fsptr = impl.release();
+ }
+ return stat;
+}
+
+FileSystemImpl::FileSystemImpl(IoService *io_service)
+ : io_service_(static_cast<IoServiceImpl*>(io_service))
+ , engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(),
+ kNamenodeProtocol, kNamenodeProtocolVersion)
+ , namenode_(&engine_)
+{}
+
+Status FileSystemImpl::Connect(const char *server, unsigned short port) {
+ asio::error_code ec;
+ tcp::resolver resolver(io_service_->io_service());
+ tcp::resolver::query query(tcp::v4(), server, std::to_string(port));
+ tcp::resolver::iterator iterator = resolver.resolve(query, ec);
+
+ if (ec) {
+ return ToStatus(ec);
+ }
+
+ std::vector<tcp::endpoint> servers(iterator, tcp::resolver::iterator());
+ Status stat = engine_.Connect(servers);
+ if (!stat.ok()) {
+ return stat;
+ }
+ engine_.Start();
+ return stat;
+}
+
+Status FileSystemImpl::Open(const char *path, InputStream **isptr) {
+ using ::hadoop::hdfs::GetBlockLocationsRequestProto;
+ using ::hadoop::hdfs::GetBlockLocationsResponseProto;
+
+ GetBlockLocationsRequestProto req;
+ auto resp = std::make_shared<GetBlockLocationsResponseProto>();
+ req.set_src(path);
+ req.set_offset(0);
+ req.set_length(std::numeric_limits<long long>::max());
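+ // Bridge the asynchronous stub call onto this synchronous API: the RPC
+ // callback fulfills the promise, and future.get() blocks until it fires.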
+ auto stat_p = std::make_shared<std::promise<Status>>();
+ std::future<Status> future(stat_p->get_future());
+ namenode_.GetBlockLocations(&req, resp,
+ [stat_p](const Status &status) { stat_p->set_value(status); });
+ Status stat = future.get();
+ if (!stat.ok()) {
+ return stat;
+ }
+
+ *isptr = new InputStreamImpl(this, &resp->locations());
+ return Status::OK();
+}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
new file mode 100644
index 0000000..30536eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_FILESYSTEM_H_
+#define FS_FILESYSTEM_H_
+
+#include "common/wrapper.h"
+#include "libhdfspp/hdfs.h"
+#include "rpc/rpc_engine.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.hrpc.inl"
+
+namespace hdfs {
+
+class FileSystemImpl : public FileSystem {
+ public:
+ FileSystemImpl(IoService *io_service);
+ Status Connect(const char *server, unsigned short port);
+ virtual Status Open(const char *path, InputStream **isptr) override;
+ RpcEngine &rpc_engine() { return engine_; }
+ private:
+ IoServiceImpl *io_service_;
+ RpcEngine engine_;
+ ClientNamenodeProtocol namenode_;
+};
+
+class InputStreamImpl : public InputStream {
+ public:
+ InputStreamImpl(FileSystemImpl *fs, const ::hadoop::hdfs::LocatedBlocksProto *blocks);
+ virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) override;
+ template<class MutableBufferSequence, class Handler>
+ void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
+ const Handler &handler);
+ private:
+ FileSystemImpl *fs_;
+ unsigned long long file_length_;
+ std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+ struct HandshakeContinuation;
+ template<class MutableBufferSequence>
+ struct ReadBlockContinuation;
+};
+
+}
+
+#include "inputstream_impl.h"
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
new file mode 100644
index 0000000..a41c684
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+
+InputStream::~InputStream()
+{}
+
+InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, const LocatedBlocksProto *blocks)
+ : fs_(fs)
+ , file_length_(blocks->filelength())
+{
+ for (const auto &block : blocks->blocks()) {
+ blocks_.push_back(block);
+ }
+
+ if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) {
+ blocks_.push_back(blocks->lastblock());
+ }
+}
+
+Status InputStreamImpl::PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) {
+ auto stat = std::make_shared<std::promise<Status>>();
+ std::future<Status> future(stat->get_future());
+ auto handler = [stat,read_bytes](const Status &status, size_t transferred) {
+ *read_bytes = transferred;
+ stat->set_value(status);
+ };
+
+ AsyncPreadSome(offset, asio::buffer(buf, nbyte), handler);
+ return future.get();
+}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
new file mode 100644
index 0000000..88e1912
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_INPUTSTREAM_IMPL_H_
+#define FS_INPUTSTREAM_IMPL_H_
+
+#include "reader/block_reader.h"
+
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <functional>
+#include <future>
+
+namespace hdfs {
+
+struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
+ typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
+ HandshakeContinuation(Reader *reader, const std::string &client_name,
+ const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block,
+ uint64_t length, uint64_t offset)
+ : reader_(reader)
+ , client_name_(client_name)
+ , length_(length)
+ , offset_(offset)
+ {
+ if (token) {
+ token_.reset(new hadoop::common::TokenProto());
+ token_->CheckTypeAndMergeFrom(*token);
+ }
+ block_.CheckTypeAndMergeFrom(*block);
+ }
+
+ virtual void Run(const Next& next) override {
+ reader_->async_connect(client_name_, token_.get(), &block_, length_, offset_, next);
+ }
+
+ private:
+ Reader *reader_;
+ const std::string client_name_;
+ std::unique_ptr<hadoop::common::TokenProto> token_;
+ hadoop::hdfs::ExtendedBlockProto block_;
+ uint64_t length_;
+ uint64_t offset_;
+};
+
+template<class MutableBufferSequence>
+struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
+ typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
+ ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
+ size_t *transferred)
+ : reader_(reader)
+ , buffer_(buffer)
+ , buffer_size_(asio::buffer_size(buffer))
+ , transferred_(transferred)
+ {}
+
+ virtual void Run(const Next& next) override {
+ *transferred_ = 0;
+ next_ = next;
+ OnReadData(Status::OK(), 0);
+ }
+
+ private:
+ Reader *reader_;
+ MutableBufferSequence buffer_;
+ const size_t buffer_size_;
+ size_t *transferred_;
+ std::function<void(const Status &)> next_;
+
+ void OnReadData(const Status &status, size_t transferred) {
+ using std::placeholders::_1;
+ using std::placeholders::_2;
+ *transferred_ += transferred;
+ if (!status.ok()) {
+ next_(status);
+ } else if (*transferred_ >= buffer_size_) {
+ next_(status);
+ } else {
+ reader_->async_read_some(
+ asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+ std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+ }
+ }
+};
+
+
+template<class MutableBufferSequence, class Handler>
+void InputStreamImpl::AsyncPreadSome(
+ size_t offset, const MutableBufferSequence &buffers,
+ const Handler &handler) {
+ using ::hadoop::hdfs::LocatedBlockProto;
+ namespace ip = ::asio::ip;
+ using ::asio::ip::tcp;
+
+ auto it = std::find_if(
+ blocks_.begin(), blocks_.end(),
+ [offset](const LocatedBlockProto &p) {
+ return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+ });
+
+ if (it == blocks_.end()) {
+ handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);
+ return;
+ } else if (!it->locs_size()) {
+ handler(Status::ResourceUnavailable("No datanodes available"), 0);
+ return;
+ }
+
+ uint64_t offset_within_block = offset - it->offset();
+ uint64_t size_within_block =
+ std::min<uint64_t>(it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+
+ struct State {
+ std::unique_ptr<tcp::socket> conn;
+ std::shared_ptr<RemoteBlockReader<tcp::socket> > reader;
+ LocatedBlockProto block;
+ std::vector<tcp::endpoint> endpoints;
+ size_t transferred;
+ };
+
+ auto m = continuation::Pipeline<State>::Create();
+ auto &s = m->state();
+ s.conn.reset(new tcp::socket(fs_->rpc_engine().io_service()));
+ s.reader = std::make_shared<RemoteBlockReader<tcp::socket> >(BlockReaderOptions(), s.conn.get());
+ s.block = *it;
+ for (auto &loc : it->locs()) {
+ auto datanode = loc.id();
+ s.endpoints.push_back(tcp::endpoint(ip::address::from_string(datanode.ipaddr()), datanode.xferport()));
+ }
+
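+ // Stage 1 connects to the datanode, stage 2 performs the block reader
+ // handshake, stage 3 reads the payload; the pipeline stops at the first
+ // continuation that reports a non-OK status.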
+ m->Push(continuation::Connect(s.conn.get(), s.endpoints.begin(), s.endpoints.end()))
+ .Push(new HandshakeContinuation(s.reader.get(), fs_->rpc_engine().client_name(), nullptr,
+ &s.block.b(), size_within_block, offset_within_block))
+ .Push(new ReadBlockContinuation<::asio::mutable_buffers_1>(
+ s.reader.get(), asio::buffer(buffers, size_within_block), &s.transferred));
+
+ m->Run([handler](const Status &status, const State &state) {
+ handler(status, state.transferred);
+ });
+}
+
+}
+
+#endif
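The two continuations above follow a small, uniform contract from the continuation library: Run() performs one asynchronous step and reports completion by invoking next with a Status. A minimal illustrative implementation (hypothetical; not part of the commit) looks like this:

    // A continuation that completes immediately with success. A real
    // continuation would start an asynchronous operation in Run() and
    // pass `next` (or a wrapper around it) as its completion handler.
    struct NoopContinuation : hdfs::continuation::Continuation {
      virtual void Run(const Next &next) override {
        next(hdfs::Status::OK());
      }
    };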
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
new file mode 100644
index 0000000..dceac86
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfspp/hdfs.h"
+
+#include <iostream>
+#include <string>
+#include <thread>
+
+using namespace hdfs;
+
+class Executor {
+ public:
+ Executor()
+ : io_service_(IoService::New())
+ , thread_(std::bind(&IoService::Run, io_service_.get()))
+ {}
+
+ IoService *io_service() { return io_service_.get(); }
+
+ ~Executor() {
+ io_service_->Stop();
+ thread_.join();
+ }
+
+ std::unique_ptr<IoService> io_service_;
+ std::thread thread_;
+};
+
+int main(int argc, char *argv[]) {
+ if (argc != 4) {
+ std::cerr
+ << "Print files stored in a HDFS cluster.\n"
+ << "Usage: " << argv[0] << " "
+ << "<nnhost> <nnport> <file>\n";
+ return 1;
+ }
+
+ Executor executor;
+
+ FileSystem *fsptr;
+ Status stat = FileSystem::New(executor.io_service(), argv[1], std::stoi(argv[2]), &fsptr);
+ if (!stat.ok()) {
+ std::cerr << "Cannot create the filesystem: " << stat.ToString() << std::endl;
+ return 1;
+ }
+
+ std::unique_ptr<FileSystem> fs(fsptr);
+
+ InputStream *isptr;
+ stat = fs->Open(argv[3], &isptr);
+ if (!stat.ok()) {
+ std::cerr << "Cannot open the file: " << stat.ToString() << std::endl;
+ return 1;
+ }
+
+ std::unique_ptr<InputStream> is(isptr);
+
+ char buf[8192] = {0,};
+ size_t read_bytes = 0;
+ stat = is->PositionRead(buf, sizeof(buf), 0, &read_bytes);
+ if (!stat.ok()) {
+ std::cerr << "Read failures: " << stat.ToString() << std::endl;
+ }
+ std::cerr << "Read bytes:" << read_bytes << std::endl << buf << std::endl;
+ return 0;
+}
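Assuming a namenode reachable on the default RPC port (the hostname, port, and path below are placeholders), the test is invoked as:

    ./inputstream_test localhost 8020 /tmp/hello.txt

and prints up to the first 8192 bytes of the file along with the byte count.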
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
new file mode 100644
index 0000000..80aa237
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_NAMENODE_PROTOCOL_H_
+#define FS_NAMENODE_PROTOCOL_H_
+
+#include "ClientNamenodeProtocol.pb.h"
+#include "rpc/rpc_engine.h"
+
+namespace hdfs {
+
+class ClientNamenodeProtocol {
+ public:
+ ClientNamenodeProtocol(RpcEngine *engine)
+ : engine_(engine)
+ {}
+
+ Status GetBlockLocations(const ::hadoop::hdfs::GetBlockLocationsRequestProto *request,
+ std::shared_ptr<::hadoop::hdfs::GetBlockLocationsResponseProto> response) {
+ return engine_->Rpc("getBlockLocations", request, response);
+ }
+ private:
+ RpcEngine *engine_;
+};
+
+}
+
+#endif