You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/07/14 21:55:58 UTC

hadoop git commit: HDFS-8758. Implement the continuation library in libhdfspp. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 ac60c6ab4 -> 8b02d962b


HDFS-8758. Implement the continuation library in libhdfspp. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b02d962
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b02d962
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b02d962

Branch: refs/heads/HDFS-8707
Commit: 8b02d962b291afe4b08c47f0398c1db0709419a1
Parents: ac60c6a
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Jul 9 14:02:55 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Jul 10 17:09:01 2015 -0700

----------------------------------------------------------------------
 .../src/main/native/libhdfspp/doc/Doxyfile.in   |   1 +
 .../libhdfspp/lib/common/continuation/asio.h    | 112 ++++++++++++++++
 .../lib/common/continuation/continuation.h      | 125 ++++++++++++++++++
 .../lib/common/continuation/protobuf.h          | 128 +++++++++++++++++++
 4 files changed, 366 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
index 773990f..ac1d0fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
@@ -8,6 +8,7 @@ BUILTIN_STL_SUPPORT    = YES
 
 INPUT                  = @PROJECT_SOURCE_DIR@/doc/mainpage.dox \
                          @PROJECT_SOURCE_DIR@/include/libhdfspp \
+                         @PROJECT_SOURCE_DIR@/lib/common/continuation \
 
 INPUT_ENCODING         = UTF-8
 RECURSIVE              = NO

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
new file mode 100644
index 0000000..f7d76e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_CONTINUATION_ASIO_H_
+#define LIB_COMMON_CONTINUATION_ASIO_H_
+
+#include "continuation.h"
+#include "common/util.h"
+
+#include "libhdfspp/status.h"
+
+#include <asio/connect.hpp>
+#include <asio/read.hpp>
+#include <asio/write.hpp>
+#include <asio/ip/tcp.hpp>
+
+namespace hdfs {
+namespace continuation {
+
+/**
+ * Continuation that runs asio::async_read() on a stream.
+ *
+ * It keeps a raw pointer to the stream and a copy of the buffer sequence
+ * that were passed to the constructor; it does not own the stream.
+ **/
+template <class Stream, class MutableBufferSequence>
+class ReadContinuation : public Continuation {
+public:
+  ReadContinuation(Stream *stream, const MutableBufferSequence &buffer)
+      : stream_(stream), buffer_(buffer) {}
+
+  /**
+   * Start the asynchronous read. The number of bytes transferred is
+   * dropped; only the error code, converted with ToStatus(), is forwarded
+   * to next.
+   **/
+  void Run(const Next &next) override {
+    asio::async_read(
+        *stream_, buffer_,
+        [next](const asio::error_code &err, size_t /*transferred*/) {
+          next(ToStatus(err));
+        });
+  }
+
+private:
+  Stream *stream_;
+  MutableBufferSequence buffer_;
+};
+
+/**
+ * Continuation that runs asio::async_write() on a stream.
+ *
+ * It keeps a raw pointer to the stream and a copy of the buffer sequence
+ * that were passed to the constructor; it does not own the stream.
+ **/
+template <class Stream, class ConstBufferSequence>
+class WriteContinuation : public Continuation {
+public:
+  WriteContinuation(Stream *stream, const ConstBufferSequence &buffer)
+      : stream_(stream), buffer_(buffer) {}
+
+  /**
+   * Start the asynchronous write. The number of bytes transferred is
+   * dropped; only the error code, converted with ToStatus(), is forwarded
+   * to next.
+   **/
+  void Run(const Next &next) override {
+    asio::async_write(
+        *stream_, buffer_,
+        [next](const asio::error_code &err, size_t /*transferred*/) {
+          next(ToStatus(err));
+        });
+  }
+
+private:
+  Stream *stream_;
+  ConstBufferSequence buffer_;
+};
+
+/**
+ * Continuation that calls asio::async_connect() on a socket with the
+ * endpoint range [begin, end).
+ *
+ * If connected_endpoint is non-null, the iterator passed to the
+ * completion handler is stored through it before the next stage is
+ * invoked. The socket and connected_endpoint are not owned.
+ **/
+template <class Socket, class Iterator>
+class ConnectContinuation : public Continuation {
+public:
+  ConnectContinuation(Socket *socket, Iterator begin, Iterator end,
+                      Iterator *connected_endpoint)
+      : socket_(socket), begin_(begin), end_(end),
+        connected_endpoint_(connected_endpoint) {}
+
+  /**
+   * Start the asynchronous connect and forward the resulting error code,
+   * converted with ToStatus(), to next.
+   **/
+  void Run(const Next &next) override {
+    asio::async_connect(
+        *socket_, begin_, end_,
+        [this, next](const asio::error_code &err, Iterator where) {
+          // Record the iterator first so the next stage can observe it.
+          if (connected_endpoint_ != nullptr) {
+            *connected_endpoint_ = where;
+          }
+          next(ToStatus(err));
+        });
+  }
+
+private:
+  Socket *socket_;
+  Iterator begin_;
+  Iterator end_;
+  Iterator *connected_endpoint_;
+};
+
+/**
+ * Allocate a WriteContinuation for the given stream and buffer sequence.
+ * The object is created with new and returned as a raw pointer, so the
+ * caller is responsible for it (e.g. by handing it to Pipeline::Push()).
+ **/
+template <class Stream, class ConstBufferSequence>
+static inline Continuation *Write(Stream *stream,
+                                  const ConstBufferSequence &buffer) {
+  using Stage = WriteContinuation<Stream, ConstBufferSequence>;
+  return new Stage(stream, buffer);
+}
+
+/**
+ * Allocate a ReadContinuation for the given stream and buffer sequence.
+ * The object is created with new and returned as a raw pointer, so the
+ * caller is responsible for it (e.g. by handing it to Pipeline::Push()).
+ **/
+template <class Stream, class MutableBufferSequence>
+static inline Continuation *Read(Stream *stream,
+                                 const MutableBufferSequence &buffer) {
+  using Stage = ReadContinuation<Stream, MutableBufferSequence>;
+  return new Stage(stream, buffer);
+}
+
+/**
+ * Allocate a ConnectContinuation for the endpoint range [begin, end).
+ * No output iterator is supplied, so the endpoint that was connected to
+ * is not reported back. The object is created with new and returned as
+ * a raw pointer, so the caller is responsible for it.
+ **/
+template <class Socket, class Iterator>
+static inline Continuation *Connect(Socket *socket, Iterator begin,
+                                    Iterator end) {
+  using Stage = ConnectContinuation<Socket, Iterator>;
+  Iterator *const no_endpoint_out = nullptr;
+  return new Stage(socket, begin, end, no_endpoint_out);
+}
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
new file mode 100644
index 0000000..9576c2f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_CONTINUATION_CONTINUATION_H_
+#define LIB_COMMON_CONTINUATION_CONTINUATION_H_
+
+#include "libhdfspp/status.h"
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace hdfs {
+namespace continuation {
+
+class PipelineBase;
+
+/**
+ * A continuation is a fragment of runnable code whose execution is
+ * scheduled by a \link Pipeline \endlink.
+ *
+ * The Continuation class is the building block for implementing
+ * Continuation Passing Style (CPS) in libhdfs++. In CPS the upper-level
+ * user spells out the control flow explicitly by chaining a sequence of
+ * continuations through the \link Run() \endlink method, whereas in
+ * traditional imperative programming the order of the statements
+ * specifies the control flow implicitly.
+ *
+ * Continuations are neither copyable nor assignable, and can only be
+ * constructed by derived classes.
+ *
+ * See http://en.wikipedia.org/wiki/Continuation for more details.
+ **/
+class Continuation {
+public:
+  /** Callback through which a continuation reports its result. */
+  using Next = std::function<void(const Status &)>;
+
+  Continuation(const Continuation &) = delete;
+  Continuation &operator=(const Continuation &) = delete;
+  virtual ~Continuation() = default;
+
+  /**
+   * Execute this fragment. Implementations in this library invoke next
+   * with the resulting status once their work has finished.
+   **/
+  virtual void Run(const Next &next) = 0;
+
+protected:
+  Continuation() = default;
+};
+
+/**
+ * A pipeline schedules the execution of a chain of \link Continuation
+ * \endlink objects. Continuations run in the order in which they were
+ * pushed: the next parameter passed to each continuation is bound to
+ * the \link Schedule() \endlink method, so finishing one continuation
+ * hands control back to the pipeline, which starts the following one.
+ *
+ * The typical use case of a pipeline is executing continuations
+ * asynchronously. A continuation calls next when it is finished; if
+ * the continuation posts work into an asynchronous event loop, it can
+ * call next from the completion handler running in that event loop.
+ *
+ * Memory is managed as follows. A pipeline is always allocated on the
+ * heap (see Create()). It owns all the continuations pushed into it as
+ * well as the state specified by the user, so both share the life
+ * cycle of the pipeline. This design simplifies the problem of
+ * ensuring that code running in the asynchronous event loop always
+ * holds valid pointers w.r.t. the pipeline. The pipeline deallocates
+ * itself right after it invokes the callback specified by the user.
+ **/
+template <class State> class Pipeline {
+public:
+  /** Callback that receives the final status and the pipeline state. */
+  using UserHandler = std::function<void(const Status &, const State &)>;
+
+  /** Allocate an empty pipeline on the heap. */
+  static Pipeline *Create() { return new Pipeline(); }
+
+  /** Append a stage; the pipeline takes ownership of it. */
+  Pipeline &Push(Continuation *stage);
+
+  /** Start executing the stages; handler is invoked at the end. */
+  void Run(UserHandler &&handler);
+
+  /** Mutable access to the user state owned by the pipeline. */
+  State &state() { return state_; }
+
+private:
+  // Construction and destruction are private: instances come from
+  // Create() and are destroyed by the pipeline itself in Schedule().
+  Pipeline() : stage_(0) {}
+  ~Pipeline() = default;
+
+  void Schedule(const Status &status);
+
+  State state_;
+  std::vector<std::unique_ptr<Continuation>> routines_;
+  // Index of the next entry of routines_ to run.
+  size_t stage_;
+  UserHandler handler_;
+};
+
+/**
+ * Append stage to the end of the pipeline and take ownership of it.
+ * Returns the pipeline itself so that calls can be chained.
+ **/
+template <class State>
+inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) {
+  // Wrap the raw pointer before touching the vector so the stage is
+  // already owned if growing the vector fails.
+  std::unique_ptr<Continuation> owned(stage);
+  routines_.push_back(std::move(owned));
+  return *this;
+}
+
+/**
+ * Advance the pipeline by one step. This method is what every stage
+ * receives as its next callback.
+ *
+ * If status reports a failure, or if there is no stage left to run, the
+ * user handler is invoked with status and the pipeline state, after
+ * which the pipeline destroys its stages and deletes itself. Otherwise
+ * the next stage is started.
+ *
+ * Stopping at the first failure prevents later stages from running on
+ * top of a failed one (for example writing to a socket whose connect
+ * stage failed) and guarantees the user handler sees the original error.
+ **/
+template <class State>
+inline void Pipeline<State>::Schedule(const Status &status) {
+  if (!status.ok() || stage_ >= routines_.size()) {
+    handler_(status, state_);
+    routines_.clear();
+    // No member may be touched after this line.
+    delete this;
+    return;
+  }
+
+  // Advance stage_ before calling Run(): a stage is allowed to invoke its
+  // callback synchronously, which re-enters Schedule().
+  Continuation *current = routines_[stage_].get();
+  ++stage_;
+  current->Run(std::bind(&Pipeline::Schedule, this, std::placeholders::_1));
+}
+
+/**
+ * Store handler as the user callback and kick off the pipeline by
+ * scheduling it with an OK status.
+ *
+ * Schedule() deletes the pipeline once the user callback has run, which
+ * can happen before this method returns (e.g. when no stage has been
+ * pushed), so callers must not use the pipeline after calling Run().
+ **/
+template <class State>
+inline void Pipeline<State>::Run(UserHandler &&handler) {
+  handler_ = std::move(handler);
+  const Status initial = Status::OK();
+  Schedule(initial);
+}
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
new file mode 100644
index 0000000..3e4b535
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
+#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
+
+#include "common/util.h"
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <cassert>
+
+namespace hdfs {
+namespace continuation {
+
+/**
+ * Continuation that reads one length-delimited protobuf message from a
+ * stream and parses it into msg. On the wire the message is a varint
+ * length prefix followed by that many bytes of serialized payload.
+ *
+ * The whole frame (prefix plus payload) is read into a fixed buffer of
+ * MaxMessageSize bytes, so larger frames are not supported. The stream
+ * and msg are not owned.
+ *
+ * NOTE(review): this header uses Continuation, asio and ToStatus() but
+ * does not include continuation.h or any asio header itself; it appears
+ * to rely on the including file to provide them.
+ **/
+template <class Stream, size_t MaxMessageSize = 512>
+struct ReadDelimitedPBMessageContinuation : public Continuation {
+  ReadDelimitedPBMessageContinuation(Stream *stream,
+                                     ::google::protobuf::MessageLite *msg)
+      : stream_(stream), msg_(msg) {}
+
+  /**
+   * Start the read. CompletionHandler() tells asio how many more bytes
+   * are needed; once the frame is complete the payload is parsed into
+   * msg and next is invoked with the resulting status.
+   **/
+  virtual void Run(const Next &next) override {
+    namespace pbio = google::protobuf::io;
+    auto handler = [this, next](const asio::error_code &ec,
+                                size_t transferred) {
+      Status status;
+      if (ec) {
+        status = ToStatus(ec);
+      } else {
+        // Only hand the parser the bytes that were actually received;
+        // the remainder of buf_ has never been written.
+        pbio::ArrayInputStream as(&buf_[0], static_cast<int>(transferred));
+        pbio::CodedInputStream is(&as);
+        uint32_t size = 0;
+        // NOTE(review): parse failures are only caught by assert, so in
+        // builds with NDEBUG they are silently reported as success.
+        bool v = is.ReadVarint32(&size);
+        assert(v);
+        is.PushLimit(size);
+        msg_->Clear();
+        v = msg_->MergeFromCodedStream(&is);
+        assert(v);
+      }
+      next(status);
+    };
+    asio::async_read(
+        *stream_, asio::buffer(buf_),
+        std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
+                  std::placeholders::_1, std::placeholders::_2),
+        handler);
+  }
+
+private:
+  /**
+   * Completion condition for asio::async_read(): returns the maximum
+   * number of bytes the next read may transfer, or 0 when the operation
+   * is complete.
+   *
+   * While the length prefix is still incomplete, one byte is requested
+   * at a time so that nothing past the end of the frame is consumed
+   * from the stream. Once the prefix has been decoded, exactly the
+   * missing part of the payload is requested.
+   **/
+  size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+    if (ec) {
+      return 0;
+    }
+
+    // Decode the varint prefix from the bytes received so far. Varints
+    // are little-endian: the i-th byte carries bits [7 * i, 7 * i + 7)
+    // and a clear high bit marks the last byte. As before, at most
+    // sizeof(int) prefix bytes are considered.
+    size_t offset = 0, len = 0;
+    for (size_t i = 0; i < transferred && i < sizeof(int); ++i) {
+      const uint8_t byte = static_cast<uint8_t>(buf_[i]);
+      len |= static_cast<size_t>(byte & 0x7f) << (7 * i);
+      if (byte < 0x80) {
+        offset = i + 1;
+        break;
+      }
+    }
+
+    if (!offset) {
+      // The prefix is not complete yet.
+      return 1;
+    }
+
+    const size_t total = offset + len;
+    assert(total <= buf_.size() && "Message is too big");
+    // A frame that has been received in full (including an empty message,
+    // where len == 0) ends the read instead of underflowing.
+    return total > transferred ? total - transferred : 0;
+  }
+
+  Stream *stream_;
+  ::google::protobuf::MessageLite *msg_;
+  std::array<char, MaxMessageSize> buf_;
+};
+
+/**
+ * Continuation that writes one length-delimited protobuf message to a
+ * stream: a varint length prefix followed by the serialized message.
+ *
+ * The frame is serialized into an internal string when Run() is called
+ * and then written through a WriteContinuation. The stream and msg are
+ * not owned.
+ **/
+template <class Stream>
+struct WriteDelimitedPBMessageContinuation : Continuation {
+  WriteDelimitedPBMessageContinuation(Stream *stream,
+                                      const google::protobuf::MessageLite *msg)
+      : stream_(stream), msg_(msg) {}
+
+  /**
+   * Serialize the message into buf_ and start writing it. next is
+   * invoked with the status reported by the underlying write.
+   **/
+  virtual void Run(const Next &next) override {
+    namespace pbio = google::protobuf::io;
+    int size = msg_->ByteSize();
+    buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
+    {
+      // The coded stream returns the unused tail of the space it obtained
+      // from the string stream only when it is destroyed. Until then buf_
+      // may be longer than the serialized frame, so both streams must go
+      // out of scope before buf_ is turned into an asio buffer.
+      pbio::StringOutputStream ss(&buf_);
+      pbio::CodedOutputStream os(&ss);
+      os.WriteVarint32(size);
+      // NOTE(review): the result of SerializeToCodedStream() is ignored,
+      // as in the original code.
+      msg_->SerializeToCodedStream(&os);
+    }
+    write_coroutine_ =
+        std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
+    write_coroutine_->Run([next](const Status &stat) { next(stat); });
+  }
+
+private:
+  Stream *stream_;
+  const google::protobuf::MessageLite *msg_;
+  // Holds the serialized frame; must outlive the asynchronous write.
+  std::string buf_;
+  // Keeps the nested write continuation alive while the write is pending.
+  std::shared_ptr<Continuation> write_coroutine_;
+};
+
+/**
+ * Allocate a ReadDelimitedPBMessageContinuation that reads one
+ * length-delimited message from stream into msg. The object is created
+ * with new and returned as a raw pointer, so the caller is responsible
+ * for it (e.g. by handing it to Pipeline::Push()).
+ **/
+template <class Stream, size_t MaxMessageSize = 512>
+static inline Continuation *
+ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+  using Stage = ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>;
+  return new Stage(stream, msg);
+}
+
+/**
+ * Allocate a WriteDelimitedPBMessageContinuation that writes msg to
+ * stream as one length-delimited message. The object is created with
+ * new and returned as a raw pointer, so the caller is responsible for it
+ * (e.g. by handing it to Pipeline::Push()).
+ *
+ * msg is taken as a pointer to const, matching the continuation's
+ * constructor: the message is only serialized, never modified, so const
+ * messages can be written as well.
+ **/
+template <class Stream>
+static inline Continuation *
+WriteDelimitedPBMessage(Stream *stream,
+                        const ::google::protobuf::MessageLite *msg) {
+  return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
+}
+}
+}
+#endif