You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/07/14 21:48:16 UTC

[1/5] hadoop git commit: HDFS-8758. Implement the continuation library in libhdfspp. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1 [created] 08e12b0cb


HDFS-8758. Implement the continuation library in libhdfspp. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b02d962
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b02d962
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b02d962

Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: 8b02d962b291afe4b08c47f0398c1db0709419a1
Parents: ac60c6a
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Jul 9 14:02:55 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Jul 10 17:09:01 2015 -0700

----------------------------------------------------------------------
 .../src/main/native/libhdfspp/doc/Doxyfile.in   |   1 +
 .../libhdfspp/lib/common/continuation/asio.h    | 112 ++++++++++++++++
 .../lib/common/continuation/continuation.h      | 125 ++++++++++++++++++
 .../lib/common/continuation/protobuf.h          | 128 +++++++++++++++++++
 4 files changed, 366 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
index 773990f..ac1d0fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
@@ -8,6 +8,7 @@ BUILTIN_STL_SUPPORT    = YES
 
 INPUT                  = @PROJECT_SOURCE_DIR@/doc/mainpage.dox \
                          @PROJECT_SOURCE_DIR@/include/libhdfspp \
+                         @PROJECT_SOURCE_DIR@/lib/common/continuation \
 
 INPUT_ENCODING         = UTF-8
 RECURSIVE              = NO

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
new file mode 100644
index 0000000..f7d76e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_CONTINUATION_ASIO_H_
+#define LIB_COMMON_CONTINUATION_ASIO_H_
+
+#include "continuation.h"
+#include "common/util.h"
+
+#include "libhdfspp/status.h"
+
+#include <asio/connect.hpp>
+#include <asio/read.hpp>
+#include <asio/write.hpp>
+#include <asio/ip/tcp.hpp>
+
+namespace hdfs {
+namespace continuation {
+
+template <class Stream, class MutableBufferSequence>
+class ReadContinuation : public Continuation {
+public:
+  ReadContinuation(Stream *stream, const MutableBufferSequence &buffer)
+      : stream_(stream), buffer_(buffer) {}
+  virtual void Run(const Next &next) override {
+    auto handler =
+        [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
+    asio::async_read(*stream_, buffer_, handler);
+  }
+
+private:
+  Stream *stream_;
+  MutableBufferSequence buffer_;
+};
+
+template <class Stream, class ConstBufferSequence>
+class WriteContinuation : public Continuation {
+public:
+  WriteContinuation(Stream *stream, const ConstBufferSequence &buffer)
+      : stream_(stream), buffer_(buffer) {}
+
+  virtual void Run(const Next &next) override {
+    auto handler =
+        [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
+    asio::async_write(*stream_, buffer_, handler);
+  }
+
+private:
+  Stream *stream_;
+  ConstBufferSequence buffer_;
+};
+
+template <class Socket, class Iterator>
+class ConnectContinuation : public Continuation {
+public:
+  ConnectContinuation(Socket *socket, Iterator begin, Iterator end,
+                      Iterator *connected_endpoint)
+      : socket_(socket), begin_(begin), end_(end),
+        connected_endpoint_(connected_endpoint) {}
+
+  virtual void Run(const Next &next) override {
+    auto handler = [this, next](const asio::error_code &ec, Iterator it) {
+      if (connected_endpoint_) {
+        *connected_endpoint_ = it;
+      }
+      next(ToStatus(ec));
+    };
+    asio::async_connect(*socket_, begin_, end_, handler);
+  }
+
+private:
+  Socket *socket_;
+  Iterator begin_;
+  Iterator end_;
+  Iterator *connected_endpoint_;
+};
+
+template <class Stream, class ConstBufferSequence>
+static inline Continuation *Write(Stream *stream,
+                                  const ConstBufferSequence &buffer) {
+  return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer);
+}
+
+template <class Stream, class MutableBufferSequence>
+static inline Continuation *Read(Stream *stream,
+                                 const MutableBufferSequence &buffer) {
+  return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer);
+}
+
+template <class Socket, class Iterator>
+static inline Continuation *Connect(Socket *socket, Iterator begin,
+                                    Iterator end) {
+  return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr);
+}
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
new file mode 100644
index 0000000..9576c2f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_CONTINUATION_CONTINUATION_H_
+#define LIB_COMMON_CONTINUATION_CONTINUATION_H_
+
+#include "libhdfspp/status.h"
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace hdfs {
+namespace continuation {
+
+class PipelineBase;
+
+/**
+ * A continuation is a fragment of runnable code whose execution will
+ * be scheduled by a \link Pipeline \endlink.
+ *
+ * The Continuation class is a building block to implement the
+ * Continuation Passing Style (CPS) in libhdfs++. In CPS, the
+ * upper-level user specifies the control flow by chaining a sequence
+ * of continuations explicitly through the \link Run() \endlink method,
+ * while in traditional imperative programming the sequence of
+ * statements implicitly specifies the control flow.
+ *
+ * See http://en.wikipedia.org/wiki/Continuation for more details.
+ **/
+class Continuation {
+public:
+  typedef std::function<void(const Status &)> Next;
+  virtual ~Continuation() = default;
+  virtual void Run(const Next &next) = 0;
+  Continuation(const Continuation &) = delete;
+  Continuation &operator=(const Continuation &) = delete;
+
+protected:
+  Continuation() = default;
+};
+
+/**
+ * A pipeline schedules the execution of a chain of \link Continuation
+ * \endlink. The pipeline schedules the execution of continuations
+ * based on their order in the pipeline, where the next parameter for
+ * each continuation points to the \link Schedule() \endlink
+ * method. That way the pipeline executes all scheduled continuations
+ * in sequence.
+ *
+ * The typical use case of a pipeline is executing continuations
+ * asynchronously. Note that a continuation calls the next
+ * continuation when it is finished. If the continuation is posted
+ * into an asynchronous event loop, invoking the next continuation
+ * can be done in the callback handler in the asynchronous event loop.
+ *
+ * The pipeline allocates the memory as follows. A pipeline is always
+ * allocated on the heap. It owns all the continuations as well as
+ * the state specified by the user. Both the continuations and the
+ * state have the same life cycle as the pipeline. The design
+ * simplifies the problem of ensuring that the executions in the
+ * asynchronous event loop always hold valid pointers w.r.t. the
+ * pipeline. The pipeline will automatically deallocate itself right
+ * after it invokes the callback specified by the user.
+ **/
+template <class State> class Pipeline {
+public:
+  typedef std::function<void(const Status &, const State &)> UserHandler;
+  static Pipeline *Create() { return new Pipeline(); }
+  Pipeline &Push(Continuation *stage);
+  void Run(UserHandler &&handler);
+  State &state() { return state_; }
+
+private:
+  State state_;
+  std::vector<std::unique_ptr<Continuation>> routines_;
+  size_t stage_;
+  std::function<void(const Status &, const State &)> handler_;
+
+  Pipeline() : stage_(0) {}
+  ~Pipeline() = default;
+  void Schedule(const Status &status);
+};
+
+template <class State>
+inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) {
+  routines_.emplace_back(std::unique_ptr<Continuation>(stage));
+  return *this;
+}
+
+/**
+ * Advances the pipeline by one step. The status argument is the result
+ * reported by the stage that has just finished (Run() passes Status::OK()
+ * for the initial call).
+ *
+ * If that stage failed, or if no stages are left, the user handler is
+ * invoked with the status and the pipeline destroys itself. The remaining
+ * stages are not run after a failure, so the user handler observes the
+ * first error instead of the result of whichever stage happened to run
+ * last. Otherwise the next stage is started with this method bound as its
+ * continuation.
+ **/
+template <class State>
+inline void Pipeline<State>::Schedule(const Status &status) {
+  if (!status.ok() || stage_ >= routines_.size()) {
+    handler_(status, state_);
+    routines_.clear();
+    // No member may be touched after this point.
+    delete this;
+    return;
+  }
+
+  // Bump stage_ before calling Run() so that a stage which invokes its
+  // continuation from inside Run() already sees the updated index.
+  Continuation *next = routines_[stage_].get();
+  ++stage_;
+  next->Run(std::bind(&Pipeline::Schedule, this, std::placeholders::_1));
+}
+
+template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) {
+  handler_ = std::move(handler);
+  Schedule(Status::OK());
+}
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
new file mode 100644
index 0000000..3e4b535
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
+#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
+
+#include "common/util.h"
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <cassert>
+
+namespace hdfs {
+namespace continuation {
+
+/**
+ * Reads one length-delimited protobuf message from the stream into msg.
+ * On the wire the message is a base-128 varint holding the payload size,
+ * followed by that many payload bytes.
+ *
+ * The varint prefix plus the payload must fit into MaxMessageSize bytes,
+ * which is the size of the internal receive buffer.
+ **/
+template <class Stream, size_t MaxMessageSize = 512>
+struct ReadDelimitedPBMessageContinuation : public Continuation {
+  ReadDelimitedPBMessageContinuation(Stream *stream,
+                                     ::google::protobuf::MessageLite *msg)
+      : stream_(stream), msg_(msg) {}
+
+  /**
+   * Starts the asynchronous read. CompletionHandler() decides how many
+   * bytes are still missing; once the read finishes the buffer is parsed
+   * into msg_ and next is invoked with the resulting status.
+   **/
+  virtual void Run(const Next &next) override {
+    namespace pbio = google::protobuf::io;
+    auto handler = [this, next](const asio::error_code &ec, size_t) {
+      Status status;
+      if (ec) {
+        status = ToStatus(ec);
+      } else {
+        pbio::ArrayInputStream as(&buf_[0], buf_.size());
+        pbio::CodedInputStream is(&as);
+        uint32_t size = 0;
+        bool v = is.ReadVarint32(&size);
+        assert(v);
+        is.PushLimit(size);
+        msg_->Clear();
+        v = msg_->MergeFromCodedStream(&is);
+        assert(v);
+      }
+      next(status);
+    };
+    asio::async_read(
+        *stream_, asio::buffer(buf_),
+        std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
+                  std::placeholders::_1, std::placeholders::_2),
+        handler);
+  }
+
+private:
+  /**
+   * Completion condition handed to asio::async_read. Given the number of
+   * bytes received so far, returns how many more bytes should be read, or
+   * 0 when the read is complete (or has failed).
+   *
+   * While the varint length prefix is still incomplete, one byte at a time
+   * is requested so that nothing past the end of the message is consumed.
+   **/
+  size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+    if (ec) {
+      return 0;
+    }
+
+    // Varints are little-endian: byte i contributes bits [7 * i, 7 * i + 6].
+    // A byte with the high bit clear terminates the prefix.
+    size_t offset = 0, len = 0;
+    for (size_t i = 0; i < transferred && i < sizeof(int); ++i) {
+      const uint8_t byte = static_cast<uint8_t>(buf_.at(i));
+      len |= static_cast<size_t>(byte & 0x7f) << (7 * i);
+      if (byte < 0x80) {
+        offset = i + 1;
+        break;
+      }
+    }
+
+    if (!offset) {
+      // The length prefix has not been fully received yet.
+      return 1;
+    }
+
+    assert(offset + len < buf_.size() && "Message is too big");
+    const size_t total = offset + len;
+    // Guard the subtraction so that an empty payload cannot wrap around.
+    return total > transferred ? total - transferred : 0;
+  }
+
+  Stream *stream_;
+  ::google::protobuf::MessageLite *msg_;
+  std::array<char, MaxMessageSize> buf_;
+};
+
+/**
+ * Writes msg to the stream as a length-delimited protobuf message: a
+ * varint32 holding the serialized size, followed by the serialized bytes.
+ **/
+template <class Stream>
+struct WriteDelimitedPBMessageContinuation : Continuation {
+  WriteDelimitedPBMessageContinuation(Stream *stream,
+                                      const google::protobuf::MessageLite *msg)
+      : stream_(stream), msg_(msg) {}
+
+  /**
+   * Serializes the message into buf_ and hands the buffer to a
+   * WriteContinuation; next is invoked with the status of that write.
+   **/
+  virtual void Run(const Next &next) override {
+    namespace pbio = google::protobuf::io;
+    int size = msg_->ByteSize();
+    buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
+    {
+      // The output streams may grow buf_ beyond the bytes actually written
+      // and only give the unused tail back when they are destroyed. Keep
+      // them in their own scope so that buf_ has its final size before it
+      // is wrapped in an asio buffer below.
+      pbio::StringOutputStream ss(&buf_);
+      pbio::CodedOutputStream os(&ss);
+      os.WriteVarint32(size);
+      msg_->SerializeToCodedStream(&os);
+    }
+    write_coroutine_ =
+        std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
+    write_coroutine_->Run([next](const Status &stat) { next(stat); });
+  }
+
+private:
+  Stream *stream_;
+  const google::protobuf::MessageLite *msg_;
+  // Backing storage for the bytes being written; must outlive the write.
+  std::string buf_;
+  std::shared_ptr<Continuation> write_coroutine_;
+};
+
+template <class Stream, size_t MaxMessageSize = 512>
+static inline Continuation *
+ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+  return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream,
+                                                                        msg);
+}
+
+template <class Stream>
+static inline Continuation *
+WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+  return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
+}
+}
+}
+#endif


[3/5] hadoop git commit: HDFS-8759. Implement remote block reader in libhdfspp. Contributed by Haohui Mai.

Posted by wh...@apache.org.
HDFS-8759. Implement remote block reader in libhdfspp. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6416b38c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6416b38c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6416b38c

Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: 6416b38cade36a1c251ef96d5a0f9b80b785e58b
Parents: 8b02d96
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Jul 10 16:50:45 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:00 2015 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/include/libhdfspp/status.h |   4 +-
 .../main/native/libhdfspp/lib/CMakeLists.txt    |  20 ++
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   1 +
 .../main/native/libhdfspp/lib/common/base64.cc  |  71 ++++
 .../main/native/libhdfspp/lib/common/status.cc  |  66 ++++
 .../native/libhdfspp/lib/reader/CMakeLists.txt  |  20 ++
 .../native/libhdfspp/lib/reader/block_reader.h  | 114 +++++++
 .../native/libhdfspp/lib/reader/datatransfer.h  |  35 ++
 .../libhdfspp/lib/reader/remote_block_reader.cc |  46 +++
 .../lib/reader/remote_block_reader_impl.h       | 342 +++++++++++++++++++
 10 files changed, 718 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
index 9436c8b..d2ef005 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
@@ -45,6 +45,8 @@ class Status {
   { return Status(kUnimplemented, ""); }
   static Status Exception(const char *expception_class_name, const char *error_message)
   { return Status(kException, expception_class_name, error_message); }
+  static Status Error(const char *error_message)
+  { return Exception("Exception", error_message); }
 
   // Returns true iff the status indicates success.
   bool ok() const { return (state_ == NULL); }
@@ -71,7 +73,7 @@ class Status {
     kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument),
     kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again),
     kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
-    kException = 256,
+    kException = 255,
   };
 
   explicit Status(int code, const char *msg1, const char *msg2);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
index 7458453..e77942b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
@@ -1,2 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_subdirectory(common)
+add_subdirectory(reader)
 add_subdirectory(rpc)
 add_subdirectory(proto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
new file mode 100644
index 0000000..570d0ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -0,0 +1 @@
+add_library(common base64.cc status.cc)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc
new file mode 100644
index 0000000..f98fec5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util.h"
+
+#include <array>
+#include <functional>
+#include <algorithm>
+
+namespace hdfs {
+
+/**
+ * Encodes src with the standard Base64 alphabet (A-Z, a-z, 0-9, '+', '/'),
+ * padding the result with '=' to a multiple of four characters.
+ *
+ * @param src arbitrary bytes; may be empty and may contain bytes >= 0x80
+ * @return the encoded text, (src.size() + 2) / 3 * 4 characters long
+ */
+std::string Base64Encode(const std::string &src) {
+  static const char kDictionary[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+                                    "abcdefghijklmnopqrstuvwxyz"
+                                    "0123456789+/";
+
+  const size_t length = src.length();
+  std::string dst;
+  dst.reserve((length + 2) / 3 * 4);
+
+  // Work on unsigned bytes: with a signed char, shifting a byte >= 0x80
+  // yields a negative value and therefore an out-of-range table index.
+  const unsigned char *s = reinterpret_cast<const unsigned char *>(src.data());
+
+  // "<=" so that a final complete 3-byte group is encoded by this loop
+  // instead of being left over as an (unhandled) remainder of three.
+  size_t i = 0;
+  for (; i + 3 <= length; i += 3) {
+    dst.push_back(kDictionary[s[i] >> 2]);
+    dst.push_back(kDictionary[((s[i] << 4) | (s[i + 1] >> 4)) & 0x3f]);
+    dst.push_back(kDictionary[((s[i + 1] << 2) | (s[i + 2] >> 6)) & 0x3f]);
+    dst.push_back(kDictionary[s[i + 2] & 0x3f]);
+  }
+
+  // Encode the trailing one or two bytes, if any, and pad with '='.
+  switch (length - i) {
+  case 0:
+    break;
+  case 1:
+    dst.push_back(kDictionary[s[i] >> 2]);
+    dst.push_back(kDictionary[(s[i] << 4) & 0x3f]);
+    dst.append("==");
+    break;
+  case 2:
+    dst.push_back(kDictionary[s[i] >> 2]);
+    dst.push_back(kDictionary[((s[i] << 4) | (s[i + 1] >> 4)) & 0x3f]);
+    dst.push_back(kDictionary[(s[i + 1] << 2) & 0x3f]);
+    dst.push_back('=');
+    break;
+  default:
+    assert(false && "Unreachable");
+    break;
+  }
+  return dst;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
new file mode 100644
index 0000000..66cfa1c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfspp/status.h"
+
+#include <cassert>
+#include <cstring>
+
+namespace hdfs {
+
+Status::Status(int code, const char *msg1)
+    : state_(ConstructState(code, msg1, nullptr)) {}
+
+Status::Status(int code, const char *msg1, const char *msg2)
+    : state_(ConstructState(code, msg1, msg2)) {}
+
+const char *Status::ConstructState(int code, const char *msg1,
+                                   const char *msg2) {
+  assert(code != kOk);
+  const uint32_t len1 = strlen(msg1);
+  const uint32_t len2 = msg2 ? strlen(msg2) : 0;
+  const uint32_t size = len1 + (len2 ? (2 + len2) : 0);
+  char *result = new char[size + 8 + 2];
+  *reinterpret_cast<uint32_t *>(result) = size;
+  *reinterpret_cast<uint32_t *>(result + 4) = code;
+  memcpy(result + 8, msg1, len1);
+  if (len2) {
+    result[8 + len1] = ':';
+    result[9 + len1] = ' ';
+    memcpy(result + 10 + len1, msg2, len2);
+  }
+  return result;
+}
+
+std::string Status::ToString() const {
+  if (!state_) {
+    return "OK";
+  } else {
+    uint32_t length = *reinterpret_cast<const uint32_t *>(state_);
+    return std::string(state_ + 8, length);
+  }
+}
+
+const char *Status::CopyState(const char *state) {
+  uint32_t size;
+  memcpy(&size, state, sizeof(size));
+  char *result = new char[size + 8];
+  memcpy(result, state, size + 8);
+  return result;
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
new file mode 100644
index 0000000..65ec108
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(reader remote_block_reader.cc)
+add_dependencies(reader proto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h
new file mode 100644
index 0000000..81636b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCK_READER_H_
+#define BLOCK_READER_H_
+
+#include "libhdfspp/status.h"
+#include "datatransfer.pb.h"
+
+#include <memory>
+
+namespace hdfs {
+
+/**
+ * Caching hints for a block read. Each setting is paired with a
+ * *_specified flag; presumably the flag records whether the setting was
+ * given explicitly (confirm against the code that consumes this struct).
+ * A default-constructed strategy has every flag cleared and every value
+ * zeroed.
+ **/
+struct CacheStrategy {
+  bool drop_behind_specified;
+  bool drop_behind;
+  bool read_ahead_specified;
+  unsigned long long read_ahead;
+  CacheStrategy()
+      : drop_behind_specified(false), drop_behind(false),
+        read_ahead_specified(false), read_ahead(0) {}
+};
+
+enum DropBehindStrategy {
+  kUnspecified = 0,
+  kEnableDropBehind = 1,
+  kDisableDropBehind = 2,
+};
+
+enum EncryptionScheme {
+  kNone = 0,
+  kAESCTRNoPadding = 1,
+};
+
+struct BlockReaderOptions {
+  bool verify_checksum;
+  CacheStrategy cache_strategy;
+  EncryptionScheme encryption_scheme;
+
+  BlockReaderOptions()
+      : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {}
+};
+
+/**
+ * Reads a block through the given stream. Only the declarations live here;
+ * the member templates are defined in remote_block_reader_impl.h, which is
+ * included at the end of this header.
+ *
+ * The reader does not own the stream; the pointer passed to the
+ * constructor is stored as is.
+ **/
+template <class Stream>
+class RemoteBlockReader
+    : public std::enable_shared_from_this<RemoteBlockReader<Stream>> {
+public:
+  // Every scalar member is given a defined initial value so that no
+  // indeterminate counter can be read before the first packet arrives.
+  explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream)
+      : stream_(stream), state_(kOpen), options_(options), packet_len_(0),
+        packet_data_read_bytes_(0), chunk_padding_bytes_(0),
+        bytes_to_read_(0) {}
+
+  template <class MutableBufferSequence, class ReadHandler>
+  void async_read_some(const MutableBufferSequence &buffers,
+                       const ReadHandler &handler);
+
+  template <class MutableBufferSequence>
+  size_t read_some(const MutableBufferSequence &buffers, Status *status);
+
+  Status connect(const std::string &client_name,
+                 const hadoop::common::TokenProto *token,
+                 const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+                 uint64_t offset);
+
+  template <class ConnectHandler>
+  void async_connect(const std::string &client_name,
+                     const hadoop::common::TokenProto *token,
+                     const hadoop::hdfs::ExtendedBlockProto *block,
+                     uint64_t length, uint64_t offset,
+                     const ConnectHandler &handler);
+
+private:
+  // Continuation stages; defined in remote_block_reader_impl.h.
+  struct ReadPacketHeader;
+  struct ReadChecksum;
+  struct ReadPadding;
+  template <class MutableBufferSequence> struct ReadData;
+  struct AckRead;
+  enum State {
+    kOpen,
+    kReadPacketHeader,
+    kReadChecksum,
+    kReadPadding,
+    kReadData,
+    kFinished,
+  };
+
+  Stream *stream_;
+  hadoop::hdfs::PacketHeaderProto header_;
+  State state_;
+  BlockReaderOptions options_;
+  size_t packet_len_;
+  int packet_data_read_bytes_;
+  int chunk_padding_bytes_;
+  long long bytes_to_read_;
+  std::vector<char> checksum_;
+};
+}
+
+#include "remote_block_reader_impl.h"
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
new file mode 100644
index 0000000..d22f5e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef COMMON_DATA_TRANSFER_H_
+#define COMMON_DATA_TRANSFER_H_
+
+namespace hdfs {
+
+// Constants of the DataNode data-transfer protocol.
+enum {
+  // Protocol version. RemoteBlockReader::async_connect writes it as part of
+  // the request preamble {0, kDataTransferVersion, <opcode>}.
+  kDataTransferVersion = 28,
+  // NOTE(review): presumably the magic marker that introduces a SASL
+  // handshake on the data-transfer stream; it is not used in this header,
+  // confirm against its callers.
+  kDataTransferSasl = 0xdeadbeef,
+};
+
+// Opcode that follows the version in the request preamble.
+enum Operation {
+  // NOTE(review): presumably the opcode for a block write; not used by the
+  // reader code.
+  kWriteBlock = 80,
+  // Opcode sent by RemoteBlockReader::async_connect ahead of the
+  // OpReadBlockProto request.
+  kReadBlock  = 81,
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
new file mode 100644
index 0000000..68bc4ee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "block_reader.h"
+
+namespace hdfs {
+
+/**
+ * Build the OpReadBlockProto request describing one block read.
+ *
+ * @param client_name     stored in the client operation header
+ * @param verify_checksum stored as the request's sendChecksums flag
+ * @param token           optional token; copied into the base header when
+ *                        it is not null
+ * @param block           block to read; must not be null, it is copied into
+ *                        the base header
+ * @param length          stored as the request's len field
+ * @param offset          stored as the request's offset field
+ * @return the populated request, returned by value
+ */
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+               uint64_t offset) {
+  hadoop::hdfs::OpReadBlockProto p;
+
+  // Create the nested headers through the mutable_*() accessors so that the
+  // request owns every sub-message from the moment it exists. The previous
+  // new + set_allocated_*() sequence leaked the partly built headers if a
+  // later allocation threw.
+  hadoop::hdfs::ClientOperationHeaderProto *h = p.mutable_header();
+  h->set_clientname(client_name);
+
+  hadoop::hdfs::BaseHeaderProto *base_h = h->mutable_baseheader();
+  *base_h->mutable_block() = *block;
+  if (token) {
+    *base_h->mutable_token() = *token;
+  }
+
+  p.set_offset(offset);
+  p.set_len(length);
+  p.set_sendchecksums(verify_checksum);
+  // TODO: p.set_allocated_cachingstrategy();
+  return p;
+}
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
new file mode 100644
index 0000000..68ea6ad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
+#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
+
+#include "datatransfer.h"
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <asio/buffers_iterator.hpp>
+#include <asio/streambuf.hpp>
+#include <asio/write.hpp>
+
+#include <arpa/inet.h>
+
+#include <future>
+
+namespace hdfs {
+
+// Builds the OpReadBlockProto request that async_connect() sends to the
+// DataNode. Declared here because this header is included by templates;
+// the definition is not part of this header.
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+               uint64_t offset);
+
+/**
+ * Asynchronously issue a read-block request on the stream and wait for the
+ * DataNode's BlockOpResponseProto.
+ *
+ * The pipeline writes the preamble {0, kDataTransferVersion, kReadBlock},
+ * writes the delimited OpReadBlockProto and then reads the delimited
+ * response. On a SUCCESS response the reader moves to kReadPacketHeader;
+ * otherwise the response message is reported as an error.
+ *
+ * @param client_name forwarded to ReadBlockProto()
+ * @param token       forwarded to ReadBlockProto(); may be null
+ * @param block       forwarded to ReadBlockProto()
+ * @param length      number of bytes requested
+ * @param offset      offset of the first requested byte within the block
+ * @param handler     invoked as handler(const Status &) when the exchange
+ *                    has finished
+ */
+template <class Stream>
+template <class ConnectHandler>
+void RemoteBlockReader<Stream>::async_connect(
+    const std::string &client_name, const hadoop::common::TokenProto *token,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset, const ConnectHandler &handler) {
+  // Remember how many bytes the caller asked for. The DN may send extra
+  // bytes in front of `offset` to chunk-align the read; that amount is
+  // recorded in chunk_padding_bytes_ once the response arrives.
+  // NOTE(review): ReadData subtracts every byte it transfers (padding
+  // included) from bytes_to_read_, but the padding is never added here.
+  // Confirm whether bytes_to_read_ should include it.
+  bytes_to_read_ = length;
+
+  struct State {
+    std::string header;
+    hadoop::hdfs::OpReadBlockProto request;
+    hadoop::hdfs::BlockOpResponseProto response;
+  };
+
+  auto m = continuation::Pipeline<State>::Create();
+  State *s = &m->state();
+
+  s->header.insert(s->header.begin(),
+                   {0, kDataTransferVersion, Operation::kReadBlock});
+  // The call already yields a temporary, so no std::move is needed to
+  // select move assignment.
+  s->request = ReadBlockProto(client_name, options_.verify_checksum, token,
+                              block, length, offset);
+
+  auto read_pb_message =
+      new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>(
+          stream_, &s->response);
+
+  m->Push(continuation::Write(stream_, asio::buffer(s->header)))
+      .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request))
+      .Push(read_pb_message);
+
+  m->Run([this, handler, offset](const Status &status, const State &s) {
+    Status stat = status;
+    if (stat.ok()) {
+      const auto &resp = s.response;
+      if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
+        if (resp.has_readopchecksuminfo()) {
+          // Bytes between the chunk boundary the DN starts at and the
+          // offset the caller asked for.
+          const auto &checksum_info = resp.readopchecksuminfo();
+          chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
+        }
+        state_ = kReadPacketHeader;
+      } else {
+        stat = Status::Error(resp.message().c_str());
+      }
+    }
+    handler(stat);
+  });
+}
+
+/**
+ * Continuation that reads the start of a packet: a payload length of
+ * sizeof(int) bytes, a header length of sizeof(short) bytes (both in
+ * network byte order) and the serialized PacketHeaderProto.
+ *
+ * On success the payload length is stored in parent->packet_len_, the
+ * header is parsed into parent->header_ and the reader moves to
+ * kReadChecksum. A header that does not fit into the local buffer, or that
+ * fails to parse, is reported as an error Status.
+ */
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadPacketHeader
+    : continuation::Continuation {
+  ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    parent_->packet_data_read_bytes_ = 0;
+    parent_->packet_len_ = 0;
+    auto handler = [next, this](const asio::error_code &ec, size_t) {
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      } else if (kHeaderStart + header_length() > buf_.size()) {
+        // async_read() also completes when buf_ is full, so an oversized
+        // header length from the wire must be rejected here; parsing it
+        // would read past the end of buf_.
+        status = Status::Error("Packet header is too large");
+      } else {
+        parent_->packet_len_ = packet_length();
+        parent_->header_.Clear();
+        // Report malformed headers through the Status instead of an
+        // assert, which is compiled out when NDEBUG is defined.
+        if (parent_->header_.ParseFromArray(&buf_[kHeaderStart],
+                                            header_length())) {
+          parent_->state_ = kReadChecksum;
+        } else {
+          status = Status::Error("Failed to parse the header");
+        }
+      }
+      next(status);
+    };
+
+    asio::async_read(*parent_->stream_, asio::buffer(buf_),
+                     std::bind(&ReadPacketHeader::CompletionHandler, this,
+                               std::placeholders::_1, std::placeholders::_2),
+                     handler);
+  }
+
+private:
+  static const size_t kMaxHeaderSize = 512;
+  static const size_t kPayloadLenOffset = 0;
+  static const size_t kPayloadLenSize = sizeof(int);
+  static const size_t kHeaderLenOffset = 4;
+  static const size_t kHeaderLenSize = sizeof(short);
+  static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
+
+  RemoteBlockReader<Stream> *parent_;
+  std::array<char, kMaxHeaderSize> buf_;
+
+  // Payload length field, converted from network byte order.
+  size_t packet_length() const {
+    return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
+  }
+
+  // Length of the serialized PacketHeaderProto, converted from network
+  // byte order.
+  size_t header_length() const {
+    return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
+  }
+
+  // Completion condition for asio::async_read(): returns how many more
+  // bytes are wanted, 0 to stop. The two length fields are read first;
+  // once they are available the remainder of the header is requested.
+  size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+    if (ec) {
+      return 0;
+    } else if (transferred < kHeaderStart) {
+      return kHeaderStart - transferred;
+    } else {
+      return kHeaderStart + header_length() - transferred;
+    }
+  }
+};
+
+/**
+ * Continuation that reads the checksum bytes of the current packet into
+ * parent->checksum_.
+ *
+ * It is a no-op unless the reader is in kReadChecksum. After a successful
+ * read the reader moves to kReadPadding when chunk_padding_bytes_ is
+ * non-zero and to kReadData otherwise.
+ */
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation {
+  ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    auto parent = parent_;
+    if (parent->state_ != kReadChecksum) {
+      next(Status::OK());
+      return;
+    }
+
+    // The checksum length is what remains of the payload after the length
+    // field itself and the data. Both values come from the wire, so verify
+    // them before the unsigned subtraction: a malformed packet would
+    // otherwise underflow into an enormous resize().
+    const auto data_len = parent->header_.datalen();
+    if (data_len < 0 ||
+        parent->packet_len_ < sizeof(int) + static_cast<size_t>(data_len)) {
+      next(Status::Error("Invalid packet length"));
+      return;
+    }
+
+    auto handler = [parent, next](const asio::error_code &ec, size_t) {
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      } else {
+        parent->state_ =
+            parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
+      }
+      next(status);
+    };
+    parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
+                             static_cast<size_t>(data_len));
+    asio::async_read(*parent->stream_, asio::buffer(parent->checksum_),
+                     handler);
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+};
+
+/**
+ * Continuation that reads and discards the chunk-alignment padding.
+ *
+ * The padding is read by an inner ReadData continuation into a scratch
+ * buffer of chunk_padding_bytes_ bytes, sized at construction. It is a
+ * no-op unless the reader is in kReadPadding with padding outstanding.
+ * After a successful read chunk_padding_bytes_ is reset to 0 and the
+ * reader moves to kReadData.
+ */
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation {
+  ReadPadding(RemoteBlockReader<Stream> *parent)
+      : parent_(parent), padding_(parent->chunk_padding_bytes_),
+        bytes_transferred_(std::make_shared<size_t>(0)),
+        read_data_(new ReadData<asio::mutable_buffers_1>(
+            parent, bytes_transferred_, asio::buffer(padding_))) {}
+
+  virtual void Run(const Next &next) override {
+    if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
+      next(Status::OK());
+      return;
+    }
+
+    auto h = [next, this](const Status &status) {
+      if (status.ok()) {
+        // Convert the value rather than reinterpreting the size_t object
+        // as an int, which violates aliasing rules and depends on byte
+        // order.
+        assert(static_cast<int>(*bytes_transferred_) ==
+               parent_->chunk_padding_bytes_);
+        parent_->chunk_padding_bytes_ = 0;
+        parent_->state_ = kReadData;
+      }
+      next(status);
+    };
+    read_data_->Run(h);
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+  // Scratch space that receives the padding bytes.
+  std::vector<char> padding_;
+  // Byte counter shared with the inner ReadData continuation.
+  std::shared_ptr<size_t> bytes_transferred_;
+  std::shared_ptr<continuation::Continuation> read_data_;
+  // Not copyable: read_data_ holds a buffer that points into padding_.
+  ReadPadding(const ReadPadding &) = delete;
+  ReadPadding &operator=(const ReadPadding &) = delete;
+};
+
+/**
+ * Continuation that reads data bytes of the current packet into a caller
+ * supplied buffer sequence.
+ *
+ * At most the bytes still outstanding in the packet are requested. The
+ * transferred byte count is added to the shared counter and to the
+ * parent's per-packet counter and subtracted from the parent's remaining
+ * byte count; when the packet has been consumed the reader moves back to
+ * kReadPacketHeader.
+ */
+template <class Stream>
+template <class MutableBufferSequence>
+struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation {
+  ReadData(RemoteBlockReader<Stream> *parent,
+           std::shared_ptr<size_t> bytes_transferred,
+           const MutableBufferSequence &buf)
+      : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {}
+
+  virtual void Run(const Next &next) override {
+    // Data bytes of this packet that have not been read yet.
+    auto remaining_in_packet =
+        parent_->header_.datalen() - parent_->packet_data_read_bytes_;
+
+    auto on_read =
+        [next, this](const asio::error_code &ec, size_t transferred) {
+          // The counters are updated even when the read reports an error.
+          *bytes_transferred_ += transferred;
+          parent_->bytes_to_read_ -= transferred;
+          parent_->packet_data_read_bytes_ += transferred;
+          if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
+            parent_->state_ = kReadPacketHeader;
+          }
+
+          Status result;
+          if (ec) {
+            result = Status(ec.value(), ec.message().c_str());
+          }
+          next(result);
+        };
+
+    async_read(*parent_->stream_, buf_,
+               asio::transfer_exactly(remaining_in_packet), on_read);
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+  std::shared_ptr<size_t> bytes_transferred_;
+  MutableBufferSequence buf_;
+};
+
+/**
+ * Continuation that sends the ClientReadStatusProto acknowledgement once
+ * no bytes remain to be read.
+ *
+ * While parent->bytes_to_read_ is positive it does nothing. Otherwise it
+ * writes CHECKSUM_OK (checksum verification enabled) or SUCCESS to the
+ * stream and, if the write succeeds, moves the reader to kFinished.
+ */
+template <class Stream>
+struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation {
+  AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    const bool more_to_read = parent_->bytes_to_read_ > 0;
+    if (more_to_read) {
+      next(Status::OK());
+      return;
+    }
+
+    typedef hadoop::hdfs::ClientReadStatusProto Ack;
+    auto pipeline = continuation::Pipeline<Ack>::Create();
+
+    if (parent_->options_.verify_checksum) {
+      pipeline->state().set_status(hadoop::hdfs::Status::CHECKSUM_OK);
+    } else {
+      pipeline->state().set_status(hadoop::hdfs::Status::SUCCESS);
+    }
+
+    pipeline->Push(continuation::WriteDelimitedPBMessage(parent_->stream_,
+                                                         &pipeline->state()));
+
+    pipeline->Run([this, next](const Status &write_status, const Ack &) {
+      if (write_status.ok()) {
+        parent_->state_ = RemoteBlockReader<Stream>::kFinished;
+      }
+      next(write_status);
+    });
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+};
+
+/**
+ * Asynchronously read some bytes of the block into `buffers`.
+ *
+ * Runs the continuations ReadPacketHeader, ReadChecksum, ReadPadding,
+ * ReadData and AckRead in that order. The reader must already be connected
+ * (state_ != kOpen).
+ *
+ * @param buffers destination buffer sequence
+ * @param handler invoked as handler(const Status &, size_t) with the
+ *                final status and the number of bytes counted by ReadData
+ */
+template <class Stream>
+template <class MutableBufferSequence, class ReadHandler>
+void RemoteBlockReader<Stream>::async_read_some(
+    const MutableBufferSequence &buffers, const ReadHandler &handler) {
+  assert(state_ != kOpen && "Not connected");
+
+  struct State {
+    std::shared_ptr<size_t> bytes_transferred;
+  };
+  auto pipeline = continuation::Pipeline<State>::Create();
+  // The counter is shared with ReadData, which accumulates into it.
+  auto counter = std::make_shared<size_t>(0);
+  pipeline->state().bytes_transferred = counter;
+
+  pipeline->Push(new ReadPacketHeader(this))
+      .Push(new ReadChecksum(this))
+      .Push(new ReadPadding(this))
+      .Push(new ReadData<MutableBufferSequence>(this, counter, buffers))
+      .Push(new AckRead(this));
+
+  // Capturing a shared_ptr to the reader keeps it alive until the
+  // completion callback has run.
+  auto self = this->shared_from_this();
+  pipeline->Run([self, handler](const Status &status, const State &state) {
+    handler(status, *state.bytes_transferred);
+  });
+}
+
+/**
+ * Blocking counterpart of async_read_some().
+ *
+ * Starts an asynchronous read and waits until its handler has run.
+ *
+ * @param buffers destination buffer sequence
+ * @param status  receives the status reported by the asynchronous read;
+ *                must not be null
+ * @return the number of bytes reported by the asynchronous read
+ */
+template <class Stream>
+template <class MutableBufferSequence>
+size_t
+RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers,
+                                     Status *status) {
+  // The promise is held by shared_ptr so the handler can be copied freely.
+  auto result = std::make_shared<std::promise<size_t>>();
+  std::future<size_t> pending(result->get_future());
+  async_read_some(buffers,
+                  [status, result](const Status &stat, size_t count) {
+                    *status = stat;
+                    result->set_value(count);
+                  });
+  return pending.get();
+}
+
+/**
+ * Blocking counterpart of async_connect().
+ *
+ * Starts the asynchronous connect with the same arguments and waits for
+ * its handler.
+ *
+ * @return the Status passed to the async_connect() handler
+ */
+template <class Stream>
+Status RemoteBlockReader<Stream>::connect(
+    const std::string &client_name, const hadoop::common::TokenProto *token,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset) {
+  // The promise is held by shared_ptr so the handler can be copied freely.
+  auto outcome = std::make_shared<std::promise<Status>>();
+  auto pending = outcome->get_future();
+  auto on_connected = [outcome](const Status &status) {
+    outcome->set_value(status);
+  };
+  async_connect(client_name, token, block, length, offset, on_connected);
+  return pending.get();
+}
+}
+
+#endif


[5/5] hadoop git commit: f

Posted by wh...@apache.org.
f


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08e12b0c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08e12b0c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08e12b0c

Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: 08e12b0cb0457a06e0888f07496dbd84e7b5fb0c
Parents: e6ca03c
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Jul 14 12:47:51 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:51 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt     | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/08e12b0c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
index cae786c..e51483a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/CMakeLists.txt
@@ -19,6 +19,7 @@
 project (libhdfspp)
 
 find_package(Doxygen)
+find_package(OpenSSL REQUIRED)
 find_package(Protobuf REQUIRED)
 find_package(Threads)
 


[2/5] hadoop git commit: HDFS-8764. Generate Hadoop RPC stubs from protobuf definitions. Contributed by Haohui Mai.

Posted by wh...@apache.org.
HDFS-8764. Generate Hadoop RPC stubs from protobuf definitions. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f34bda72
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f34bda72
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f34bda72

Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: f34bda720184cc2a7e3ac48916c693519387539b
Parents: 6416b38
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Jul 13 16:53:13 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:00 2015 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/lib/proto/CMakeLists.txt   | 46 +++++++++-
 .../native/libhdfspp/lib/proto/cpp_helpers.h    | 82 +++++++++++++++++
 .../libhdfspp/lib/proto/protoc_gen_hrpc.cc      | 94 ++++++++++++++++++++
 3 files changed, 221 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f34bda72/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
index 156a7f4..3f703b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/CMakeLists.txt
@@ -18,4 +18,48 @@ protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS
   ${COMMON_PROTO_DIR}/Security.proto
 )
 
-add_library(proto ${PROTO_SRCS} ${PROTO_HDRS})
+# protoc plugin that emits the Hadoop RPC stubs (*.hrpc.inl).
+add_executable(protoc-gen-hrpc protoc_gen_hrpc.cc)
+target_link_libraries(protoc-gen-hrpc ${PROTOBUF_PROTOC_LIBRARY} ${PROTOBUF_LIBRARY})
+
+# GEN_HRPC(<out-var> <proto>...)
+#
+# Adds, for every listed .proto file, a custom command that runs protoc
+# with the protoc-gen-hrpc plugin and writes <name>.hrpc.inl into
+# CMAKE_CURRENT_BINARY_DIR. The generated file paths are returned to the
+# caller in <out-var>.
+function(GEN_HRPC SRCS)
+  if(NOT ARGN)
+    message(SEND_ERROR "Error: GEN_HRPC() called without any proto files")
+    return()
+  endif()
+
+  # Turn every entry of PROTOBUF_IMPORT_DIRS into a "-I <abs path>"
+  # argument, skipping paths that are already in the list.
+  if(DEFINED PROTOBUF_IMPORT_DIRS)
+    foreach(DIR ${PROTOBUF_IMPORT_DIRS})
+      get_filename_component(ABS_PATH ${DIR} ABSOLUTE)
+      list(FIND _protobuf_include_path ${ABS_PATH} _contains_already)
+      if(${_contains_already} EQUAL -1)
+          list(APPEND _protobuf_include_path -I ${ABS_PATH})
+      endif()
+    endforeach()
+  endif()
+
+  # Start from an empty output list.
+  set(${SRCS})
+
+  foreach(FIL ${ARGN})
+    get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+    get_filename_component(FIL_WE ${FIL} NAME_WE)
+
+    list(APPEND ${SRCS} "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl")
+
+    # Depending on the protoc-gen-hrpc target makes the plugin build before
+    # the command runs.
+    add_custom_command(
+      OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${FIL_WE}.hrpc.inl"
+      COMMAND ${PROTOBUF_PROTOC_EXECUTABLE}
+      ARGS --plugin=protoc-gen-hrpc=${CMAKE_CURRENT_BINARY_DIR}/protoc-gen-hrpc --hrpc_out=${CMAKE_CURRENT_BINARY_DIR} ${_protobuf_include_path} ${ABS_FIL}
+      DEPENDS ${ABS_FIL} ${PROTOBUF_PROTOC_EXECUTABLE} protoc-gen-hrpc
+      COMMENT "Running HRPC protocol buffer compiler on ${FIL}"
+      VERBATIM )
+  endforeach()
+
+  # Mark the outputs as generated and hand the list back to the caller.
+  set_source_files_properties(${${SRCS}} PROPERTIES GENERATED TRUE)
+  set(${SRCS} ${${SRCS}} PARENT_SCOPE)
+endfunction()
+
+# Generate the RPC stubs for the client/NameNode protocol.
+gen_hrpc(HRPC_SRCS
+  ${CLIENT_PROTO_DIR}/ClientNamenodeProtocol.proto
+)
+
+# Listing the generated stubs as sources makes the library depend on them.
+add_library(proto ${PROTO_SRCS} ${PROTO_HDRS} ${HRPC_SRCS})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f34bda72/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h
new file mode 100644
index 0000000..6f380ad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/cpp_helpers.h
@@ -0,0 +1,82 @@
+// Protocol Buffers - Google's data interchange format
+// Copyright 2008 Google Inc.  All rights reserved.
+// https://developers.google.com/protocol-buffers/
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// Author: kenton@google.com (Kenton Varda)
+//  Based on original Protocol Buffers design by
+//  Sanjay Ghemawat, Jeff Dean, and others.
+
+#ifndef LIBHDFSPP_PROTO_CPP_HELPERS_H_
+#define LIBHDFSPP_PROTO_CPP_HELPERS_H_
+
+#include <string>
+
+/**
+ * The functions in this file are derived from the original implementation of
+ * the protobuf library from Google.
+ **/
+
+/**
+ * Remove a trailing ".proto" from a file name.
+ *
+ * @param str file name to inspect
+ * @return `str` without its ".proto" suffix, or `str` unchanged when it
+ *         does not end in ".proto"
+ */
+static inline std::string StripProto(const std::string &str) {
+  static const std::string kExtension = ".proto";
+
+  // Too short to carry the suffix at all.
+  if (str.size() < kExtension.size()) {
+    return str;
+  }
+
+  const std::string::size_type stem_length = str.size() - kExtension.size();
+  if (str.compare(stem_length, std::string::npos, kExtension) != 0) {
+    return str;
+  }
+  return str.substr(0, stem_length);
+}
+
+/**
+ * Convert an identifier to upper camel case.
+ *
+ * ASCII letters and digits are kept and every other character is dropped.
+ * A lower-case letter is capitalized when it is the first character or
+ * follows a digit or a dropped character. Upper-case letters are copied
+ * unchanged.
+ *
+ * @param input identifier to convert
+ * @return the converted identifier
+ */
+static inline std::string ToCamelCase(const std::string &input) {
+  std::string result;
+  bool cap_next_letter = true;
+  // Note:  I distrust ctype.h due to locales.
+  for (const char c : input) {
+    const bool is_lower = 'a' <= c && c <= 'z';
+    const bool is_upper = 'A' <= c && c <= 'Z';
+    const bool is_digit = '0' <= c && c <= '9';
+
+    if (is_lower) {
+      result += cap_next_letter ? static_cast<char>(c + ('A' - 'a')) : c;
+    } else if (is_upper || is_digit) {
+      // Capital letters and digits are left as-is.
+      result += c;
+    }
+
+    // Only a letter suppresses capitalization of the character after it.
+    cap_next_letter = !(is_lower || is_upper);
+  }
+  return result;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f34bda72/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
new file mode 100644
index 0000000..f384e36
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/proto/protoc_gen_hrpc.cc
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cpp_helpers.h"
+
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/stubs/common.h>
+
+using ::google::protobuf::FileDescriptor;
+using ::google::protobuf::MethodDescriptor;
+using ::google::protobuf::ServiceDescriptor;
+using ::google::protobuf::compiler::CodeGenerator;
+using ::google::protobuf::compiler::GeneratorContext;
+// The class declared in <google/protobuf/io/printer.h> is io::Printer and
+// its text-substitution method is Print().
+using ::google::protobuf::io::Printer;
+using ::google::protobuf::io::ZeroCopyOutputStream;
+
+/**
+ * protoc code generator that emits, for every service of a .proto file, a
+ * C++ class whose methods forward to ::hdfs::RpcEngine::AsyncRpc().
+ * The output is written to "<proto name without .proto>.hrpc.inl".
+ */
+class StubGenerator : public CodeGenerator {
+public:
+  virtual bool Generate(const FileDescriptor *file, const std::string &,
+                        GeneratorContext *ctx,
+                        std::string *error) const override;
+
+private:
+  void EmitService(const ServiceDescriptor *service, Printer *out) const;
+  void EmitMethod(const MethodDescriptor *method, Printer *out) const;
+};
+
+/**
+ * Generate the stub file for `file`.
+ *
+ * @param file descriptor of the .proto file being compiled
+ * @param ctx  generator context used to open the output file
+ * @return always true; the parameter string and the error output are not
+ *         used
+ */
+bool StubGenerator::Generate(const FileDescriptor *file, const std::string &,
+                             GeneratorContext *ctx, std::string *) const {
+  std::unique_ptr<ZeroCopyOutputStream> os(
+      ctx->Open(StripProto(file->name()) + ".hrpc.inl"));
+  // '$' delimits the variables that Print() substitutes.
+  Printer out(os.get(), '$');
+  for (int i = 0; i < file->service_count(); ++i) {
+    const ServiceDescriptor *service = file->service(i);
+    EmitService(service, &out);
+  }
+  return true;
+}
+
+/**
+ * Emit the stub class for one service: a class named after the service
+ * that stores the RpcEngine pointer, followed by one forwarding method per
+ * RPC.
+ */
+void StubGenerator::EmitService(const ServiceDescriptor *service,
+                                Printer *out) const {
+  out->Print("\n// GENERATED AUTOMATICALLY. DO NOT MODIFY.\n"
+             "class $service$ {\n"
+             "private:\n"
+             "  ::hdfs::RpcEngine *const engine_;\n"
+             "public:\n"
+             "  typedef std::function<void(const ::hdfs::Status &)> Callback;\n"
+             "  typedef ::google::protobuf::MessageLite Message;\n"
+             "  inline $service$(::hdfs::RpcEngine *engine)\n"
+             "    : engine_(engine) {}\n",
+             "service", service->name());
+  for (int i = 0; i < service->method_count(); ++i) {
+    const MethodDescriptor *method = service->method(i);
+    EmitMethod(method, out);
+  }
+  out->Print("};\n");
+}
+
+/**
+ * Emit one forwarding method. The C++ method name is the camel-cased RPC
+ * name; the RPC name itself is passed unchanged to AsyncRpc().
+ */
+void StubGenerator::EmitMethod(const MethodDescriptor *method,
+                               Printer *out) const {
+  out->Print("\n  inline void $camel_method$(const Message *req, "
+             "const std::shared_ptr<Message> &resp, "
+             "Callback &&handler) {\n"
+             "    engine_->AsyncRpc(\"$method$\", req, resp, std::move(handler));\n"
+             "  }\n",
+             "camel_method", ToCamelCase(method->name()),
+             "method", method->name()
+             );
+}
+
+// Entry point of the protoc plugin: hands the command line and the stub
+// generator to protobuf's PluginMain() and returns its exit code.
+int main(int argc, char *argv[]) {
+  StubGenerator stub_generator;
+  const int exit_code =
+      ::google::protobuf::compiler::PluginMain(argc, argv, &stub_generator);
+  return exit_code;
+}


[4/5] hadoop git commit: fs

Posted by wh...@apache.org.
fs


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6ca03c0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6ca03c0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6ca03c0

Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: e6ca03c0357b4b9061349e9903a2064ee0c51715
Parents: f34bda7
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Jul 14 12:43:09 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:00 2015 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/include/libhdfspp/hdfs.h   |  50 ++++++
 .../main/native/libhdfspp/lib/common/hdfs.cc    |  29 ++++
 .../main/native/libhdfspp/lib/common/wrapper.h  |  42 +++++
 .../main/native/libhdfspp/lib/fs/CMakeLists.txt |   4 +
 .../main/native/libhdfspp/lib/fs/filesystem.cc  |  95 +++++++++++
 .../main/native/libhdfspp/lib/fs/filesystem.h   |  61 +++++++
 .../main/native/libhdfspp/lib/fs/inputstream.cc |  53 ++++++
 .../native/libhdfspp/lib/fs/inputstream_impl.h  | 160 +++++++++++++++++++
 .../native/libhdfspp/lib/fs/inputstream_test.cc |  82 ++++++++++
 .../native/libhdfspp/lib/fs/namenode_protocol.h |  42 +++++
 10 files changed, 618 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
new file mode 100644
index 0000000..d12f20e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_HDFS_H_
+#define LIBHDFSPP_HDFS_H_
+
+#include "libhdfspp/status.h"
+
+namespace hdfs {
+// Abstract I/O service handle. New() returns a heap-allocated implementation.
+class IoService {
+ public:
+  static IoService *New();  // the caller is responsible for deleting the result
+  virtual void Run() = 0;
+  virtual void Stop() = 0;
+  virtual ~IoService();
+};
+
+// Read handle produced by FileSystem::Open(); PositionRead() reads up to nbyte bytes at offset into buf.
+class InputStream {
+ public:
+  virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) = 0;  // *read_bytes receives the number of bytes read
+  virtual ~InputStream();
+};
+// Entry point of the client API; New() creates an instance connected to the namenode at server:port.
+class FileSystem {
+ public:
+  static Status New(IoService *io_service, const char *server,
+                    unsigned short port, FileSystem **fsptr);  // *fsptr is written only when the returned Status is ok
+  virtual Status Open(const char *path, InputStream **isptr) = 0;  // on success *isptr is a new stream the caller must delete
+  virtual ~FileSystem();
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
new file mode 100644
index 0000000..e7a5d6c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "wrapper.h"
+
+namespace hdfs {
+
+IoService::~IoService() {}
+// Creates the asio-backed implementation on the heap; the caller takes ownership.
+IoService *IoService::New() {
+  return new IoServiceImpl();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
new file mode 100644
index 0000000..39d26cc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_WRAPPER_H_
+#define COMMON_WRAPPER_H_
+
+#include "libhdfspp/hdfs.h"
+
+#include <asio/io_service.hpp>
+
+namespace hdfs {
+
+class IoServiceImpl : public IoService {  // IoService backed by one ::asio::io_service
+ public:
+  void Run() override {
+    ::asio::io_service::work keep_alive(io_service_);  // lives for the whole run() call
+    io_service_.run();
+  }
+  void Stop() override { io_service_.stop(); }
+  ::asio::io_service &io_service() { return io_service_; }  // raw asio object for library-internal code
+ private:
+  ::asio::io_service io_service_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
new file mode 100644
index 0000000..bd649ff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -0,0 +1,4 @@
+add_library(fs filesystem.cc inputstream.cc)
+add_dependencies(fs proto)  # make sure the proto target is built before fs
+add_executable(inputstream_test inputstream_test.cc)  # command-line tool: <nnhost> <nnport> <file>
+target_link_libraries(inputstream_test common fs rpc reader proto ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
new file mode 100644
index 0000000..ab322c6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+#include "common/util.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <limits>
+
+namespace hdfs {
+
+static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol";  // protocol name handed to the RpcEngine constructor
+static const int kNamenodeProtocolVersion = 1;  // protocol version handed to the RpcEngine constructor
+
+using ::asio::ip::tcp;
+// Out-of-line definition of the virtual destructor declared in libhdfspp/hdfs.h.
+FileSystem::~FileSystem()
+{}
+
+// Creates a FileSystemImpl on io_service and connects it to server:port.
+// *fsptr receives the new object only if the connection succeeds.
+Status FileSystem::New(IoService *io_service, const char *server,
+                       unsigned short port, FileSystem **fsptr) {
+  std::unique_ptr<FileSystemImpl> fs(new FileSystemImpl(io_service));
+  Status connect_status = fs->Connect(server, port);
+  if (connect_status.ok()) { *fsptr = fs.release(); }  // otherwise fs is destroyed on return
+  return connect_status;
+}
+
+FileSystemImpl::FileSystemImpl(IoService *io_service)
+    : io_service_(static_cast<IoServiceImpl*>(io_service))  // unchecked downcast: the argument must really be an IoServiceImpl
+    , engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(),
+              kNamenodeProtocol, kNamenodeProtocolVersion)  // reads io_service_, which is declared (and therefore initialized) first
+    , namenode_(&engine_)
+{}
+
+// Resolves server:port to IPv4 endpoints, connects the RPC engine to them and starts
+// the engine once connected. Returns the resolver or connect error otherwise.
+Status FileSystemImpl::Connect(const char *server, unsigned short port) {
+  tcp::resolver resolver(io_service_->io_service());
+  tcp::resolver::query lookup(tcp::v4(), server, std::to_string(port));
+  asio::error_code resolve_error;
+  tcp::resolver::iterator first = resolver.resolve(lookup, resolve_error);
+  if (resolve_error) {
+    return ToStatus(resolve_error);
+  }
+
+  std::vector<tcp::endpoint> endpoints(first, tcp::resolver::iterator());
+  Status result = engine_.Connect(endpoints);
+  if (result.ok()) {
+    engine_.Start();
+  }
+  return result;
+}
+
+// Blocking open: fetches every block location of `path` from the namenode and, on
+// success, stores a new InputStreamImpl (owned by the caller) in *isptr.
+Status FileSystemImpl::Open(const char *path, InputStream **isptr) {
+  ::hadoop::hdfs::GetBlockLocationsRequestProto req;
+  auto resp = std::make_shared<::hadoop::hdfs::GetBlockLocationsResponseProto>();
+  req.set_src(path);
+  req.set_offset(0);
+  req.set_length(std::numeric_limits<long long>::max());
+  auto stat_p = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(stat_p->get_future());
+  namenode_.GetBlockLocations(&req, resp,
+                              [stat_p](const Status &status) { stat_p->set_value(status); });
+  Status stat = future.get();  // waits for the RPC completion callback above
+  if (!stat.ok()) {
+    return stat;
+  } else if (!resp->has_locations()) {  // optional field: absent means no block list to build a stream from
+    return Status::InvalidArgument("No block locations returned for the path");
+  }
+  *isptr = new InputStreamImpl(this, &resp->locations());  // the constructor copies what it needs from resp
+  return Status::OK();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
new file mode 100644
index 0000000..30536eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_FILESYSTEM_H_
+#define FS_FILESYSTEM_H_
+
+#include "common/wrapper.h"
+#include "libhdfspp/hdfs.h"
+#include "rpc/rpc_engine.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.hrpc.inl"
+
+namespace hdfs {
+// FileSystem implementation that talks to the namenode through an RpcEngine.
+class FileSystemImpl : public FileSystem {
+ public:
+  FileSystemImpl(IoService *io_service);
+  Status Connect(const char *server, unsigned short port);
+  virtual Status Open(const char *path, InputStream **isptr) override;
+  RpcEngine &rpc_engine() { return engine_; }
+ private:
+  IoServiceImpl *io_service_;  // not owned
+  RpcEngine engine_;  // constructed from io_service_, so it must stay declared after it
+  ClientNamenodeProtocol namenode_;  // constructed from &engine_, so it must stay declared after it
+};
+// InputStream implementation that keeps its own copy of the file's located blocks.
+class InputStreamImpl : public InputStream {
+ public:
+  InputStreamImpl(FileSystemImpl *fs, const ::hadoop::hdfs::LocatedBlocksProto *blocks);  // copies the block list out of *blocks
+  virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) override;  // blocking wrapper around AsyncPreadSome()
+  template<class MutableBufferSequence, class Handler>
+  void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
+                      const Handler &handler);  // handler is called as handler(const Status &, size_t bytes_transferred)
+ private:
+  FileSystemImpl *fs_;  // not owned
+  unsigned long long file_length_;
+  std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+  struct HandshakeContinuation;  // defined in inputstream_impl.h
+  template<class MutableBufferSequence>
+  struct ReadBlockContinuation;  // defined in inputstream_impl.h
+};
+
+}
+
+#include "inputstream_impl.h"
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
new file mode 100644
index 0000000..a41c684
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+// Out-of-line definition of the virtual destructor declared in libhdfspp/hdfs.h.
+InputStream::~InputStream()
+{}
+
+// Copies the file length and the block list out of *blocks; the message itself is not retained.
+InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, const LocatedBlocksProto *blocks)
+    : fs_(fs)
+    , file_length_(blocks->filelength())
+{
+  blocks_.assign(blocks->blocks().begin(), blocks->blocks().end());
+  // lastblock is a separate field; it is appended only when present and non-empty.
+  const bool last_has_data = blocks->has_lastblock() && blocks->lastblock().b().numbytes();
+  if (last_has_data) {
+    blocks_.push_back(blocks->lastblock());
+  }
+}
+
+// Blocking read: starts AsyncPreadSome() and waits for its completion handler.
+Status InputStreamImpl::PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) {
+  auto done = std::make_shared<std::promise<Status>>();
+  std::future<Status> outcome = done->get_future();
+  auto on_complete = [done, read_bytes](const Status &status, size_t transferred) {
+    *read_bytes = transferred;  // publish the byte count before releasing the waiter
+    done->set_value(status);
+  };
+  AsyncPreadSome(offset, asio::buffer(buf, nbyte), on_complete);
+  return outcome.get();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
new file mode 100644
index 0000000..88e1912
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_INPUTSTREAM_IMPL_H_
+#define FS_INPUTSTREAM_IMPL_H_
+
+#include "reader/block_reader.h"
+
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <functional>
+#include <future>
+
+namespace hdfs {
+
+// Pipeline stage that calls reader->async_connect() using its own copies of the token and block.
+struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
+  typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
+  HandshakeContinuation(Reader *reader, const std::string &client_name,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block,
+               uint64_t length, uint64_t offset)
+      : reader_(reader)
+      , client_name_(client_name)
+      , token_(token ? new hadoop::common::TokenProto() : nullptr)
+      , length_(length)
+      , offset_(offset)
+  {
+    if (token_) {
+      token_->CheckTypeAndMergeFrom(*token);  // a null token stays null
+    }
+    block_.CheckTypeAndMergeFrom(*block);
+  }
+
+  void Run(const Next& next) override {
+    reader_->async_connect(client_name_, token_.get(), &block_, length_, offset_, next);
+  }
+ private:
+  Reader *reader_;
+  const std::string client_name_;
+  std::unique_ptr<hadoop::common::TokenProto> token_;
+  hadoop::hdfs::ExtendedBlockProto block_;
+  uint64_t length_;
+  uint64_t offset_;
+};
+
+// Pipeline stage that re-issues reader->async_read_some() until the buffer is full
+// or a read reports an error, adding each chunk's size to *transferred.
+template<class MutableBufferSequence>
+struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
+  typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
+  ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
+                 size_t *transferred)
+      : reader_(reader)
+      , buffer_(buffer)
+      , buffer_size_(asio::buffer_size(buffer))
+      , transferred_(transferred)
+  {}
+
+  void Run(const Next& next) override {
+    next_ = next;
+    *transferred_ = 0;
+    OnReadData(Status::OK(), 0);  // zero-byte "completion" kicks off the first read
+  }
+
+ private:
+  Reader *reader_;
+  MutableBufferSequence buffer_;
+  const size_t buffer_size_;
+  size_t *transferred_;
+  std::function<void(const Status &)> next_;
+
+  void OnReadData(const Status &status, size_t transferred) {
+    *transferred_ += transferred;
+    if (!status.ok() || *transferred_ >= buffer_size_) {
+      next_(status);  // failed, or nothing left to fill: continue with the next stage
+      return;
+    }
+    using std::placeholders::_1;
+    using std::placeholders::_2;
+    reader_->async_read_some(
+        asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+        std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+  }
+};
+
+
+// Starts an asynchronous read at file position `offset` into `buffers`, limited to the
+// block containing `offset`; `handler(status, bytes_transferred)` reports the outcome.
+template<class MutableBufferSequence, class Handler>
+void InputStreamImpl::AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
+                                     const Handler &handler) {
+  using ::hadoop::hdfs::LocatedBlockProto;
+  namespace ip = ::asio::ip;
+  using ::asio::ip::tcp;
+  // Find the block whose byte range [p.offset(), p.offset() + numbytes) covers `offset`.
+  auto it = std::find_if(
+      blocks_.begin(), blocks_.end(),
+      [offset](const LocatedBlockProto &p) {
+        return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+      });
+
+  if (it == blocks_.end()) {
+    handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);
+    return;
+  } else if (!it->locs_size()) {
+    handler(Status::ResourceUnavailable("No datanodes available"), 0);
+    return;
+  }
+  // Never read past the end of the selected block, even if the buffer is larger.
+  uint64_t offset_within_block = offset - it->offset();
+  uint64_t size_within_block =
+      std::min<uint64_t>(it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+  // State shared by the pipeline stages below; obtained from m->state().
+  struct State {
+    std::unique_ptr<tcp::socket> conn;
+    std::shared_ptr<RemoteBlockReader<tcp::socket> > reader;
+    LocatedBlockProto block;
+    std::vector<tcp::endpoint> endpoints;
+    size_t transferred = 0;  // stays 0 when an earlier stage fails before the read stage runs
+  };
+
+  auto m = continuation::Pipeline<State>::Create();
+  auto &s = m->state();
+  s.conn.reset(new tcp::socket(fs_->rpc_engine().io_service()));
+  s.reader = std::make_shared<RemoteBlockReader<tcp::socket> >(BlockReaderOptions(), s.conn.get());
+  s.block = *it;
+  for (auto &loc : it->locs()) {
+    const auto &datanode = loc.id();  // bind by reference: no per-location proto copy
+    s.endpoints.push_back(tcp::endpoint(ip::address::from_string(datanode.ipaddr()), datanode.xferport()));
+  }
+  // Stages are pushed in order: connect the socket, send the read handshake, then read the data.
+  m->Push(continuation::Connect(s.conn.get(), s.endpoints.begin(), s.endpoints.end()))
+      .Push(new HandshakeContinuation(s.reader.get(), fs_->rpc_engine().client_name(), nullptr,
+                                   &s.block.b(), size_within_block, offset_within_block))
+      .Push(new ReadBlockContinuation<::asio::mutable_buffers_1>(
+          s.reader.get(), asio::buffer(buffers, size_within_block), &s.transferred));
+  m->Run([handler](const Status &status, const State &state) {
+      handler(status, state.transferred);
+    });
+}
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
new file mode 100644
index 0000000..dceac86
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfspp/hdfs.h"
+
+#include <iostream>
+#include <string>
+#include <thread>
+
+using namespace hdfs;
+
+class Executor {  // runs an IoService on a background thread for the lifetime of the object
+ public:
+  Executor()
+      : io_service_(IoService::New())
+      , thread_([this] { io_service_->Run(); })
+  {}
+
+  IoService *io_service() { return io_service_.get(); }
+
+  ~Executor() {
+    io_service_->Stop();  // ask Run() to return so the join below can complete
+    thread_.join();
+  }
+
+  std::unique_ptr<IoService> io_service_;  // declared before thread_: the thread uses it as soon as it starts
+  std::thread thread_;
+};
+
+// Prints (to stderr) up to the first 8192 bytes of <file> read through the libhdfspp API.
+int main(int argc, char *argv[]) {
+  if (argc != 4) {
+    std::cerr
+        << "Print files stored in a HDFS cluster.\n"
+        << "Usage: " << argv[0] << " "
+        << "<nnhost> <nnport> <file>\n";
+    return 1;
+  }
+
+  Executor executor;
+  FileSystem *fsptr = nullptr;
+  Status stat = FileSystem::New(executor.io_service(), argv[1], std::stoi(argv[2]), &fsptr);
+  if (!stat.ok()) {
+    std::cerr << "Cannot create the filesystem: " << stat.ToString() << std::endl;
+    return 1;
+  }
+  std::unique_ptr<FileSystem> fs(fsptr);
+
+  InputStream *isptr = nullptr;
+  stat = fs->Open(argv[3], &isptr);
+  if (!stat.ok()) {
+    std::cerr << "Cannot open the file: " << stat.ToString() << std::endl;
+    return 1;
+  }
+  std::unique_ptr<InputStream> is(isptr);
+
+  char buf[8192] = {0,};
+  size_t read_bytes = 0;
+  stat = is->PositionRead(buf, sizeof(buf), 0, &read_bytes);
+  if (!stat.ok()) {
+    std::cerr << "Read failures: " << stat.ToString() << std::endl;
+    return 1;  // report the failure through the exit code instead of printing the buffer
+  }
+  // buf has no NUL terminator when all 8192 bytes are filled, so print exactly read_bytes bytes.
+  std::cerr << "Read bytes:" << read_bytes << std::endl << std::string(buf, read_bytes) << std::endl;
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
new file mode 100644
index 0000000..80aa237
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_NAMENODE_PROTOCOL_H_
+#define FS_NAMENODE_PROTOCOL_H_
+
+#include "ClientNamenodeProtocol.pb.h"
+#include "rpc/rpc_engine.h"
+
+namespace hdfs {
+// Thin wrapper that forwards namenode calls to an RpcEngine it does not own.
+class ClientNamenodeProtocol {
+ public:
+  ClientNamenodeProtocol(RpcEngine *engine)
+      : engine_(engine)
+  {}
+
+  Status GetBlockLocations(const ::hadoop::hdfs::GetBlockLocationsRequestProto *request,
+                           std::shared_ptr<::hadoop::hdfs::GetBlockLocationsResponseProto> response) {
+    return engine_->Rpc("getBlockLocations", request, response);  // the RPC method is selected by this string name
+  }
+ private:
+  RpcEngine *engine_;
+};
+
+};
+
+#endif