You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/10/06 19:10:41 UTC

svn commit: r1005161 - in /incubator/thrift/trunk/lib/cpp: src/transport/TBufferTransports.cpp src/transport/TBufferTransports.h src/transport/TFDTransport.cpp src/transport/TZlibTransport.cpp test/TransportTest.cpp test/ZlibTest.cpp

Author: dreiss
Date: Wed Oct  6 17:10:40 2010
New Revision: 1005161

URL: http://svn.apache.org/viewvc?rev=1005161&view=rev
Log:
THRIFT-926. cpp: Fix inconsistencies in transport read() behavior

- TBufferedTransport::borrow() could block if not enough data was
  available.  Now it returns NULL immediately in this case, like all
  other transports.

- TBufferedTransport::read() could block if some data was available in
  the readahead buffer, but not enough to satisfy the request.  It would
  attempt to call read() on the underlying transport, but this might
  block.  Now it just returns the remaining data in the readahead
  buffer.  The caller is responsible for calling read() again to get the
  rest of the data they want.

- TFramedTransport::read() threw an exception if read() on the underlying
  transport returned 0 when looking for a frame header.  Now
  TFramedTransport::read() returns 0, too.  (It still throws an exception
  if the underlying transport returns 0 after a partial frame or frame
  header has been read.)

- TFDTransport::read() threw an exception on EINTR.  Now it retries up
  to 5 times, similarly to the way TSocket::read() behaves.

- TZlibTransport::read() could block when less data than was requested
  is available.  Now it only calls read() on the underlying transport
  when it would otherwise have nothing to return.

  This does mean that TZlibTransport::read() now often returns less data
  than is actually available at the time.  This required updating
  several of the ZlibTest tests to use readAll() instead of read(),
  since they previously assumed read() would return all available data.

Modified:
    incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.cpp
    incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.h
    incubator/thrift/trunk/lib/cpp/src/transport/TFDTransport.cpp
    incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp
    incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp
    incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.cpp?rev=1005161&r1=1005160&r2=1005161&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.cpp Wed Oct  6 17:10:40 2010
@@ -28,19 +28,23 @@ namespace apache { namespace thrift { na
 
 
 uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
-  uint32_t want = len;
   uint32_t have = rBound_ - rBase_;
 
   // We should only take the slow path if we can't satisfy the read
   // with the data already in the buffer.
-  assert(have < want);
+  assert(have < len);
 
-  // Copy out whatever we have.
+  // If we have some date in the buffer, copy it out and return it.
+  // We have to return it without attempting to read more, since we aren't
+  // guaranteed that the underlying transport actually has more data, so
+  // attempting to read from it could block.
   if (have > 0) {
     memcpy(buf, rBase_, have);
-    want -= have;
-    buf += have;
+    setReadBuffer(rBuf_.get(), 0);
+    return have;
   }
+
+  // No data is available in our buffer.
   // Get more from underlying transport up to buffer size.
   // Note that this makes a lot of sense if len < rBufSize_
   // and almost no sense otherwise.  TODO(dreiss): Fix that
@@ -48,12 +52,11 @@ uint32_t TBufferedTransport::readSlow(ui
   setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
 
   // Hand over whatever we have.
-  uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
+  uint32_t give = std::min(len, static_cast<uint32_t>(rBound_ - rBase_));
   memcpy(buf, rBase_, give);
   rBase_ += give;
-  want -= give;
 
-  return (len - want);
+  return give;
 }
 
 void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
@@ -106,43 +109,9 @@ void TBufferedTransport::writeSlow(const
 }
 
 const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
-  // If the request is bigger than our buffer, we are hosed.
-  if (*len > rBufSize_) {
-    return NULL;
-  }
-
-  // The number of bytes of data we have already.
-  uint32_t have = rBound_ - rBase_;
-  // The number of additional bytes we need from the underlying transport.
-  int32_t need = *len - have;
-  // The space from the start of the buffer to the end of our data.
-  uint32_t offset = rBound_ - rBuf_.get();
-  assert(need > 0);
-
-  // If we have less than half our buffer space available, shift the data
-  // we have down to the start.  If the borrow is big compared to our buffer,
-  // this could be kind of a waste, but if the borrow is small, it frees up
-  // space at the end of our buffer to do a bigger single read from the
-  // underlying transport.  Also, if our needs extend past the end of the
-  // buffer, we have to do a copy no matter what.
-  if ((offset > rBufSize_/2) || (offset + need > rBufSize_)) {
-    memmove(rBuf_.get(), rBase_, have);
-    setReadBuffer(rBuf_.get(), have);
-    offset = have;
-  }
-
-  // First try to fill up the buffer.
-  uint32_t got = transport_->read(rBound_, rBufSize_ - offset);
-  rBound_ += got;
-  need -= got;
-
-  // If that fails, readAll until we get what we need.
-  if (need > 0) {
-    rBound_ += transport_->readAll(rBound_, need);
-  }
-
-  *len = rBound_ - rBase_;
-  return rBase_;
+  // Simply return NULL.  We don't know if there is actually data available on
+  // the underlying transport, so calling read() might block.
+  return NULL;
 }
 
 void TBufferedTransport::flush()  {
@@ -177,7 +146,10 @@ uint32_t TFramedTransport::readSlow(uint
   }
 
   // Read another frame.
-  readFrame();
+  if (!readFrame()) {
+    // EOF.  No frame available.
+    return 0;
+  }
 
   // TODO(dreiss): Should we warn when reads cross frames?
 
@@ -190,13 +162,33 @@ uint32_t TFramedTransport::readSlow(uint
   return (len - want);
 }
 
-void TFramedTransport::readFrame() {
+bool TFramedTransport::readFrame() {
   // TODO(dreiss): Think about using readv here, even though it would
   // result in (gasp) read-ahead.
 
   // Read the size of the next frame.
+  // We can't use readAll(&sz, sizeof(sz)), since that always throws an
+  // exception on EOF.  We want to throw an exception only if EOF occurs after
+  // partial size data.
   int32_t sz;
-  transport_->readAll((uint8_t*)&sz, sizeof(sz));
+  uint32_t size_bytes_read = 0;
+  while (size_bytes_read < sizeof(sz)) {
+    uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read;
+    uint32_t bytes_read = transport_->read(szp, sizeof(sz) - size_bytes_read);
+    if (bytes_read == 0) {
+      if (size_bytes_read == 0) {
+        // EOF before any data was read.
+        return false;
+      } else {
+        // EOF after a partial frame header.  Raise an exception.
+        throw TTransportException(TTransportException::END_OF_FILE,
+                                  "No more data to read after "
+                                  "partial frame header.");
+      }
+    }
+    size_bytes_read += bytes_read;
+  }
+
   sz = ntohl(sz);
 
   if (sz < 0) {
@@ -210,6 +202,7 @@ void TFramedTransport::readFrame() {
   }
   transport_->readAll(rBuf_.get(), sz);
   setReadBuffer(rBuf_.get(), sz);
+  return true;
 }
 
 void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.h?rev=1005161&r1=1005160&r2=1005161&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TBufferTransports.h Wed Oct  6 17:10:40 2010
@@ -389,8 +389,11 @@ class TFramedTransport
  protected:
   /**
    * Reads a frame of input from the underlying stream.
+   *
+   * Returns true if a frame was read successfully, or false on EOF.
+   * (Raises a TTransportException if EOF occurs after a partial frame.)
    */
-  void readFrame();
+  bool readFrame();
 
   void initPointers() {
     setReadBuffer(NULL, 0);

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TFDTransport.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TFDTransport.cpp?rev=1005161&r1=1005160&r2=1005161&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TFDTransport.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TFDTransport.cpp Wed Oct  6 17:10:40 2010
@@ -45,14 +45,23 @@ void TFDTransport::close() {
 }
 
 uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) {
-  ssize_t rv = ::read(fd_, buf, len);
-  if (rv < 0) {
-    int errno_copy = errno;
-    throw TTransportException(TTransportException::UNKNOWN,
-                              "TFDTransport::read()",
-                              errno_copy);
+  unsigned int maxRetries = 5; // same as the TSocket default
+  unsigned int retries = 0;
+  while (true) {
+    ssize_t rv = ::read(fd_, buf, len);
+    if (rv < 0) {
+      if (errno == EINTR && retries < maxRetries) {
+        // If interrupted, try again
+        ++retries;
+        continue;
+      }
+      int errno_copy = errno;
+      throw TTransportException(TTransportException::UNKNOWN,
+                                "TFDTransport::read()",
+                                errno_copy);
+    }
+    return rv;
   }
-  return rv;
 }
 
 void TFDTransport::write(const uint8_t* buf, uint32_t len) {

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp?rev=1005161&r1=1005160&r2=1005161&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp Wed Oct  6 17:10:40 2010
@@ -149,6 +149,14 @@ uint32_t TZlibTransport::read(uint8_t* b
       return len;
     }
 
+    // If we will need to read from the underlying transport to get more data,
+    // but we already have some data available, return it now.  Reading from
+    // the underlying transport may block, and read() is only allowed to block
+    // when no data is available.
+    if (need < len && rstream_->avail_in == 0) {
+      return len - need;
+    }
+
     // If we get to this point, we need to get some more data.
 
     // If zlib has reported the end of a stream, we can't really do any more.

Modified: incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp?rev=1005161&r1=1005160&r2=1005161&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp Wed Oct  6 17:10:40 2010
@@ -581,6 +581,64 @@ void test_read_part_available() {
 }
 
 template <class CoupledTransports>
+void test_read_partial_midframe() {
+  CoupledTransports transports;
+  BOOST_REQUIRE(transports.in != NULL);
+  BOOST_REQUIRE(transports.out != NULL);
+
+  uint8_t write_buf[16];
+  uint8_t read_buf[16];
+  memset(write_buf, 'a', sizeof(write_buf));
+
+  // Attempt to read 10 bytes, when only 9 are available, but after we have
+  // already read part of the data that is available.  This exercises a
+  // different code path for several of the transports.
+  //
+  // For transports that add their own framing (e.g., TFramedTransport and
+  // TFileTransport), the two flush calls break up the data in to a 10 byte
+  // frame and a 3 byte frame.  The first read then puts us partway through the
+  // first frame, and then we attempt to read past the end of that frame, and
+  // through the next frame, too.
+  //
+  // For buffered transports that perform read-ahead (e.g.,
+  // TBufferedTransport), the read-ahead will most likely see all 13 bytes
+  // written on the first read.  The next read will then attempt to read past
+  // the end of the read-ahead buffer.
+  //
+  // Flush 10 bytes, then 3 bytes.  This creates 2 separate frames for
+  // transports that track framing internally.
+  transports.out->write(write_buf, 10);
+  transports.out->flush();
+  transports.out->write(write_buf, 3);
+  transports.out->flush();
+
+  // Now read 4 bytes, so that we are partway through the written data.
+  uint32_t bytes_read = transports.in->read(read_buf, 4);
+  BOOST_CHECK_EQUAL(bytes_read, 4);
+
+  // Now attempt to read 10 bytes.  Only 9 more are available.
+  //
+  // We should be able to get all 9 bytes, but it might take multiple read
+  // calls, since it is valid for read() to return fewer bytes than requested.
+  // (Most transports do immediately return 9 bytes, but the framing transports
+  // tend to only return to the end of the current frame, which is 6 bytes in
+  // this case.)
+  uint32_t total_read = 0;
+  while (total_read < 9) {
+    set_trigger(3, transports.out, 1);
+    bytes_read = transports.in->read(read_buf, 10);
+    BOOST_REQUIRE_EQUAL(numTriggersFired, 0);
+    BOOST_REQUIRE_GT(bytes_read, 0);
+    total_read += bytes_read;
+    BOOST_REQUIRE_LE(total_read, 9);
+  }
+
+  BOOST_CHECK_EQUAL(total_read, 9);
+
+  clear_triggers();
+}
+
+template <class CoupledTransports>
 void test_borrow_part_available() {
   CoupledTransports transports;
   BOOST_REQUIRE(transports.in != NULL);
@@ -851,6 +909,12 @@ class TransportTestGen {
           test_read_part_available<CoupledTransports>, name);
     suite_->add(tc, expectedFailures);
 
+    snprintf(name, sizeof(name), "%s::test_read_partial_midframe()",
+             transportName);
+    tc = boost::unit_test::make_test_case(
+          test_read_partial_midframe<CoupledTransports>, name);
+    suite_->add(tc, expectedFailures);
+
     snprintf(name, sizeof(name), "%s::test_read_none_available()",
              transportName);
     tc = boost::unit_test::make_test_case(

Modified: incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp?rev=1005161&r1=1005160&r2=1005161&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp Wed Oct  6 17:10:40 2010
@@ -148,7 +148,7 @@ void test_write_then_read(const uint8_t*
   zlib_trans->finish();
 
   boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
-  uint32_t got = zlib_trans->read(mirror.get(), buf_len);
+  uint32_t got = zlib_trans->readAll(mirror.get(), buf_len);
   BOOST_REQUIRE_EQUAL(got, buf_len);
   BOOST_CHECK_EQUAL(memcmp(mirror.get(), buf, buf_len), 0);
   zlib_trans->verifyChecksum();
@@ -172,7 +172,7 @@ void test_separate_checksum(const uint8_
                                       tmp_buf.length()-1));
 
   boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
-  uint32_t got = zlib_trans->read(mirror.get(), buf_len);
+  uint32_t got = zlib_trans->readAll(mirror.get(), buf_len);
   BOOST_REQUIRE_EQUAL(got, buf_len);
   BOOST_CHECK_EQUAL(memcmp(mirror.get(), buf, buf_len), 0);
   zlib_trans->verifyChecksum();
@@ -193,7 +193,7 @@ void test_incomplete_checksum(const uint
                       tmp_buf.length());
 
   boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
-  uint32_t got = zlib_trans->read(mirror.get(), buf_len);
+  uint32_t got = zlib_trans->readAll(mirror.get(), buf_len);
   BOOST_REQUIRE_EQUAL(got, buf_len);
   BOOST_CHECK_EQUAL(memcmp(mirror.get(), buf, buf_len), 0);
   try {
@@ -233,7 +233,8 @@ void test_read_write_mix(const uint8_t* 
       expected_read_len = buf_len - tot;
     }
     uint32_t got = zlib_trans->read(mirror.get() + tot, read_len);
-    BOOST_REQUIRE_EQUAL(got, expected_read_len);
+    BOOST_REQUIRE_LE(got, expected_read_len);
+    BOOST_REQUIRE_NE(got, 0);
     tot += got;
   }
 
@@ -271,7 +272,7 @@ void test_invalid_checksum(const uint8_t
 
   boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
   try {
-    zlib_trans->read(mirror.get(), buf_len);
+    zlib_trans->readAll(mirror.get(), buf_len);
     zlib_trans->verifyChecksum();
     BOOST_ERROR("verifyChecksum() did not report an error");
   } catch (TZlibTransportException& ex) {