Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/09/21 20:00:50 UTC

hadoop git commit: HDFS-9093. Initialize protobuf fields in RemoteBlockReaderTest. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 24abbead5 -> ebaed4bd4


HDFS-9093. Initialize protobuf fields in RemoteBlockReaderTest. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ebaed4bd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ebaed4bd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ebaed4bd

Branch: refs/heads/HDFS-8707
Commit: ebaed4bd4c95684fe6ac06c73e6a90789889e849
Parents: 24abbea
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Sep 16 16:48:56 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Wed Sep 16 16:53:52 2015 -0700

----------------------------------------------------------------------
 .../native/libhdfspp/tests/mock_connection.cc   |   4 +
 .../native/libhdfspp/tests/mock_connection.h    |  10 +-
 .../libhdfspp/tests/remote_block_reader_test.cc | 160 +++++++++++--------
 3 files changed, 104 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
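
The core of the fix is visible in each test below: ExtendedBlockProto carries required proto2 fields (poolId, blockId, generationStamp), so the tests now set them before the block is serialized into the read request. A minimal sketch of that setup, assuming the generated hdfs.pb.h header is available on the include path (the exact path and the "foo" pool id are illustrative):

    #include <iostream>
    #include "hdfs.pb.h"  // generated from hdfs.proto; exact path is an assumption

    int main() {
      ::hadoop::hdfs::ExtendedBlockProto block;
      // With the required fields left unset, IsInitialized() is false and
      // serializing the read request can fail; the updated tests avoid that.
      block.set_poolid("foo");         // required string poolId
      block.set_blockid(0);            // required uint64 blockId
      block.set_generationstamp(0);    // required uint64 generationStamp
      std::cout << std::boolalpha << block.IsInitialized() << std::endl;  // true
      return 0;
    }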


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ebaed4bd/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
index e1dfdc7..93a3099 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc
@@ -20,6 +20,10 @@
 
 namespace hdfs {
 
+MockConnectionBase::MockConnectionBase(::asio::io_service *io_service)
+    : io_service_(io_service)
+{}
+
 MockConnectionBase::~MockConnectionBase() {}
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ebaed4bd/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
index e917e9d..086797f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h
@@ -21,12 +21,15 @@
 #include <asio/error_code.hpp>
 #include <asio/buffer.hpp>
 #include <asio/streambuf.hpp>
+#include <asio/io_service.hpp>
+
 #include <gmock/gmock.h>
 
 namespace hdfs {
 
 class MockConnectionBase {
 public:
+  MockConnectionBase(::asio::io_service *io_service);
   virtual ~MockConnectionBase();
   typedef std::pair<asio::error_code, std::string> ProducerResult;
   template <class MutableBufferSequence, class Handler>
@@ -34,7 +37,7 @@ public:
     if (produced_.size() == 0) {
       ProducerResult r = Produce();
       if (r.first) {
-        handler(r.first, 0);
+        io_service_->post(std::bind(handler, r.first, 0));
       }
       asio::mutable_buffers_1 data = produced_.prepare(r.second.size());
       asio::buffer_copy(data, asio::buffer(r.second));
@@ -44,17 +47,18 @@ public:
     size_t len = std::min(asio::buffer_size(buf), produced_.size());
     asio::buffer_copy(buf, produced_.data());
     produced_.consume(len);
-    handler(asio::error_code(), len);
+    io_service_->post(std::bind(handler, asio::error_code(), len));
   }
 
   template <class ConstBufferSequence, class Handler>
   void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
     // CompletionResult res = OnWrite(buf);
-    handler(asio::error_code(), asio::buffer_size(buf));
+    io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf)));
   }
 
 protected:
   virtual ProducerResult Produce() = 0;
+  ::asio::io_service *io_service_;
 
 private:
   asio::streambuf produced_;

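The change above also moves the mock's completion handlers onto the io_service: instead of invoking a handler inline, it is posted so that it only runs inside io_service.run(), matching how a real asio socket completes. A minimal, self-contained sketch of that pattern with standalone asio (the handler, byte count, and output are illustrative, not part of the patch):

    #include <asio/io_service.hpp>
    #include <asio/error_code.hpp>
    #include <cstddef>
    #include <functional>
    #include <iostream>
    #include <string>

    int main() {
      ::asio::io_service io_service;
      auto handler = [&io_service](const asio::error_code &ec, std::size_t n) {
        std::cout << "completed " << n << " bytes: " << ec.message() << std::endl;
        io_service.stop();
      };
      // Nothing runs at post() time; the callback fires inside run(),
      // which is why the updated tests end with io_service.run().
      io_service.post(std::bind(handler, asio::error_code(), std::size_t(512)));
      io_service.run();
      return 0;
    }
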
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ebaed4bd/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
index 5307d39..388a106 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -51,6 +51,8 @@ namespace hdfs {
 
 class MockDNConnection : public MockConnectionBase {
 public:
+  MockDNConnection(::asio::io_service &io_service)
+      : MockConnectionBase(&io_service) {}
   MOCK_METHOD0(Produce, ProducerResult());
 };
 }
@@ -91,35 +93,30 @@ ProducePacket(const std::string &data, const std::string &checksum,
   return std::make_pair(error_code(), std::move(payload));
 }
 
-template<class Stream = MockDNConnection>
+template <class Stream = MockDNConnection, class Handler>
 static std::shared_ptr<RemoteBlockReader<Stream>>
-ReadContent(Stream *conn, TokenProto *token,
-            const ExtendedBlockProto &block, uint64_t length, uint64_t offset,
-            const mutable_buffers_1 &buf, Status *status, size_t *transferred) {
+ReadContent(Stream *conn, TokenProto *token, const ExtendedBlockProto &block,
+            uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
+            const Handler &handler) {
   BlockReaderOptions options;
-  auto reader =
-      std::make_shared<RemoteBlockReader<Stream>>(options, conn);
+  auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn);
   Status result;
-  reader->async_connect(
-      "libhdfs++", token, &block, length, offset,
-      [buf, reader, status, transferred](const Status &stat) {
-        if (!stat.ok()) {
-          *status = stat;
-        } else {
-          reader->async_read_some(
-              buf, [status, transferred](const Status &stat, size_t t) {
-                *transferred = t;
-                *status = stat;
-              });
-        }
-      });
+  reader->async_connect("libhdfs++", token, &block, length, offset,
+                        [buf, reader, handler](const Status &stat) {
+                          if (!stat.ok()) {
+                            handler(stat, 0);
+                          } else {
+                            reader->async_read_some(buf, handler);
+                          }
+                        });
   return reader;
 }
 
 TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
   static const size_t kChunkSize = 512;
   static const string kChunkData(kChunkSize, 'a');
-  MockDNConnection conn;
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
   BlockOpResponseProto block_op_resp;
 
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
@@ -128,15 +125,20 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
       .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
 
   ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+  
   std::string data(kChunkSize, 0);
-  size_t transferred = 0;
-  Status stat;
   ReadContent(&conn, nullptr, block, kChunkSize, 0,
-              buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
-              &transferred);
-  ASSERT_TRUE(stat.ok());
-  ASSERT_EQ(kChunkSize, transferred);
-  ASSERT_EQ(kChunkData, data);
+              buffer(const_cast<char *>(data.c_str()), data.size()),
+              [&data, &io_service](const Status &stat, size_t transferred) {
+                ASSERT_TRUE(stat.ok());
+                ASSERT_EQ(kChunkSize, transferred);
+                ASSERT_EQ(kChunkData, data);
+                io_service.stop();
+              });
+  io_service.run();
 }
 
 TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
@@ -145,7 +147,8 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
   static const size_t kOffset = kChunkSize / 4;
   static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
 
-  MockDNConnection conn;
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
   BlockOpResponseProto block_op_resp;
   ReadOpChecksumInfoProto *checksum_info =
       block_op_resp.mutable_readopchecksuminfo();
@@ -160,22 +163,28 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
       .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
 
   ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
   string data(kLength, 0);
-  size_t transferred = 0;
-  Status stat;
   ReadContent(&conn, nullptr, block, data.size(), kOffset,
-              buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
-              &transferred);
-  ASSERT_TRUE(stat.ok());
-  ASSERT_EQ(kLength, transferred);
-  ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+              buffer(const_cast<char *>(data.c_str()), data.size()),
+              [&data, &io_service](const Status &stat, size_t transferred) {
+                ASSERT_TRUE(stat.ok());
+                ASSERT_EQ(kLength, transferred);
+                ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+                io_service.stop();
+              });
+  io_service.run();
 }
 
 TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
   static const size_t kChunkSize = 1024;
   static const string kChunkData(kChunkSize, 'a');
 
-  MockDNConnection conn;
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
   BlockOpResponseProto block_op_resp;
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
 
@@ -185,25 +194,37 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
       .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
 
   ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
   string data(kChunkSize, 0);
-  size_t transferred = 0;
-  Status stat;
   mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
-  auto reader = ReadContent(&conn, nullptr, block, data.size(), 0, buf, &stat,
-                            &transferred);
-  ASSERT_TRUE(stat.ok());
-  ASSERT_EQ(kChunkSize, transferred);
-  ASSERT_EQ(kChunkData, data);
-
-  data.clear();
-  data.resize(kChunkSize);
-  transferred = 0;
-
-  reader->async_read_some(buf, [&data](const Status &stat, size_t transferred) {
-    ASSERT_TRUE(stat.ok());
-    ASSERT_EQ(kChunkSize, transferred);
-    ASSERT_EQ(kChunkData, data);
-  });
+  BlockReaderOptions options;
+  auto reader = std::make_shared<RemoteBlockReader<MockDNConnection> >(options, &conn);
+  Status result;
+  reader->async_connect(
+      "libhdfs++", nullptr, &block, data.size(), 0,
+      [buf, reader, &data, &io_service](const Status &stat) {
+        ASSERT_TRUE(stat.ok());
+        reader->async_read_some(
+            buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) {
+              ASSERT_TRUE(stat.ok());
+              ASSERT_EQ(kChunkSize, transferred);
+              ASSERT_EQ(kChunkData, data);
+              data.clear();
+              data.resize(kChunkSize);
+              transferred = 0;
+              reader->async_read_some(
+                  buf, [&data,&io_service](const Status &stat, size_t transferred) {
+                    ASSERT_TRUE(stat.ok());
+                    ASSERT_EQ(kChunkSize, transferred);
+                    ASSERT_EQ(kChunkData, data);
+                    io_service.stop();
+                  });
+            });
+      });
+  io_service.run();
 }
 
 TEST(RemoteBlockReaderTest, TestSaslConnection) {
@@ -212,7 +233,8 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) {
   static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
                                      "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
                                      "charset=utf-8,algorithm=md5-sess";
-  MockDNConnection conn;
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
   BlockOpResponseProto block_op_resp;
   block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
 
@@ -233,20 +255,24 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) {
 
   DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar");
   ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
   std::string data(kChunkSize, 0);
-  size_t transferred = 0;
-  Status stat;
-  sasl_conn.Handshake([&stat](const Status &s) {
-      stat = s;
-    });
-
-  ASSERT_TRUE(stat.ok());
-  ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
-              buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
-              &transferred);
-  ASSERT_TRUE(stat.ok());
-  ASSERT_EQ(kChunkSize, transferred);
-  ASSERT_EQ(kChunkData, data);
+  sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service](
+      const Status &s) {
+    ASSERT_TRUE(s.ok());
+    ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
+                buffer(const_cast<char *>(data.c_str()), data.size()),
+                [&data, &io_service](const Status &stat, size_t transferred) {
+                  ASSERT_TRUE(stat.ok());
+                  ASSERT_EQ(kChunkSize, transferred);
+                  ASSERT_EQ(kChunkData, data);
+                  io_service.stop();
+                });
+  });
+  io_service.run();
 }
 
 int main(int argc, char *argv[]) {