Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/07/16 02:02:30 UTC

hadoop git commit: HDFS-8759. Implement remote block reader in libhdfspp. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 8b02d962b -> 6416b38ca


HDFS-8759. Implement remote block reader in libhdfspp. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6416b38c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6416b38c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6416b38c

Branch: refs/heads/HDFS-8707
Commit: 6416b38cade36a1c251ef96d5a0f9b80b785e58b
Parents: 8b02d96
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Jul 10 16:50:45 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:00 2015 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/include/libhdfspp/status.h |   4 +-
 .../main/native/libhdfspp/lib/CMakeLists.txt    |  20 ++
 .../native/libhdfspp/lib/common/CMakeLists.txt  |   1 +
 .../main/native/libhdfspp/lib/common/base64.cc  |  71 ++++
 .../main/native/libhdfspp/lib/common/status.cc  |  66 ++++
 .../native/libhdfspp/lib/reader/CMakeLists.txt  |  20 ++
 .../native/libhdfspp/lib/reader/block_reader.h  | 114 +++++++
 .../native/libhdfspp/lib/reader/datatransfer.h  |  35 ++
 .../libhdfspp/lib/reader/remote_block_reader.cc |  46 +++
 .../lib/reader/remote_block_reader_impl.h       | 342 +++++++++++++++++++
 10 files changed, 718 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
index 9436c8b..d2ef005 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/status.h
@@ -45,6 +45,8 @@ class Status {
   { return Status(kUnimplemented, ""); }
   static Status Exception(const char *exception_class_name, const char *error_message)
   { return Status(kException, exception_class_name, error_message); }
+  static Status Error(const char *error_message)
+  { return Exception("Exception", error_message); }
 
   // Returns true iff the status indicates success.
   bool ok() const { return (state_ == NULL); }
@@ -71,7 +73,7 @@ class Status {
     kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument),
     kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again),
     kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
-    kException = 256,
+    kException = 255,
   };
 
   explicit Status(int code, const char *msg1, const char *msg2);

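A quick sketch of what the new factory gives callers (hypothetical usage, not part of the commit): Error() wraps a bare message as a generic "Exception" status, so call sites with no exception class name can still produce a failed Status.

    // Hypothetical caller -- assumes only what the header above declares.
    hdfs::Status s = hdfs::Status::Error("checksum mismatch");
    assert(!s.ok());                                      // a non-OK status
    assert(s.ToString() == "Exception: checksum mismatch");
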
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
index 7458453..e77942b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/CMakeLists.txt
@@ -1,2 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_subdirectory(common)
+add_subdirectory(reader)
 add_subdirectory(rpc)
 add_subdirectory(proto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
new file mode 100644
index 0000000..570d0ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt
@@ -0,0 +1 @@
+add_library(common base64.cc status.cc)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc
new file mode 100644
index 0000000..f98fec5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/base64.cc
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util.h"
+
+#include <algorithm>
+#include <cassert>
+#include <iterator>
+
+namespace hdfs {
+
+std::string Base64Encode(const std::string &src) {
+  static const char kDictionary[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+                                    "abcdefghijklmnopqrstuvwxyz"
+                                    "0123456789+/";
+
+  int encoded_size = (src.size() + 2) / 3 * 4;
+  std::string dst;
+  dst.reserve(encoded_size);
+
+  size_t i = 0;
+  while (i + 3 <= src.length()) {
+    const unsigned char *s = reinterpret_cast<const unsigned char *>(&src[i]);
+    const int r[4] = {s[0] >> 2, ((s[0] << 4) | (s[1] >> 4)) & 0x3f,
+                      ((s[1] << 2) | (s[2] >> 6)) & 0x3f, s[2] & 0x3f};
+
+    std::transform(r, r + sizeof(r) / sizeof(int), std::back_inserter(dst),
+                   [](int v) { return kDictionary[v]; });
+    i += 3;
+  }
+
+  size_t remained = src.length() - i;
+  const unsigned char *s = reinterpret_cast<const unsigned char *>(&src[i]);
+
+  switch (remained) {
+  case 0:
+    break;
+  case 1: {
+    char padding[4] = {kDictionary[s[0] >> 2], kDictionary[(s[0] << 4) & 0x3f],
+                       '=', '='};
+    dst.append(padding, sizeof(padding));
+  } break;
+  case 2: {
+    char padding[4] = {kDictionary[s[0] >> 2],
+                       kDictionary[((s[0] << 4) | (s[1] >> 4)) & 0x3f],
+                       kDictionary[(s[1] << 2) & 0x3f], '='};
+    dst.append(padding, sizeof(padding));
+  } break;
+  default:
+    assert(false && "Unreachable");
+    break;
+  }
+  return dst;
+}
+
+}

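As a sanity check, Base64Encode should reproduce the RFC 4648 test vectors; a hypothetical test (not part of the commit) that exercises all three tail lengths, including the exact 3-byte group the loop bound above must admit:

    #include <cassert>
    #include <string>

    namespace hdfs { std::string Base64Encode(const std::string &src); }

    int main() {
      assert(hdfs::Base64Encode("") == "");
      assert(hdfs::Base64Encode("f") == "Zg==");   // 1 trailing byte -> "=="
      assert(hdfs::Base64Encode("fo") == "Zm8=");  // 2 trailing bytes -> "="
      assert(hdfs::Base64Encode("foo") == "Zm9v"); // full 3-byte group, no pad
      return 0;
    }
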
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
new file mode 100644
index 0000000..66cfa1c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfspp/status.h"
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+
+namespace hdfs {
+
+Status::Status(int code, const char *msg1)
+    : state_(ConstructState(code, msg1, nullptr)) {}
+
+Status::Status(int code, const char *msg1, const char *msg2)
+    : state_(ConstructState(code, msg1, msg2)) {}
+
+const char *Status::ConstructState(int code, const char *msg1,
+                                   const char *msg2) {
+  assert(code != kOk);
+  const uint32_t len1 = strlen(msg1);
+  const uint32_t len2 = msg2 ? strlen(msg2) : 0;
+  const uint32_t size = len1 + (len2 ? (2 + len2) : 0);
+  char *result = new char[size + 8 + 2];
+  *reinterpret_cast<uint32_t *>(result) = size;
+  *reinterpret_cast<uint32_t *>(result + 4) = code;
+  memcpy(result + 8, msg1, len1);
+  if (len2) {
+    result[8 + len1] = ':';
+    result[9 + len1] = ' ';
+    memcpy(result + 10 + len1, msg2, len2);
+  }
+  return result;
+}
+
+std::string Status::ToString() const {
+  if (!state_) {
+    return "OK";
+  } else {
+    uint32_t length = *reinterpret_cast<const uint32_t *>(state_);
+    return std::string(state_ + 8, length);
+  }
+}
+
+const char *Status::CopyState(const char *state) {
+  uint32_t size;
+  memcpy(&size, state, sizeof(size));
+  char *result = new char[size + 8];
+  memcpy(result, state, size + 8);
+  return result;
+}
+}

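For reference, the packed buffer that ConstructState builds and ToString/CopyState read back has this layout (native byte order):

    bytes [0, 4)         uint32_t  length of the message text
    bytes [4, 8)         uint32_t  status code
    bytes [8, 8+length)  char[]    "msg1" or "msg1: msg2" (no NUL terminator)

which is why ToString constructs the result from state_ + 8 with an explicit length.
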
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
new file mode 100644
index 0000000..65ec108
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(reader remote_block_reader.cc)
+add_dependencies(reader proto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h
new file mode 100644
index 0000000..81636b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/block_reader.h
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCK_READER_H_
+#define BLOCK_READER_H_
+
+#include "libhdfspp/status.h"
+#include "datatransfer.pb.h"
+
+#include <memory>
+#include <vector>
+
+namespace hdfs {
+
+struct CacheStrategy {
+  bool drop_behind_specified;
+  bool drop_behind;
+  bool read_ahead_specified;
+  unsigned long long read_ahead;
+  CacheStrategy()
+      : drop_behind_specified(false), drop_behind(false),
+        read_ahead_specified(false), read_ahead(0) {}
+};
+
+enum DropBehindStrategy {
+  kUnspecified = 0,
+  kEnableDropBehind = 1,
+  kDisableDropBehind = 2,
+};
+
+enum EncryptionScheme {
+  kNone = 0,
+  kAESCTRNoPadding = 1,
+};
+
+struct BlockReaderOptions {
+  bool verify_checksum;
+  CacheStrategy cache_strategy;
+  EncryptionScheme encryption_scheme;
+
+  BlockReaderOptions()
+      : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {}
+};
+
+template <class Stream>
+class RemoteBlockReader
+    : public std::enable_shared_from_this<RemoteBlockReader<Stream>> {
+public:
+  explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream)
+      : stream_(stream), state_(kOpen), options_(options),
+        chunk_padding_bytes_(0) {}
+
+  template <class MutableBufferSequence, class ReadHandler>
+  void async_read_some(const MutableBufferSequence &buffers,
+                       const ReadHandler &handler);
+
+  template <class MutableBufferSequence>
+  size_t read_some(const MutableBufferSequence &buffers, Status *status);
+
+  Status connect(const std::string &client_name,
+                 const hadoop::common::TokenProto *token,
+                 const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+                 uint64_t offset);
+
+  template <class ConnectHandler>
+  void async_connect(const std::string &client_name,
+                     const hadoop::common::TokenProto *token,
+                     const hadoop::hdfs::ExtendedBlockProto *block,
+                     uint64_t length, uint64_t offset,
+                     const ConnectHandler &handler);
+
+private:
+  struct ReadPacketHeader;
+  struct ReadChecksum;
+  struct ReadPadding;
+  template <class MutableBufferSequence> struct ReadData;
+  struct AckRead;
+  enum State {
+    kOpen,
+    kReadPacketHeader,
+    kReadChecksum,
+    kReadPadding,
+    kReadData,
+    kFinished,
+  };
+
+  Stream *stream_;
+  hadoop::hdfs::PacketHeaderProto header_;
+  State state_;
+  BlockReaderOptions options_;
+  size_t packet_len_;
+  int packet_data_read_bytes_;
+  int chunk_padding_bytes_;
+  long long bytes_to_read_;
+  std::vector<char> checksum_;
+};
+}
+
+#include "remote_block_reader_impl.h"
+
+#endif

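For orientation, the State enum above tracks where the reader is in the per-packet cycle that the implementation header (included at the bottom of this file) drives:

    kOpen --connect--> kReadPacketHeader -> kReadChecksum
      -> kReadPadding   (only while chunk_padding_bytes_ > 0, i.e. the first
                         packet of a chunk-unaligned read)
      -> kReadData -> kReadPacketHeader  (next packet)
      -> kFinished      (once bytes_to_read_ is exhausted and the read is acked)
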
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
new file mode 100644
index 0000000..d22f5e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/datatransfer.h
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef COMMON_DATA_TRANSFER_H_
+#define COMMON_DATA_TRANSFER_H_
+
+namespace hdfs {
+
+enum {
+  kDataTransferVersion = 28,
+  kDataTransferSasl = 0xdeadbeef,
+};
+
+enum Operation {
+  kWriteBlock = 80,
+  kReadBlock  = 81,
+};
+
+}
+
+#endif

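For context, these constants imply the 3-byte preamble that precedes every data transfer operation: a big-endian two-byte protocol version followed by a one-byte opcode. A hypothetical sketch (async_connect in remote_block_reader_impl.h builds the same bytes inline):

    #include "datatransfer.h"
    #include <string>

    // Sketch only: the preamble for an OP_READ_BLOCK request.
    std::string ReadBlockPreamble() {
      std::string header;
      header.push_back(0);                           // version, high byte
      header.push_back(hdfs::kDataTransferVersion);  // version, low byte (28)
      header.push_back(hdfs::kReadBlock);            // opcode (81)
      return header;
    }
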
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
new file mode 100644
index 0000000..68bc4ee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "block_reader.h"
+
+namespace hdfs {
+
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+               uint64_t offset) {
+  using namespace hadoop::hdfs;
+  using namespace hadoop::common;
+  BaseHeaderProto *base_h = new BaseHeaderProto();
+  base_h->set_allocated_block(new ExtendedBlockProto(*block));
+  if (token) {
+    base_h->set_allocated_token(new TokenProto(*token));
+  }
+  ClientOperationHeaderProto *h = new ClientOperationHeaderProto();
+  h->set_clientname(client_name);
+  h->set_allocated_baseheader(base_h);
+
+  OpReadBlockProto p;
+  p.set_allocated_header(h);
+  p.set_offset(offset);
+  p.set_len(length);
+  p.set_sendchecksums(verify_checksum);
+  // TODO: p.set_allocated_cachingstrategy();
+  return p;
+}
+}

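Two notes on the helper above. First, the set_allocated_* calls hand ownership of each heap-allocated sub-message to its parent, so nothing here leaks. Second, the request goes out length-delimited; a hypothetical standalone sketch of the encoding that WriteDelimitedPBMessage (used in the impl header below) produces:

    #include "datatransfer.pb.h"
    #include <google/protobuf/io/coded_stream.h>
    #include <google/protobuf/io/zero_copy_stream_impl_lite.h>
    #include <string>

    std::string SerializeDelimited(const hadoop::hdfs::OpReadBlockProto &req) {
      std::string buf;
      google::protobuf::io::StringOutputStream wrapper(&buf);
      google::protobuf::io::CodedOutputStream out(&wrapper);
      out.WriteVarint32(req.ByteSize());  // varint length prefix...
      req.SerializeToCodedStream(&out);   // ...then the message body
      return buf;
    }
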
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6416b38c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
new file mode 100644
index 0000000..68ea6ad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
+#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_
+
+#include "datatransfer.h"
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <asio/buffers_iterator.hpp>
+#include <asio/streambuf.hpp>
+#include <asio/write.hpp>
+
+#include <arpa/inet.h>
+
+#include <array>
+#include <functional>
+#include <future>
+#include <vector>
+
+namespace hdfs {
+
+hadoop::hdfs::OpReadBlockProto
+ReadBlockProto(const std::string &client_name, bool verify_checksum,
+               const hadoop::common::TokenProto *token,
+               const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+               uint64_t offset);
+
+template <class Stream>
+template <class ConnectHandler>
+void RemoteBlockReader<Stream>::async_connect(
+    const std::string &client_name, const hadoop::common::TokenProto *token,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset, const ConnectHandler &handler) {
+  // The total number of bytes that we need to transfer from the DN is
+  // the amount that the user wants (bytesToRead), plus the padding at
+  // the beginning in order to chunk-align. Note that the DN may elect
+  // to send more than this amount if the read starts/ends mid-chunk.
+  bytes_to_read_ = length;
+
+  struct State {
+    std::string header;
+    hadoop::hdfs::OpReadBlockProto request;
+    hadoop::hdfs::BlockOpResponseProto response;
+  };
+
+  auto m = continuation::Pipeline<State>::Create();
+  State *s = &m->state();
+
+  s->header.insert(s->header.begin(),
+                   {0, kDataTransferVersion, Operation::kReadBlock});
+  s->request = ReadBlockProto(client_name, options_.verify_checksum, token,
+                              block, length, offset);
+
+  auto read_pb_message =
+      new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>(
+          stream_, &s->response);
+
+  m->Push(continuation::Write(stream_, asio::buffer(s->header)))
+      .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request))
+      .Push(read_pb_message);
+
+  m->Run([this, handler, offset](const Status &status, const State &s) {
+    Status stat = status;
+    if (stat.ok()) {
+      const auto &resp = s.response;
+      if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) {
+        if (resp.has_readopchecksuminfo()) {
+          const auto &checksum_info = resp.readopchecksuminfo();
+          chunk_padding_bytes_ = offset - checksum_info.chunkoffset();
+        }
+        state_ = kReadPacketHeader;
+      } else {
+        stat = Status::Error(s.response.message().c_str());
+      }
+    }
+    handler(stat);
+  });
+}
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadPacketHeader
+    : continuation::Continuation {
+  ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    parent_->packet_data_read_bytes_ = 0;
+    parent_->packet_len_ = 0;
+    auto handler = [next, this](const asio::error_code &ec, size_t) {
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      } else {
+        parent_->packet_len_ = packet_length();
+        parent_->header_.Clear();
+        bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
+                                                 header_length());
+        assert(v && "Failed to parse the header");
+        parent_->state_ = kReadChecksum;
+      }
+      next(status);
+    };
+
+    asio::async_read(*parent_->stream_, asio::buffer(buf_),
+                     std::bind(&ReadPacketHeader::CompletionHandler, this,
+                               std::placeholders::_1, std::placeholders::_2),
+                     handler);
+  }
+
+private:
+  static const size_t kMaxHeaderSize = 512;
+  static const size_t kPayloadLenOffset = 0;
+  static const size_t kPayloadLenSize = sizeof(int);
+  static const size_t kHeaderLenOffset = 4;
+  static const size_t kHeaderLenSize = sizeof(short);
+  static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize;
+
+  RemoteBlockReader<Stream> *parent_;
+  std::array<char, kMaxHeaderSize> buf_;
+
+  size_t packet_length() const {
+    return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset]));
+  }
+
+  size_t header_length() const {
+    return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset]));
+  }
+
+  size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+    if (ec) {
+      return 0;
+    } else if (transferred < kHeaderStart) {
+      return kHeaderStart - transferred;
+    } else {
+      return kHeaderStart + header_length() - transferred;
+    }
+  }
+};
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation {
+  ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    auto parent = parent_;
+    if (parent->state_ != kReadChecksum) {
+      next(Status::OK());
+      return;
+    }
+
+    auto handler = [parent, next](const asio::error_code &ec, size_t) {
+      Status status;
+      if (ec) {
+        status = Status(ec.value(), ec.message().c_str());
+      } else {
+        parent->state_ =
+            parent->chunk_padding_bytes_ ? kReadPadding : kReadData;
+      }
+      next(status);
+    };
+    parent->checksum_.resize(parent->packet_len_ - sizeof(int) -
+                             parent->header_.datalen());
+    asio::async_read(*parent->stream_, asio::buffer(parent->checksum_),
+                     handler);
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+};
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation {
+  ReadPadding(RemoteBlockReader<Stream> *parent)
+      : parent_(parent), padding_(parent->chunk_padding_bytes_),
+        bytes_transferred_(std::make_shared<size_t>(0)),
+        read_data_(new ReadData<asio::mutable_buffers_1>(
+            parent, bytes_transferred_, asio::buffer(padding_))) {}
+
+  virtual void Run(const Next &next) override {
+    if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
+      next(Status::OK());
+      return;
+    }
+
+    auto h = [next, this](const Status &status) {
+      if (status.ok()) {
+        assert(static_cast<int>(*bytes_transferred_) ==
+               parent_->chunk_padding_bytes_);
+        parent_->chunk_padding_bytes_ = 0;
+        parent_->state_ = kReadData;
+      }
+      next(status);
+    };
+    read_data_->Run(h);
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+  std::vector<char> padding_;
+  std::shared_ptr<size_t> bytes_transferred_;
+  std::shared_ptr<continuation::Continuation> read_data_;
+  ReadPadding(const ReadPadding &) = delete;
+  ReadPadding &operator=(const ReadPadding &) = delete;
+};
+
+template <class Stream>
+template <class MutableBufferSequence>
+struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation {
+  ReadData(RemoteBlockReader<Stream> *parent,
+           std::shared_ptr<size_t> bytes_transferred,
+           const MutableBufferSequence &buf)
+      : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {}
+
+  virtual void Run(const Next &next) override {
+    auto handler =
+        [next, this](const asio::error_code &ec, size_t transferred) {
+          Status status;
+          if (ec) {
+            status = Status(ec.value(), ec.message().c_str());
+          }
+          *bytes_transferred_ += transferred;
+          parent_->bytes_to_read_ -= transferred;
+          parent_->packet_data_read_bytes_ += transferred;
+          if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) {
+            parent_->state_ = kReadPacketHeader;
+          }
+          next(status);
+        };
+
+    auto data_len =
+        parent_->header_.datalen() - parent_->packet_data_read_bytes_;
+    async_read(*parent_->stream_, buf_, asio::transfer_exactly(data_len),
+               handler);
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+  std::shared_ptr<size_t> bytes_transferred_;
+  MutableBufferSequence buf_;
+};
+
+template <class Stream>
+struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation {
+  AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {}
+
+  virtual void Run(const Next &next) override {
+    if (parent_->bytes_to_read_ > 0) {
+      next(Status::OK());
+      return;
+    }
+
+    auto m =
+        continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create();
+    m->state().set_status(parent_->options_.verify_checksum
+                              ? hadoop::hdfs::Status::CHECKSUM_OK
+                              : hadoop::hdfs::Status::SUCCESS);
+
+    m->Push(
+        continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state()));
+
+    m->Run([this, next](const Status &status,
+                        const hadoop::hdfs::ClientReadStatusProto &) {
+      if (status.ok()) {
+        parent_->state_ = RemoteBlockReader<Stream>::kFinished;
+      }
+      next(status);
+    });
+  }
+
+private:
+  RemoteBlockReader<Stream> *parent_;
+};
+
+template <class Stream>
+template <class MutableBufferSequence, class ReadHandler>
+void RemoteBlockReader<Stream>::async_read_some(
+    const MutableBufferSequence &buffers, const ReadHandler &handler) {
+  assert(state_ != kOpen && "Not connected");
+
+  struct State {
+    std::shared_ptr<size_t> bytes_transferred;
+  };
+  auto m = continuation::Pipeline<State>::Create();
+  m->state().bytes_transferred = std::make_shared<size_t>(0);
+
+  m->Push(new ReadPacketHeader(this))
+      .Push(new ReadChecksum(this))
+      .Push(new ReadPadding(this))
+      .Push(new ReadData<MutableBufferSequence>(
+          this, m->state().bytes_transferred, buffers))
+      .Push(new AckRead(this));
+
+  auto self = this->shared_from_this();
+  m->Run([self, handler](const Status &status, const State &state) {
+    handler(status, *state.bytes_transferred);
+  });
+}
+
+template <class Stream>
+template <class MutableBufferSequence>
+size_t
+RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers,
+                                     Status *status) {
+  size_t transferred = 0;
+  auto done = std::make_shared<std::promise<void>>();
+  auto future = done->get_future();
+  async_read_some(buffers,
+                  [status, &transferred, done](const Status &stat, size_t t) {
+                    *status = stat;
+                    transferred = t;
+                    done->set_value();
+                  });
+  future.wait();
+  return transferred;
+}
+
+template <class Stream>
+Status RemoteBlockReader<Stream>::connect(
+    const std::string &client_name, const hadoop::common::TokenProto *token,
+    const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
+    uint64_t offset) {
+  auto stat = std::make_shared<std::promise<Status>>();
+  std::future<Status> future(stat->get_future());
+  async_connect(client_name, token, block, length, offset,
+                [stat](const Status &status) { stat->set_value(status); });
+  return future.get();
+}
+}
+
+#endif
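
Putting the pieces together, a hypothetical synchronous caller (names and setup here are illustrative, not part of the commit). Note that connect() and read_some() block on futures fulfilled by the async pipeline, so the socket's io_service must be running on another thread:

    // Assumes: `socket` is a connected asio::ip::tcp::socket and `block` is a
    // populated hadoop::hdfs::ExtendedBlockProto for the target block.
    using Reader = hdfs::RemoteBlockReader<asio::ip::tcp::socket>;
    auto reader = std::make_shared<Reader>(hdfs::BlockReaderOptions(), &socket);

    hdfs::Status stat = reader->connect("libhdfspp", nullptr /* token */,
                                        &block, 4096 /* length */,
                                        0 /* offset */);
    if (stat.ok()) {
      std::vector<char> buf(4096);
      size_t n = reader->read_some(asio::buffer(buf), &stat);
      // On success, the first `n` bytes of `buf` hold block data.
    }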