You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/07/14 21:48:19 UTC
[4/5] hadoop git commit: fs
fs
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6ca03c0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6ca03c0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6ca03c0
Branch: refs/heads/8b02d962b291afe4b08c47f0398c1db0709419a1
Commit: e6ca03c0357b4b9061349e9903a2064ee0c51715
Parents: f34bda7
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Jul 14 12:43:09 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 14 12:47:00 2015 -0700
----------------------------------------------------------------------
.../native/libhdfspp/include/libhdfspp/hdfs.h | 50 ++++++
.../main/native/libhdfspp/lib/common/hdfs.cc | 29 ++++
.../main/native/libhdfspp/lib/common/wrapper.h | 42 +++++
.../main/native/libhdfspp/lib/fs/CMakeLists.txt | 4 +
.../main/native/libhdfspp/lib/fs/filesystem.cc | 95 +++++++++++
.../main/native/libhdfspp/lib/fs/filesystem.h | 61 +++++++
.../main/native/libhdfspp/lib/fs/inputstream.cc | 53 ++++++
.../native/libhdfspp/lib/fs/inputstream_impl.h | 160 +++++++++++++++++++
.../native/libhdfspp/lib/fs/inputstream_test.cc | 82 ++++++++++
.../native/libhdfspp/lib/fs/namenode_protocol.h | 42 +++++
10 files changed, 618 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
new file mode 100644
index 0000000..d12f20e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_HDFS_H_
+#define LIBHDFSPP_HDFS_H_
+
+#include "libhdfspp/status.h"
+
+namespace hdfs {
+
+class IoService {
+ public:
+ static IoService *New();
+ virtual void Run() = 0;
+ virtual void Stop() = 0;
+ virtual ~IoService();
+};
+
+
+class InputStream {
+ public:
+ virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) = 0;
+ virtual ~InputStream();
+};
+
+class FileSystem {
+ public:
+ static Status New(IoService *io_service, const char *server,
+ unsigned short port, FileSystem **fsptr);
+ virtual Status Open(const char *path, InputStream **isptr) = 0;
+ virtual ~FileSystem();
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
new file mode 100644
index 0000000..e7a5d6c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/hdfs.cc
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "wrapper.h"
+
+namespace hdfs {
+
+IoService::~IoService() {}
+
+IoService *IoService::New() {
+ return new IoServiceImpl();
+}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
new file mode 100644
index 0000000..39d26cc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/common/wrapper.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef COMMON_WRAPPER_H_
+#define COMMON_WRAPPER_H_
+
+#include "libhdfspp/hdfs.h"
+
+#include <asio/io_service.hpp>
+
+namespace hdfs {
+
+class IoServiceImpl : public IoService {
+ public:
+ virtual void Run() override {
+ asio::io_service::work work(io_service_);
+ io_service_.run();
+ }
+ virtual void Stop() override { io_service_.stop(); }
+ ::asio::io_service &io_service() { return io_service_; }
+ private:
+ ::asio::io_service io_service_;
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
new file mode 100644
index 0000000..bd649ff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt
@@ -0,0 +1,4 @@
+add_library(fs filesystem.cc inputstream.cc)
+add_dependencies(fs proto)
+add_executable(inputstream_test inputstream_test.cc)
+target_link_libraries(inputstream_test common fs rpc reader proto ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
new file mode 100644
index 0000000..ab322c6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+#include "common/util.h"
+
+#include <asio/ip/tcp.hpp>
+
+#include <limits>
+
+namespace hdfs {
+
+static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+static const int kNamenodeProtocolVersion = 1;
+
+using ::asio::ip::tcp;
+
+FileSystem::~FileSystem()
+{}
+
+Status FileSystem::New(IoService *io_service, const char *server,
+ unsigned short port, FileSystem **fsptr) {
+ std::unique_ptr<FileSystemImpl> impl(new FileSystemImpl(io_service));
+ Status stat = impl->Connect(server, port);
+ if (stat.ok()) {
+ *fsptr = impl.release();
+ }
+ return stat;
+}
+
+FileSystemImpl::FileSystemImpl(IoService *io_service)
+ : io_service_(static_cast<IoServiceImpl*>(io_service))
+ , engine_(&io_service_->io_service(), RpcEngine::GetRandomClientName(),
+ kNamenodeProtocol, kNamenodeProtocolVersion)
+ , namenode_(&engine_)
+{}
+
+Status FileSystemImpl::Connect(const char *server, unsigned short port) {
+ asio::error_code ec;
+ tcp::resolver resolver(io_service_->io_service());
+ tcp::resolver::query query(tcp::v4(), server, std::to_string(port));
+ tcp::resolver::iterator iterator = resolver.resolve(query, ec);
+
+ if (ec) {
+ return ToStatus(ec);
+ }
+
+ std::vector<tcp::endpoint> servers(iterator, tcp::resolver::iterator());
+ Status stat = engine_.Connect(servers);
+ if (!stat.ok()) {
+ return stat;
+ }
+ engine_.Start();
+ return stat;
+}
+
+Status FileSystemImpl::Open(const char *path, InputStream **isptr) {
+ using ::hadoop::hdfs::GetBlockLocationsRequestProto;
+ using ::hadoop::hdfs::GetBlockLocationsResponseProto;
+
+ GetBlockLocationsRequestProto req;
+ auto resp = std::make_shared<GetBlockLocationsResponseProto>();
+ req.set_src(path);
+ req.set_offset(0);
+ req.set_length(std::numeric_limits<long long>::max());
+ auto stat_p = std::make_shared<std::promise<Status>>();
+ std::future<Status> future(stat_p->get_future());
+ namenode_.GetBlockLocations(&req, resp,
+ [stat_p](const Status &status) { stat_p->set_value(status); });
+ Status stat = future.get();
+ if (!stat.ok()) {
+ return stat;
+ }
+
+ *isptr = new InputStreamImpl(this, &resp->locations());
+ return Status::OK();
+}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
new file mode 100644
index 0000000..30536eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_FILESYSTEM_H_
+#define FS_FILESYSTEM_H_
+
+#include "common/wrapper.h"
+#include "libhdfspp/hdfs.h"
+#include "rpc/rpc_engine.h"
+#include "ClientNamenodeProtocol.pb.h"
+#include "ClientNamenodeProtocol.hrpc.inl"
+
+namespace hdfs {
+
+class FileSystemImpl : public FileSystem {
+ public:
+ FileSystemImpl(IoService *io_service);
+ Status Connect(const char *server, unsigned short port);
+ virtual Status Open(const char *path, InputStream **isptr) override;
+ RpcEngine &rpc_engine() { return engine_; }
+ private:
+ IoServiceImpl *io_service_;
+ RpcEngine engine_;
+ ClientNamenodeProtocol namenode_;
+};
+
+class InputStreamImpl : public InputStream {
+ public:
+ InputStreamImpl(FileSystemImpl *fs, const ::hadoop::hdfs::LocatedBlocksProto *blocks);
+ virtual Status PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) override;
+ template<class MutableBufferSequence, class Handler>
+ void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers,
+ const Handler &handler);
+ private:
+ FileSystemImpl *fs_;
+ unsigned long long file_length_;
+ std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
+ struct HandshakeContinuation;
+ template<class MutableBufferSequence>
+ struct ReadBlockContinuation;
+};
+
+}
+
+#include "inputstream_impl.h"
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
new file mode 100644
index 0000000..a41c684
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "filesystem.h"
+
+namespace hdfs {
+
+using ::hadoop::hdfs::LocatedBlocksProto;
+
+InputStream::~InputStream()
+{}
+
+InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, const LocatedBlocksProto *blocks)
+ : fs_(fs)
+ , file_length_(blocks->filelength())
+{
+ for (const auto &block : blocks->blocks()) {
+ blocks_.push_back(block);
+ }
+
+ if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) {
+ blocks_.push_back(blocks->lastblock());
+ }
+}
+
+Status InputStreamImpl::PositionRead(void *buf, size_t nbyte, size_t offset, size_t *read_bytes) {
+ auto stat = std::make_shared<std::promise<Status>>();
+ std::future<Status> future(stat->get_future());
+ auto handler = [stat,read_bytes](const Status &status, size_t transferred) {
+ *read_bytes = transferred;
+ stat->set_value(status);
+ };
+
+ AsyncPreadSome(offset, asio::buffer(buf, nbyte), handler);
+ return future.get();
+}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
new file mode 100644
index 0000000..88e1912
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_INPUTSTREAM_IMPL_H_
+#define FS_INPUTSTREAM_IMPL_H_
+
+#include "reader/block_reader.h"
+
+#include "common/continuation/asio.h"
+#include "common/continuation/protobuf.h"
+
+#include <functional>
+#include <future>
+
+namespace hdfs {
+
+struct InputStreamImpl::HandshakeContinuation : continuation::Continuation {
+ typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
+ HandshakeContinuation(Reader *reader, const std::string &client_name,
+ const hadoop::common::TokenProto *token,
+ const hadoop::hdfs::ExtendedBlockProto *block,
+ uint64_t length, uint64_t offset)
+ : reader_(reader)
+ , client_name_(client_name)
+ , length_(length)
+ , offset_(offset)
+ {
+ if (token) {
+ token_.reset(new hadoop::common::TokenProto());
+ token_->CheckTypeAndMergeFrom(*token);
+ }
+ block_.CheckTypeAndMergeFrom(*block);
+ }
+
+ virtual void Run(const Next& next) override {
+ reader_->async_connect(client_name_, token_.get(), &block_, length_, offset_, next);
+ }
+
+ private:
+ Reader *reader_;
+ const std::string client_name_;
+ std::unique_ptr<hadoop::common::TokenProto> token_;
+ hadoop::hdfs::ExtendedBlockProto block_;
+ uint64_t length_;
+ uint64_t offset_;
+};
+
+template<class MutableBufferSequence>
+struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation {
+ typedef RemoteBlockReader<::asio::ip::tcp::socket> Reader;
+ ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer,
+ size_t *transferred)
+ : reader_(reader)
+ , buffer_(buffer)
+ , buffer_size_(asio::buffer_size(buffer))
+ , transferred_(transferred)
+ {}
+
+ virtual void Run(const Next& next) override {
+ *transferred_ = 0;
+ next_ = next;
+ OnReadData(Status::OK(), 0);
+ }
+
+ private:
+ Reader *reader_;
+ MutableBufferSequence buffer_;
+ const size_t buffer_size_;
+ size_t *transferred_;
+ std::function<void(const Status &)> next_;
+
+ void OnReadData(const Status &status, size_t transferred) {
+ using std::placeholders::_1;
+ using std::placeholders::_2;
+ *transferred_ += transferred;
+ if (!status.ok()) {
+ next_(status);
+ } else if (*transferred_ >= buffer_size_) {
+ next_(status);
+ } else {
+ reader_->async_read_some(
+ asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_),
+ std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2));
+ }
+ }
+};
+
+
+template<class MutableBufferSequence, class Handler>
+void InputStreamImpl::AsyncPreadSome(
+ size_t offset, const MutableBufferSequence &buffers,
+ const Handler &handler) {
+ using ::hadoop::hdfs::LocatedBlockProto;
+ namespace ip = ::asio::ip;
+ using ::asio::ip::tcp;
+
+ auto it = std::find_if(
+ blocks_.begin(), blocks_.end(),
+ [offset](const LocatedBlockProto &p) {
+ return p.offset() <= offset && offset < p.offset() + p.b().numbytes();
+ });
+
+ if (it == blocks_.end()) {
+ handler(Status::InvalidArgument("Cannot find corresponding blocks"), 0);
+ return;
+ } else if (!it->locs_size()) {
+ handler(Status::ResourceUnavailable("No datanodes available"), 0);
+ return;
+ }
+
+ uint64_t offset_within_block = offset - it->offset();
+ uint64_t size_within_block =
+ std::min<uint64_t>(it->b().numbytes() - offset_within_block, asio::buffer_size(buffers));
+
+ struct State {
+ std::unique_ptr<tcp::socket> conn;
+ std::shared_ptr<RemoteBlockReader<tcp::socket> > reader;
+ LocatedBlockProto block;
+ std::vector<tcp::endpoint> endpoints;
+ size_t transferred;
+ };
+
+ auto m = continuation::Pipeline<State>::Create();
+ auto &s = m->state();
+ s.conn.reset(new tcp::socket(fs_->rpc_engine().io_service()));
+ s.reader = std::make_shared<RemoteBlockReader<tcp::socket> >(BlockReaderOptions(), s.conn.get());
+ s.block = *it;
+ for (auto &loc : it->locs()) {
+ auto datanode = loc.id();
+ s.endpoints.push_back(tcp::endpoint(ip::address::from_string(datanode.ipaddr()), datanode.xferport()));
+ }
+
+ m->Push(continuation::Connect(s.conn.get(), s.endpoints.begin(), s.endpoints.end()))
+ .Push(new HandshakeContinuation(s.reader.get(), fs_->rpc_engine().client_name(), nullptr,
+ &s.block.b(), size_within_block, offset_within_block))
+ .Push(new ReadBlockContinuation<::asio::mutable_buffers_1>(
+ s.reader.get(), asio::buffer(buffers, size_within_block), &s.transferred));
+
+ m->Run([handler](const Status &status, const State &state) {
+ handler(status, state.transferred);
+ });
+}
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
new file mode 100644
index 0000000..dceac86
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/inputstream_test.cc
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "libhdfspp/hdfs.h"
+
+#include <iostream>
+#include <string>
+#include <thread>
+
+using namespace hdfs;
+
+class Executor {
+ public:
+ Executor()
+ : io_service_(IoService::New())
+ , thread_(std::bind(&IoService::Run, io_service_.get()))
+ {}
+
+ IoService *io_service() { return io_service_.get(); }
+
+ ~Executor() {
+ io_service_->Stop();
+ thread_.join();
+ }
+
+ std::unique_ptr<IoService> io_service_;
+ std::thread thread_;
+};
+
+int main(int argc, char *argv[]) {
+ if (argc != 4) {
+ std::cerr
+ << "Print files stored in a HDFS cluster.\n"
+ << "Usage: " << argv[0] << " "
+ << "<nnhost> <nnport> <file>\n";
+ return 1;
+ }
+
+ Executor executor;
+
+ FileSystem *fsptr;
+ Status stat = FileSystem::New(executor.io_service(), argv[1], std::stoi(argv[2]), &fsptr);
+ if (!stat.ok()) {
+ std::cerr << "Cannot create the filesystem: " << stat.ToString() << std::endl;
+ return 1;
+ }
+
+ std::unique_ptr<FileSystem> fs(fsptr);
+
+ InputStream *isptr;
+ stat = fs->Open(argv[3], &isptr);
+ if (!stat.ok()) {
+ std::cerr << "Cannot open the file: " << stat.ToString() << std::endl;
+ return 1;
+ }
+
+ std::unique_ptr<InputStream> is(isptr);
+
+ char buf[8192] = {0,};
+ size_t read_bytes = 0;
+ stat = is->PositionRead(buf, sizeof(buf), 0, &read_bytes);
+ if (!stat.ok()) {
+ std::cerr << "Read failures: " << stat.ToString() << std::endl;
+ }
+ std::cerr << "Read bytes:" << read_bytes << std::endl << buf << std::endl;
+ return 0;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6ca03c0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
new file mode 100644
index 0000000..80aa237
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/fs/namenode_protocol.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef FS_NAMENODE_PROTOCOL_H_
+#define FS_NAMENODE_PROTOCOL_H_
+
+#include "ClientNamenodeProtocol.pb.h"
+#include "rpc/rpc_engine.h"
+
+namespace hdfs {
+
+class ClientNamenodeProtocol {
+ public:
+ ClientNamenodeProtocol(RpcEngine *engine)
+ : engine_(engine)
+ {}
+
+ Status GetBlockLocations(const ::hadoop::hdfs::GetBlockLocationsRequestProto *request,
+ std::shared_ptr<::hadoop::hdfs::GetBlockLocationsResponseProto> response) {
+ return engine_->Rpc("getBlockLocations", request, response);
+ }
+ private:
+ RpcEngine *engine_;
+};
+
+};
+
+#endif