You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/10/06 19:10:28 UTC

svn commit: r1005152 - in /incubator/thrift/trunk/lib/cpp: src/transport/TZlibTransport.cpp src/transport/TZlibTransport.h test/TransportTest.cpp test/ZlibTest.cpp

Author: dreiss
Date: Wed Oct  6 17:10:27 2010
New Revision: 1005152

URL: http://svn.apache.org/viewvc?rev=1005152&view=rev
Log:
THRIFT-926. cpp: remove "standalone" distinction in TZlibTransport

Now that TZlibTransport::flush() behaves the same way as other
transports, there is no need to distinguish between RPC and standalone
behavior for TZlibTransport.

Modified:
    incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp
    incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.h
    incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp
    incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp?rev=1005152&r1=1005151&r2=1005152&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.cpp Wed Oct  6 17:10:27 2010
@@ -161,26 +161,10 @@ uint32_t TZlibTransport::read(uint8_t* b
     rstream_->avail_out = urbuf_size_;
     urpos_ = 0;
 
-    // If we don't have any more compressed data available,
-    // read some from the underlying transport.
-    if (rstream_->avail_in == 0) {
-      uint32_t got = transport_->read(crbuf_, crbuf_size_);
-      if (got == 0) {
-        return len - need;
-      }
-      rstream_->next_in  = crbuf_;
-      rstream_->avail_in = got;
-    }
-
-    // We have some compressed data now.  Uncompress it.
-    int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH);
-
-    if (zlib_rv == Z_STREAM_END) {
-      if (standalone_) {
-        input_ended_ = true;
-      }
-    } else {
-      checkZlibRv(zlib_rv, rstream_->msg);
+    // Call inflate() to uncompress some more data
+    if (!readFromZlib()) {
+      // no data available from underlying transport
+      return len - need;
     }
 
     // Okay.  The read buffer should have whatever we can give it now.
@@ -188,6 +172,32 @@ uint32_t TZlibTransport::read(uint8_t* b
   }
 }
 
+bool TZlibTransport::readFromZlib() {
+  assert(!input_ended_);
+
+  // If we don't have any more compressed data available,
+  // read some from the underlying transport.
+  if (rstream_->avail_in == 0) {
+    uint32_t got = transport_->read(crbuf_, crbuf_size_);
+    if (got == 0) {
+      return false;
+    }
+    rstream_->next_in  = crbuf_;
+    rstream_->avail_in = got;
+  }
+
+  // We have some compressed data now.  Uncompress it.
+  int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH);
+
+  if (zlib_rv == Z_STREAM_END) {
+    input_ended_ = true;
+  } else {
+    checkZlibRv(zlib_rv, rstream_->msg);
+  }
+
+  return true;
+}
+
 
 // WRITING STRATEGY
 //
@@ -315,30 +325,60 @@ void TZlibTransport::consume(uint32_t le
 }
 
 void TZlibTransport::verifyChecksum() {
-  if (!standalone_) {
-    throw TTransportException(
-        TTransportException::BAD_ARGS,
-        "TZLibTransport can only verify checksums for standalone objects.");
+  // If zlib has already reported the end of the stream,
+  // it has verified the checksum.
+  if (input_ended_) {
+    return;
   }
 
-  if (!input_ended_) {
-    // This should only be called when reading is complete,
-    // but it's possible that the whole checksum has not been fed to zlib yet.
-    // We try to read an extra byte here to force zlib to finish the stream.
-    // It might not always be easy to "unread" this byte,
-    // but we throw an exception if we get it, which is not really
-    // a recoverable error, so it doesn't matter.
-    uint8_t buf[1];
-    uint32_t got = this->read(buf, sizeof(buf));
-    if (got || !input_ended_) {
-      throw TTransportException(
-          TTransportException::CORRUPTED_DATA,
-          "Zlib stream not complete.");
-    }
+  // This should only be called when reading is complete.
+  // If the caller still has unread data, throw an exception.
+  if (readAvail() > 0) {
+    throw TTransportException(
+        TTransportException::CORRUPTED_DATA,
+        "verifyChecksum() called before end of zlib stream");
   }
 
-  // If the checksum had been bad, we would have gotten an error while
-  // inflating.
+  // Reset the rstream fields, in case avail_out is 0.
+  // (Since readAvail() is 0, we know there is no unread data in urbuf_)
+  rstream_->next_out  = urbuf_;
+  rstream_->avail_out = urbuf_size_;
+  urpos_ = 0;
+
+  // Call inflate()
+  // This will throw an exception if the checksum is bad.
+  bool performed_inflate = readFromZlib();
+  if (!performed_inflate) {
+    // We needed to read from the underlying transport, and the read() call
+    // returned 0.
+    //
+    // Not all TTransport implementations behave the same way here, so we'll
+    // end up with different behavior depending on the underlying transport.
+    //
+    // For some transports (e.g., TFDTransport), read() blocks if no more data
+    // is available.  They only return 0 if EOF has been reached, or if the
+    // remote endpoint has closed the connection.  For those transports,
+    // verifyChecksum() will block until the checksum becomes available.
+    //
+    // Other transport types (e.g., TMemoryBuffer) always return 0 immediately
+    // if no more data is available.  For those transport types, verifyChecksum
+    // will raise the following exception if the checksum is not available from
+    // the underlying transport yet.
+    throw TTransportException(TTransportException::CORRUPTED_DATA,
+                              "checksum not available yet in "
+                              "verifyChecksum()");
+  }
+
+  // If input_ended_ is true now, the checksum has been verified
+  if (input_ended_) {
+    return;
+  }
+
+  // The caller invoked us before the actual end of the data stream
+  assert(rstream_->avail_out < urbuf_size_);
+  throw TTransportException(TTransportException::CORRUPTED_DATA,
+                            "verifyChecksum() called before end of "
+                            "zlib stream");
 }
 
 

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.h?rev=1005152&r1=1005151&r2=1005152&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TZlibTransport.h Wed Oct  6 17:10:27 2010
@@ -76,8 +76,6 @@ class TZlibTransport : public TVirtualTr
   /**
    * @param transport    The transport to read compressed data from
    *                     and write compressed data to.
-   * @param use_for_rpc  True if this object will be used for RPC,
-   *                     false if this is a standalone object.
    * @param urbuf_size   Uncompressed buffer size for reading.
    * @param crbuf_size   Compressed buffer size for reading.
    * @param uwbuf_size   Uncompressed buffer size for writing.
@@ -86,13 +84,11 @@ class TZlibTransport : public TVirtualTr
    * TODO(dreiss): Write a constructor that isn't a pain.
    */
   TZlibTransport(boost::shared_ptr<TTransport> transport,
-                 bool use_for_rpc,
                  int urbuf_size = DEFAULT_URBUF_SIZE,
                  int crbuf_size = DEFAULT_CRBUF_SIZE,
                  int uwbuf_size = DEFAULT_UWBUF_SIZE,
                  int cwbuf_size = DEFAULT_CWBUF_SIZE) :
     transport_(transport),
-    standalone_(!use_for_rpc),
     urpos_(0),
     uwpos_(0),
     input_ended_(false),
@@ -108,13 +104,6 @@ class TZlibTransport : public TVirtualTr
     rstream_(NULL),
     wstream_(NULL)
   {
-
-    if (!standalone_) {
-      throw TTransportException(
-          TTransportException::BAD_ARGS,
-          "TZLibTransport has not been tested for RPC.");
-    }
-
     if (uwbuf_size_ < MIN_DIRECT_DEFLATE_SIZE) {
       // Have to copy this into a local because of a linking issue.
       int minimum = MIN_DIRECT_DEFLATE_SIZE;
@@ -206,22 +195,47 @@ class TZlibTransport : public TVirtualTr
   inline int readAvail();
   void flushToTransport(int flush);
   void flushToZlib(const uint8_t* buf, int len, int flush);
+  bool readFromZlib();
 
+ private:
+  // Deprecated constructor signature.
+  //
+  // This used to be the constructor signature.  If you are getting a compile
+  // error because you are trying to use this constructor, you need to update
+  // your code as follows:
+  // - Remove the use_for_rpc argument in the constructur.
+  //   There is no longer any distinction between RPC and standalone zlib
+  //   transports.  (Previously, only standalone was allowed, anyway.)
+  // - Replace TZlibTransport::flush() calls with TZlibTransport::finish()
+  //   in your code.  Previously, flush() used to finish the zlib stream.
+  //   Now flush() only flushes out pending data, so more writes can be
+  //   performed after a flush().  The finish() method can be used to finalize
+  //   the zlib stream.
+  //
+  // If we don't declare this constructor, old code written as
+  // TZlibTransport(trans, false) still compiles but behaves incorrectly.
+  // The second bool argument is converted to an integer and used as the
+  // urbuf_size.
+  TZlibTransport(boost::shared_ptr<TTransport> transport,
+                 bool use_for_rpc,
+                 int urbuf_size = DEFAULT_URBUF_SIZE,
+                 int crbuf_size = DEFAULT_CRBUF_SIZE,
+                 int uwbuf_size = DEFAULT_UWBUF_SIZE,
+                 int cwbuf_size = DEFAULT_CWBUF_SIZE);
+
+ protected:
   // Writes smaller than this are buffered up.
   // Larger (or equal) writes are dumped straight to zlib.
   static const int MIN_DIRECT_DEFLATE_SIZE = 32;
 
   boost::shared_ptr<TTransport> transport_;
-  bool standalone_;
 
   int urpos_;
   int uwpos_;
 
-  /// True iff zlib has reached the end of a stream.
-  /// This is only ever true in standalone protcol objects.
+  /// True iff zlib has reached the end of the input stream.
   bool input_ended_;
   /// True iff we have finished the output stream.
-  /// This is only ever true in standalone protcol objects.
   bool output_finished_;
 
   int urbuf_size_;

Modified: incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp?rev=1005152&r1=1005151&r2=1005152&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp Wed Oct  6 17:10:27 2010
@@ -183,8 +183,8 @@ class CoupledZlibTransports : public Cou
  public:
   CoupledZlibTransports() :
     buf(new TMemoryBuffer) {
-    in = new TZlibTransport(buf, false);
-    out = new TZlibTransport(buf, false);
+    in = new TZlibTransport(buf);
+    out = new TZlibTransport(buf);
   }
 
   ~CoupledZlibTransports() {

Modified: incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp?rev=1005152&r1=1005151&r2=1005152&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/test/ZlibTest.cpp Wed Oct  6 17:10:27 2010
@@ -143,7 +143,7 @@ uint8_t* gen_random_buffer(uint32_t buf_
 
 void test_write_then_read(const uint8_t* buf, uint32_t buf_len) {
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
-  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
   zlib_trans->write(buf, buf_len);
   zlib_trans->finish();
 
@@ -162,12 +162,12 @@ void test_separate_checksum(const uint8_
   // the stream was not complete.  I'm about to go fix that.
   // It worked.  Awesome.
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
-  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
   zlib_trans->write(buf, buf_len);
   zlib_trans->finish();
   string tmp_buf;
   membuf->appendBufferToString(tmp_buf);
-  zlib_trans.reset(new TZlibTransport(membuf, false,
+  zlib_trans.reset(new TZlibTransport(membuf,
                                       TZlibTransport::DEFAULT_URBUF_SIZE,
                                       tmp_buf.length()-1));
 
@@ -182,7 +182,7 @@ void test_incomplete_checksum(const uint
   // Make sure we still get that "not complete" error if
   // it really isn't complete.
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
-  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
   zlib_trans->write(buf, buf_len);
   zlib_trans->finish();
   string tmp_buf;
@@ -209,7 +209,7 @@ void test_read_write_mix(const uint8_t* 
                          const shared_ptr<SizeGenerator>& read_gen) {
   // Try it with a mix of read/write sizes.
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
-  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
   unsigned int tot;
 
   tot = 0;
@@ -244,7 +244,7 @@ void test_read_write_mix(const uint8_t* 
 void test_invalid_checksum(const uint8_t* buf, uint32_t buf_len) {
   // Verify checksum checking.
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
-  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
   zlib_trans->write(buf, buf_len);
   zlib_trans->finish();
   string tmp_buf;
@@ -282,7 +282,7 @@ void test_invalid_checksum(const uint8_t
 void test_write_after_flush(const uint8_t* buf, uint32_t buf_len) {
   // write some data
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
-  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
   zlib_trans->write(buf, buf_len);
 
   // call finish()
@@ -321,7 +321,7 @@ void test_no_write() {
   {
     // Create a TZlibTransport object, and immediately destroy it
     // when it goes out of scope.
-    TZlibTransport w_zlib_trans(membuf, false);
+    TZlibTransport w_zlib_trans(membuf);
   }
 
   BOOST_CHECK_EQUAL(membuf->available_read(), 0);