You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/07/14 21:55:58 UTC
hadoop git commit: HDFS-8758. Implement the continuation library in
libhdfspp. Contributed by Haohui Mai.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-8707 ac60c6ab4 -> 8b02d962b
HDFS-8758. Implement the continuation library in libhdfspp. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b02d962
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b02d962
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b02d962
Branch: refs/heads/HDFS-8707
Commit: 8b02d962b291afe4b08c47f0398c1db0709419a1
Parents: ac60c6a
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Jul 9 14:02:55 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Jul 10 17:09:01 2015 -0700
----------------------------------------------------------------------
.../src/main/native/libhdfspp/doc/Doxyfile.in | 1 +
.../libhdfspp/lib/common/continuation/asio.h | 112 ++++++++++++++++
.../lib/common/continuation/continuation.h | 125 ++++++++++++++++++
.../lib/common/continuation/protobuf.h | 128 +++++++++++++++++++
4 files changed, 366 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
index 773990f..ac1d0fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/doc/Doxyfile.in
@@ -8,6 +8,7 @@ BUILTIN_STL_SUPPORT = YES
INPUT = @PROJECT_SOURCE_DIR@/doc/mainpage.dox \
@PROJECT_SOURCE_DIR@/include/libhdfspp \
+ @PROJECT_SOURCE_DIR@/lib/common/continuation \
INPUT_ENCODING = UTF-8
RECURSIVE = NO
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
new file mode 100644
index 0000000..f7d76e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/asio.h
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_CONTINUATION_ASIO_H_
+#define LIB_COMMON_CONTINUATION_ASIO_H_
+
+#include "continuation.h"
+#include "common/util.h"
+
+#include "libhdfspp/status.h"
+
+#include <asio/connect.hpp>
+#include <asio/read.hpp>
+#include <asio/write.hpp>
+#include <asio/ip/tcp.hpp>
+
+namespace hdfs {
+namespace continuation {
+
+template <class Stream, class MutableBufferSequence>
+class ReadContinuation : public Continuation {
+public:
+ ReadContinuation(Stream *stream, const MutableBufferSequence &buffer)
+ : stream_(stream), buffer_(buffer) {}
+ virtual void Run(const Next &next) override {
+ auto handler =
+ [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
+ asio::async_read(*stream_, buffer_, handler);
+ }
+
+private:
+ Stream *stream_;
+ MutableBufferSequence buffer_;
+};
+
+template <class Stream, class ConstBufferSequence>
+class WriteContinuation : public Continuation {
+public:
+ WriteContinuation(Stream *stream, const ConstBufferSequence &buffer)
+ : stream_(stream), buffer_(buffer) {}
+
+ virtual void Run(const Next &next) override {
+ auto handler =
+ [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
+ asio::async_write(*stream_, buffer_, handler);
+ }
+
+private:
+ Stream *stream_;
+ ConstBufferSequence buffer_;
+};
+
+template <class Socket, class Iterator>
+class ConnectContinuation : public Continuation {
+public:
+ ConnectContinuation(Socket *socket, Iterator begin, Iterator end,
+ Iterator *connected_endpoint)
+ : socket_(socket), begin_(begin), end_(end),
+ connected_endpoint_(connected_endpoint) {}
+
+ virtual void Run(const Next &next) override {
+ auto handler = [this, next](const asio::error_code &ec, Iterator it) {
+ if (connected_endpoint_) {
+ *connected_endpoint_ = it;
+ }
+ next(ToStatus(ec));
+ };
+ asio::async_connect(*socket_, begin_, end_, handler);
+ }
+
+private:
+ Socket *socket_;
+ Iterator begin_;
+ Iterator end_;
+ Iterator *connected_endpoint_;
+};
+
+template <class Stream, class ConstBufferSequence>
+static inline Continuation *Write(Stream *stream,
+ const ConstBufferSequence &buffer) {
+ return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer);
+}
+
+template <class Stream, class MutableBufferSequence>
+static inline Continuation *Read(Stream *stream,
+ const MutableBufferSequence &buffer) {
+ return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer);
+}
+
+template <class Socket, class Iterator>
+static inline Continuation *Connect(Socket *socket, Iterator begin,
+ Iterator end) {
+ return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr);
+}
+}
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
new file mode 100644
index 0000000..9576c2f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIB_COMMON_CONTINUATION_CONTINUATION_H_
+#define LIB_COMMON_CONTINUATION_CONTINUATION_H_
+
+#include "libhdfspp/status.h"
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace hdfs {
+namespace continuation {
+
+class PipelineBase;
+
+/**
+ * A continuation is a fragment of runnable code whose execution will
+ * be scheduled by a \link Pipeline \endlink.
+ *
+ * The Continuation class is a build block to implement the
+ * Continuation Passing Style (CPS) in libhdfs++. In CPS, the
+ * upper-level user specifies the control flow by chaining a sequence
+ * of continuations explicitly through the \link Run() \endlink method,
+ * while in traditional imperative programming the sequences of
+ * sentences implicitly specify the control flow.
+ *
+ * See http://en.wikipedia.org/wiki/Continuation for more details.
+ **/
+class Continuation {
+public:
+ typedef std::function<void(const Status &)> Next;
+ virtual ~Continuation() = default;
+ virtual void Run(const Next &next) = 0;
+ Continuation(const Continuation &) = delete;
+ Continuation &operator=(const Continuation &) = delete;
+
+protected:
+ Continuation() = default;
+};
+
+/**
+ * A pipeline schedules the execution of a chain of \link Continuation
+ * \endlink. The pipeline schedules the execution of continuations
+ * based on their order in the pipeline, where the next parameter for
+ * each continuation points to the \link Schedule() \endlink
+ * method. That way the pipeline executes all scheduled continuations
+ * in sequence.
+ *
+ * The typical use case of a pipeline is executing continuations
+ * asynchronously. Note that a continuation calls the next
+ * continuation when it is finished. If the continuation is posted
+ * into an asynchronous event loop, invoking the next continuation
+ * can be done in the callback handler in the asynchronous event loop.
+ *
+ * The pipeline allocates the memory as follows. A pipeline is always
+ * allocated on the heap. It owns all the continuations as well as the
+ * the state specified by the user. Both the continuations and the
+ * state have the same life cycle of the pipeline. The design
+ * simplifies the problem of ensuring that the executions in the
+ * asynchronous event loop always hold valid pointers w.r.t. the
+ * pipeline. The pipeline will automatically deallocate itself right
+ * after it invokes the callback specified the user.
+ **/
+template <class State> class Pipeline {
+public:
+ typedef std::function<void(const Status &, const State &)> UserHandler;
+ static Pipeline *Create() { return new Pipeline(); }
+ Pipeline &Push(Continuation *stage);
+ void Run(UserHandler &&handler);
+ State &state() { return state_; }
+
+private:
+ State state_;
+ std::vector<std::unique_ptr<Continuation>> routines_;
+ size_t stage_;
+ std::function<void(const Status &, const State &)> handler_;
+
+ Pipeline() : stage_(0) {}
+ ~Pipeline() = default;
+ void Schedule(const Status &status);
+};
+
+template <class State>
+inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) {
+ routines_.emplace_back(std::unique_ptr<Continuation>(stage));
+ return *this;
+}
+
+template <class State>
+inline void Pipeline<State>::Schedule(const Status &status) {
+ if (stage_ >= routines_.size()) {
+ handler_(status, state_);
+ routines_.clear();
+ delete this;
+ } else {
+ auto next = routines_[stage_].get();
+ ++stage_;
+ next->Run(std::bind(&Pipeline::Schedule, this, std::placeholders::_1));
+ }
+}
+
+template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) {
+ handler_ = std::move(handler);
+ Schedule(Status::OK());
+}
+}
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b02d962/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
new file mode 100644
index 0000000..3e4b535
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
+#define LIBHDFSPP_COMMON_CONTINUATION_PROTOBUF_H_
+
+#include "common/util.h"
+
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include <cassert>
+
+namespace hdfs {
+namespace continuation {
+
+template <class Stream, size_t MaxMessageSize = 512>
+struct ReadDelimitedPBMessageContinuation : public Continuation {
+ ReadDelimitedPBMessageContinuation(Stream *stream,
+ ::google::protobuf::MessageLite *msg)
+ : stream_(stream), msg_(msg) {}
+
+ virtual void Run(const Next &next) override {
+ namespace pbio = google::protobuf::io;
+ auto handler = [this, next](const asio::error_code &ec, size_t) {
+ Status status;
+ if (ec) {
+ status = ToStatus(ec);
+ } else {
+ pbio::ArrayInputStream as(&buf_[0], buf_.size());
+ pbio::CodedInputStream is(&as);
+ uint32_t size = 0;
+ bool v = is.ReadVarint32(&size);
+ assert(v);
+ is.PushLimit(size);
+ msg_->Clear();
+ v = msg_->MergeFromCodedStream(&is);
+ assert(v);
+ }
+ next(status);
+ };
+ asio::async_read(
+ *stream_, asio::buffer(buf_),
+ std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this,
+ std::placeholders::_1, std::placeholders::_2),
+ handler);
+ }
+
+private:
+ size_t CompletionHandler(const asio::error_code &ec, size_t transferred) {
+ if (ec) {
+ return 0;
+ }
+
+ size_t offset = 0, len = 0;
+ for (size_t i = 0; i + 1 < transferred && i < sizeof(int); ++i) {
+ len = (len << 7) | (buf_[i] & 0x7f);
+ if ((uint8_t)buf_.at(i) < 0x80) {
+ offset = i + 1;
+ break;
+ }
+ }
+
+ assert(offset + len < buf_.size() && "Message is too big");
+ return offset ? len + offset - transferred : 1;
+ }
+
+ Stream *stream_;
+ ::google::protobuf::MessageLite *msg_;
+ std::array<char, MaxMessageSize> buf_;
+};
+
+template <class Stream>
+struct WriteDelimitedPBMessageContinuation : Continuation {
+ WriteDelimitedPBMessageContinuation(Stream *stream,
+ const google::protobuf::MessageLite *msg)
+ : stream_(stream), msg_(msg) {}
+
+ virtual void Run(const Next &next) override {
+ namespace pbio = google::protobuf::io;
+ int size = msg_->ByteSize();
+ buf_.reserve(pbio::CodedOutputStream::VarintSize32(size) + size);
+ pbio::StringOutputStream ss(&buf_);
+ pbio::CodedOutputStream os(&ss);
+ os.WriteVarint32(size);
+ msg_->SerializeToCodedStream(&os);
+ write_coroutine_ =
+ std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_)));
+ write_coroutine_->Run([next](const Status &stat) { next(stat); });
+ }
+
+private:
+ Stream *stream_;
+ const google::protobuf::MessageLite *msg_;
+ std::string buf_;
+ std::shared_ptr<Continuation> write_coroutine_;
+};
+
+template <class Stream, size_t MaxMessageSize = 512>
+static inline Continuation *
+ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+ return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream,
+ msg);
+}
+
+template <class Stream>
+static inline Continuation *
+WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) {
+ return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg);
+}
+}
+}
+#endif