You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/08/12 15:34:13 UTC

[qpid-proton] 03/05: PROTON-2247: Tests for common raw connection code

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 1b4bdedb696988271e9455b70d95f6fef90ca3b3
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Thu Mar 12 14:30:27 2020 -0400

    PROTON-2247: Tests for common raw connection code
---
 c/tests/CMakeLists.txt          |   4 +
 c/tests/raw_connection_test.cpp | 723 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 727 insertions(+)

diff --git a/c/tests/CMakeLists.txt b/c/tests/CMakeLists.txt
index 6c762b4..3417304 100644
--- a/c/tests/CMakeLists.txt
+++ b/c/tests/CMakeLists.txt
@@ -75,6 +75,10 @@ if (CMAKE_CXX_COMPILER)
     add_c_test(c-proactor-test pn_test_proactor.cpp proactor_test.cpp)
     target_link_libraries(c-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS})
 
+    list(TRANSFORM qpid-proton-proactor PREPEND "../" OUTPUT_VARIABLE qpid-proton-proactor-src)
+    add_c_test(c-raw-connection-test raw_connection_test.cpp ${qpid-proton-proactor-src})
+    target_link_libraries(c-raw-connection-test qpid-proton-core ${PLATFORM_LIBS} ${PROACTOR_LIBS})
+
     add_c_test(c-ssl-proactor-test pn_test_proactor.cpp ssl_proactor_test.cpp)
     target_link_libraries(c-ssl-proactor-test qpid-proton-core qpid-proton-proactor ${PLATFORM_LIBS})
 
diff --git a/c/tests/raw_connection_test.cpp b/c/tests/raw_connection_test.cpp
new file mode 100644
index 0000000..d32103b
--- /dev/null
+++ b/c/tests/raw_connection_test.cpp
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/raw_connection.h>
+#include "proactor/raw_connection-internal.h"
+
+#include "pn_test.hpp"
+
+#ifdef _WIN32
+#include <errno.h>
+#else
+#include <sys/socket.h>
+#include <unistd.h>
+#include <errno.h>
+#endif
+
+#include <string.h>
+
+#include <array>
+#include <utility>
+#include <string>
+#include <vector>
+
+using namespace pn_test;
+using Catch::Matchers::Contains;
+using Catch::Matchers::Equals;
+
+namespace {
+  pn_raw_connection_t* mk_raw_connection() { // zero-filled and initialized, or NULL if calloc fails
+    pn_raw_connection_t* rc = (pn_raw_connection_t*) calloc(1, sizeof(struct pn_raw_connection_t));
+    if (rc) pni_raw_initialize(rc); // skip initialization on allocation failure so callers can test for NULL
+    return rc;
+  }
+  void free_raw_connection(pn_raw_connection_t* c) {
+    pni_raw_finalize(c);
+    free(c);
+  }
+  int read_err;
+  void set_read_error(pn_raw_connection_t*, const char*, int err) {
+    read_err = err;
+  }
+  int write_err;
+  void set_write_error(pn_raw_connection_t*, const char*, int err) {
+    write_err = err;
+  }
+
+  size_t max_send_size = 0;
+  size_t max_recv_size = 0;
+
+#ifdef MSG_DONTWAIT
+  long rcv(int fd, void* b, size_t s) {
+    read_err = 0;
+    if (max_recv_size && max_recv_size < s) s = max_recv_size;
+    return ::recv(fd, b, s, MSG_DONTWAIT);
+  }
+
+  void freepair(int fds[2]) {
+      ::close(fds[0]);
+      ::close(fds[1]);
+  }
+
+  void rcv_stop(int fd) {
+      ::shutdown(fd, SHUT_RD);
+  }
+
+  void snd_stop(int fd) {
+      ::shutdown(fd, SHUT_WR);
+  }
+
+#ifdef MSG_NOSIGNAL
+  long snd(int fd, const void* b, size_t s) {
+    write_err = 0;
+    if (max_send_size && max_send_size < s) s = max_send_size;
+    return ::send(fd, b, s, MSG_NOSIGNAL | MSG_DONTWAIT);
+  }
+
+  int makepair(int fds[2]) {
+    return ::socketpair(AF_LOCAL, SOCK_STREAM, PF_UNSPEC, fds);
+  }
+#elif defined(SO_NOSIGPIPE)
+  long snd(int fd, const void* b, size_t s) {
+    write_err = 0;
+    if (max_send_size && max_send_size < s) s = max_send_size;
+    return ::send(fd, b, s, MSG_DONTWAIT);
+  }
+
+  int makepair(int fds[2]) {
+    int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, PF_UNSPEC, fds);
+    if (rc == 0) {
+      int optval = 1;
+      ::setsockopt(fds[0], SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof(optval));
+      ::setsockopt(fds[1], SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof(optval));
+    }
+    return rc;
+  }
+#endif
+#else
+  // Simple mock up of the read/write functions of a socketpair for testing
+  // systems without socketpairs (Windows really)
+  // TODO: perhaps this should be used everywhere
+  static const uint16_t buffsize = 4096;
+  struct fbuf {
+    uint8_t buff[buffsize*2] = {};
+    int linked_fd = -1;
+    size_t head = 0;
+    size_t size = 0;
+    bool rclosed = true;
+    bool wclosed = true;
+
+    bool closed() {
+      return rclosed && wclosed && linked_fd == -1;
+    }
+
+    void open_linked(int linked_fd0) {
+      CHECK(closed());
+      CHECK(head == 0);
+      CHECK(size == 0);
+      linked_fd = linked_fd0;
+      rclosed = false;
+      wclosed = false;
+    }
+
+    void shutdown_rd() {
+      rclosed = true;
+    }
+
+    void shutdown_wrt() {
+      wclosed = true;
+    }
+
+    void close() {
+      CHECK_FALSE(closed());
+      linked_fd = -1;
+      rclosed = true;
+      wclosed = true;
+      head = 0;
+      size = 0;
+    }
+  };
+
+  static std::vector<fbuf> buffers;
+
+  // Mock recv(): copies at most s bytes (capped by max_recv_size) out of fd's buffer and clears
+  // read_err. Returns the count, 0 if empty and read-closed, else -1 with errno EWOULDBLOCK.
+  long rcv(int fd, void* b, size_t s){
+    CHECK(fd < buffers.size());
+    read_err = 0;
+    if (max_recv_size && max_recv_size < s) s = max_recv_size;
+
+    fbuf& buffer = buffers[fd];
+    if (buffer.size == 0) {
+      if (buffer.rclosed) return 0;
+      errno = EWOULDBLOCK;
+      return -1;
+    }
+    if (buffer.size < s) s = buffer.size;
+    ::memcpy(b, &buffer.buff[buffer.head], s);
+    // Wrap head so it stays below buffsize; snd() relies on that to stay inside buff[]
+    buffer.head = (buffer.head + s) % buffsize;
+    buffer.size -= s;
+    return s;
+  }
+
+  // Mock send(): appends at most s bytes (capped by max_send_size and the free space) to the
+  // buffer linked to fd and clears write_err. Returns the count, or -1 with errno EWOULDBLOCK/EPIPE.
+  long snd(int fd, const void* b, size_t s){
+    CHECK(fd < buffers.size());
+    write_err = 0;
+    if (max_send_size && max_send_size < s) s = max_send_size;
+    // Write to linked buffer
+    fbuf& buffer = buffers[fd];
+    fbuf& linked_buffer = buffers[buffer.linked_fd];
+    if (linked_buffer.size == buffsize) {
+      errno = EWOULDBLOCK;
+      return -1;
+    }
+    if (linked_buffer.rclosed) {
+      errno = EPIPE;
+      return -1;
+    }
+    if (s + linked_buffer.size > buffsize) s = buffsize - linked_buffer.size;
+    size_t start = linked_buffer.head+linked_buffer.size;
+    ::memcpy(&linked_buffer.buff[start], b, s);
+    // If we wrote into the second half then copy that part into the hole at the front for reads after head wraps
+    if (start+s > buffsize) {
+      size_t from = start > buffsize ? start : buffsize;
+      ::memmove(&linked_buffer.buff[from-buffsize], &linked_buffer.buff[from], start+s-from);
+    }
+    linked_buffer.size += s;
+    return s;
+  }
+
+  void rcv_stop(int fd) {
+    CHECK(fd < buffers.size());
+    buffers[fd].shutdown_rd();
+  }
+
+  void snd_stop(int fd) {
+    CHECK(fd < buffers.size());
+    buffers[fd].shutdown_wrt();
+    buffers[buffers[fd].linked_fd].shutdown_rd();
+  }
+
+  int makepair(int fds[2]) {
+    size_t maximum_fd = buffers.size();
+    buffers.resize( buffers.size()+2);
+    buffers[maximum_fd].open_linked(maximum_fd+1);
+    buffers[maximum_fd+1].open_linked(maximum_fd);
+    fds[0] = maximum_fd;
+    fds[1] = maximum_fd+1;
+    return 0;
+  }
+
+  void freepair(int fds[2]) {
+    CHECK(fds[0] < buffers.size());
+    CHECK(fds[1] < buffers.size());
+    buffers[fds[0]].close();
+    buffers[fds[1]].close();
+  }
+#endif
+
+  // Block of memory for buffers
+  const size_t BUFFMEMSIZE = 8*1024;
+  const size_t RBUFFCOUNT = 32;
+  const size_t WBUFFCOUNT = 32;
+
+  char rbuffer_memory[BUFFMEMSIZE];
+  char *rbuffer_brk = rbuffer_memory;
+
+  pn_raw_buffer_t rbuffs[RBUFFCOUNT];
+  pn_raw_buffer_t wbuffs[WBUFFCOUNT];
+
+  class BufferAllocator {
+    char* buffer;
+    uint32_t size;
+    uint32_t brk;
+
+  public:
+    BufferAllocator(char* b, uint32_t s) : buffer(b), size(s), brk(0) {};
+
+    char* next(uint32_t s) {
+      if ( brk+s > size) return NULL;
+
+      char *r = buffer+brk;
+      brk += s;
+      return r;
+    }
+
+    template <class B>
+    B next_buffer(uint32_t s);
+
+    template <class B, int N>
+    void split_buffers(B (&buffers)[N]) {
+      uint32_t buffsize  = (size-brk)/N;
+      uint32_t remainder = (size-brk)%N;
+      for (int i = 0; i<N; ++i) {
+        buffers[i] = next_buffer<B>(i==0 ? buffsize+remainder : buffsize);
+      }
+    }
+  };
+
+  template <>
+  pn_raw_buffer_t BufferAllocator::next_buffer(uint32_t s) {
+    pn_raw_buffer_t b = {};
+    b.bytes = next(s);
+    if (b.bytes) {b.capacity = s; b.size = s;}
+    return b;
+  }
+}
+
+char message[] =
+"Jabberwocky\n"
+"By Lewis Carroll\n"
+"\n"
+"'Twas brillig, and the slithy toves\n"
+"Did gyre and gimble in the wabe:\n"
+"All mimsy were the borogroves,\n"
+"And the mome raths outgrabe.\n"
+"\n"
+"Beware the Jabberwock, my son!\n"
+"The jaws that bite, the claws that catch!\n"
+"Beware the Jubjub bird, and shun\n"
+"The frumious Bandersnatch!\n"
+"\n"
+"He took his vorpal sword in hand;\n"
+"Long time the manxome foe he sought-\n"
+"So rested he by the Tumtum tree\n"
+"And stood awhile in thought.\n"
+"\n"
+"And, as in uffish thought he stood,\n"
+"The Jabberwock with eyes of flame,\n"
+"Came whiffling through the tulgey wood,\n"
+"And burbled as it came!\n"
+"\n"
+"One, two! One, two! And through and through,\n"
+"The vorpal blade went snicker-snack!\n"
+"He left it dead, and with its head\n"
+"He went galumphing back.\n"
+"\n"
+"\"And hast thou slain the JabberWock?\n"
+"Come to my arms, my beamish boy!\n"
+"O frabjous day! Callooh! Callay!\"\n"
+"He chortled in his joy.\n"
+"\n"
+"'Twas brillig, and the slithy toves\n"
+"Did gyre and gimble in the wabe:\n"
+"All mimsy were the borogroves,\n"
+"And the mome raths outgrabe.\n"
+;
+
+TEST_CASE("raw connection") {
+  auto_free<pn_raw_connection_t, free_raw_connection> p(mk_raw_connection());
+  max_send_size = 0;
+
+  REQUIRE(p);
+  REQUIRE(pni_raw_validate(p));
+  CHECK_FALSE(pn_raw_connection_is_read_closed(p));
+  CHECK_FALSE(pn_raw_connection_is_write_closed(p));
+
+  int rbuff_count = pn_raw_connection_read_buffers_capacity(p);
+  CHECK(rbuff_count>0);
+  int wbuff_count = pn_raw_connection_write_buffers_capacity(p);
+  CHECK(wbuff_count>0);
+
+  BufferAllocator rb(rbuffer_memory, sizeof(rbuffer_memory));
+  BufferAllocator wb(message, sizeof(message));
+
+  rb.split_buffers(rbuffs);
+  wb.split_buffers(wbuffs);
+
+  int rtaken = pn_raw_connection_give_read_buffers(p, rbuffs, RBUFFCOUNT);
+  REQUIRE(pni_raw_validate(p));
+  REQUIRE(rtaken==rbuff_count);
+
+  SECTION("Write multiple per event loop") {
+    int wtaken = 0;
+    for (size_t i = 0; i < WBUFFCOUNT; ++i) {
+      int taken = pn_raw_connection_write_buffers(p, &wbuffs[i], 1);
+      if (taken==0) break;
+      REQUIRE(pni_raw_validate(p));
+      REQUIRE(taken==1);
+      wtaken += taken;
+    }
+
+    CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+    CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+
+    std::vector<pn_raw_buffer_t> read(rtaken);
+    std::vector<pn_raw_buffer_t> written(wtaken);
+
+    SECTION("Simple tests using a looped back socketpair") {
+      int fds[2];
+      REQUIRE(makepair(fds) == 0);
+      pni_raw_connected(p);
+
+      // First event is always connected
+      REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CONNECTED);
+      // No need-buffers event as we already gave buffers
+      REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+
+      SECTION("Write then read") {
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+        int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wgiven==wtaken);
+
+        // Write more
+        for (size_t i = wtaken; i < WBUFFCOUNT; ++i) {
+          int taken = pn_raw_connection_write_buffers(p, &wbuffs[i], 1);
+          if (taken==0) break;
+          REQUIRE(pni_raw_validate(p));
+          CHECK(taken==1);
+          wtaken += taken;
+        }
+
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+      SECTION("Write then read, short writes") {
+        max_send_size = 10;
+        int wgiven = 0;
+        do {
+          do {
+            pni_raw_write(p, fds[0], snd, set_write_error);
+            CHECK(write_err == 0);
+            REQUIRE(pni_raw_validate(p));
+          } while (pn_event_type(pni_raw_event_next(p)) != PN_RAW_CONNECTION_WRITTEN);
+          CHECK(pn_raw_connection_write_buffers_capacity(p) == wgiven);
+          int given = pn_raw_connection_take_written_buffers(p, &written[wgiven], written.size()-wgiven);
+          REQUIRE(pni_raw_validate(p));
+          CHECK(given == 1);
+          CHECK(written[wgiven].offset == wbuffs[wgiven].offset);
+          CHECK(written[wgiven].size == wbuffs[wgiven].size);
+          wgiven += given;
+        } while (wgiven != wtaken);
+
+        // Write more
+        for (size_t i = wtaken; i < WBUFFCOUNT; ++i) {
+          int taken = pn_raw_connection_write_buffers(p, &wbuffs[i], 1);
+          if (taken==0) break;
+          REQUIRE(pni_raw_validate(p));
+          CHECK(taken==1);
+          wtaken += taken;
+        }
+
+        do {
+          do {
+            pni_raw_write(p, fds[0], snd, set_write_error);
+            CHECK(write_err == 0);
+            REQUIRE(pni_raw_validate(p));
+          } while (pn_event_type(pni_raw_event_next(p)) != PN_RAW_CONNECTION_WRITTEN);
+        } while (pn_event_type(pni_raw_event_next(p)) != PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+      freepair(fds);
+    }
+  }
+
+  SECTION("Write once per event loop") {
+    int wtaken = pn_raw_connection_write_buffers(p, wbuffs, WBUFFCOUNT);
+    REQUIRE(pni_raw_validate(p));
+    CHECK(wtaken==wbuff_count);
+
+    CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+    CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+
+    std::vector<pn_raw_buffer_t> read(rtaken);
+    std::vector<pn_raw_buffer_t> written(wtaken);
+
+    SECTION("Check no change in buffer use without read/write") {
+
+      int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], rtaken);
+      REQUIRE(pni_raw_validate(p));
+      CHECK(rgiven==0);
+      int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], wtaken);
+      REQUIRE(pni_raw_validate(p));
+      CHECK(wgiven==0);
+    }
+
+    SECTION("Simple tests using a looped back socketpair") {
+      int fds[2];
+      REQUIRE(makepair(fds) == 0);
+      pni_raw_connected(p);
+
+      // First event is always connected
+      REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CONNECTED);
+      // No need-buffers event as we already gave buffers
+      REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+
+      SECTION("Ensure nothing is read if nothing is written") {
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        CHECK(pn_raw_connection_take_read_buffers(p, &read[0], read.size()) == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pni_raw_event_next(p) == NULL);
+
+        snd_stop(fds[0]);
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_is_read_closed(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CLOSED_READ);
+        rcv_stop(fds[1]);
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == EPIPE);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_is_write_closed(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CLOSED_WRITE);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_DISCONNECTED);
+      }
+
+      SECTION("Read/Write interleaved") {
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+
+        // Write more
+        wtaken += pn_raw_connection_write_buffers(p, &wbuffs[wtaken], WBUFFCOUNT-wtaken);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wtaken==WBUFFCOUNT);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        int rgiven_before = rgiven;
+        rgiven += pn_raw_connection_take_read_buffers(p, &read[rgiven], read.size()-rgiven);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > rgiven_before);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+
+      SECTION("Write then read") {
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+        int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wgiven==wtaken);
+
+        // Write more
+        wtaken += pn_raw_connection_write_buffers(p, &wbuffs[wtaken], WBUFFCOUNT-wtaken);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wtaken==WBUFFCOUNT);
+
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+
+      SECTION("Write, close, then read") {
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == 0);
+        int wgiven = pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wgiven==wtaken);
+
+        // Write more
+        wtaken += pn_raw_connection_write_buffers(p, &wbuffs[wtaken], WBUFFCOUNT-wtaken);
+        REQUIRE(pni_raw_validate(p));
+        CHECK(wtaken==WBUFFCOUNT);
+
+        pni_raw_write(p, fds[0], snd, set_write_error);
+        CHECK(write_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_WRITTEN);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
+        wgiven += pn_raw_connection_take_written_buffers(p, &written[0], written.size());
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(wgiven==wtaken);
+
+        // At this point we've written every buffer
+        CHECK(pn_raw_connection_write_buffers_capacity(p) == wbuff_count);
+
+        snd_stop(fds[0]);
+        pni_raw_read(p, fds[1], rcv, set_read_error);
+        CHECK(read_err == 0);
+        REQUIRE(pni_raw_validate(p));
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_RAW_CONNECTION_CLOSED_READ);
+        REQUIRE(pn_event_type(pni_raw_event_next(p)) == PN_EVENT_NONE);
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == 0);
+        int rgiven = pn_raw_connection_take_read_buffers(p, &read[0], read.size());
+        REQUIRE(pni_raw_validate(p));
+        CHECK(rgiven > 0);
+
+        CHECK(pn_raw_connection_read_buffers_capacity(p) == rgiven);
+        CHECK(read[rgiven-1].size == 0);
+
+        // At this point we should have read everything - make sure it matches
+        char* start = message;
+        for (int i = 0; i < rgiven-1; ++i) {
+          CHECK(read[i].size > 0);
+          CHECK(std::string(read[i].bytes, read[i].size) == std::string(start, read[i].size));
+          start += read[i].size;
+        }
+        REQUIRE(start-message == sizeof(message));
+      }
+
+      freepair(fds);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org