You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/10/06 19:10:33 UTC

svn commit: r1005156 - in /incubator/thrift/trunk/lib/cpp: src/transport/TFileTransport.cpp src/transport/TFileTransport.h test/TFileTransportTest.cpp test/TransportTest.cpp

Author: dreiss
Date: Wed Oct  6 17:10:33 2010
New Revision: 1005156

URL: http://svn.apache.org/viewvc?rev=1005156&view=rev
Log:
THRIFT-926. cpp: Fix bugs in TFileTransport::flush()

Previously flush() had race conditions that could cause it to return
before all data had actually been flushed to disk.  Now the writer
makes sure both buffer queues have been flushed when forceFlush_ is set.

Also, flush() did not wake up the writer thread, so it normally had to
wait for the writer thread to wake up on its own schedule.  (By default,
this could take up to 3 seconds.)

Modified:
    incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp
    incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h
    incubator/thrift/trunk/lib/cpp/test/TFileTransportTest.cpp
    incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp?rev=1005156&r1=1005155&r2=1005156&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp Wed Oct  6 17:10:33 2010
@@ -204,10 +204,10 @@ void TFileTransport::write(const uint8_t
     throw TTransportException("TFileTransport: attempting to write to file opened readonly");
   }
 
-  enqueueEvent(buf, len, false);
+  enqueueEvent(buf, len);
 }
 
-void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
   // can't enqueue more events if file is going to close
   if (closing_) {
     return;
@@ -249,6 +249,11 @@ void TFileTransport::enqueueEvent(const 
     pthread_cond_wait(&notFull_, &mutex_);
   }
 
+  // We shouldn't be trying to enqueue new data while a forced flush is
+  // requested.  (Otherwise the writer thread might not ever be able to finish
+  // the flush if more data keeps being enqueued.)
+  assert(!forceFlush_);
+
   // add to the buffer
   if (!enqueueBuffer_->addEvent(toEnqueue)) {
     delete toEnqueue;
@@ -259,10 +264,6 @@ void TFileTransport::enqueueEvent(const 
   // signal anybody who's waiting for the buffer to be non-empty
   pthread_cond_signal(&notEmpty_);
 
-  if (blockUntilFlush) {
-    pthread_cond_wait(&flushed_, &mutex_);
-  }
-
   // this really should be a loop where it makes sure it got flushed
   // because condition variables can get triggered by the os for no reason
   // it is probably a non-factor for the time being
@@ -449,28 +450,67 @@ void TFileTransport::writerThread() {
       continue;
     }
 
-    bool flushTimeElapsed = false;
-    struct timespec current_time;
-    clock_gettime(CLOCK_REALTIME, &current_time);
-
-    if (current_time.tv_sec > ts_next_flush.tv_sec ||
-        (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
-      flushTimeElapsed = true;
-      getNextFlushTime(&ts_next_flush);
+    // Local variable to cache the state of forceFlush_.
+    //
+    // We only want to check the value of forceFlush_ once each time around the
+    // loop.  If we check it more than once without holding the lock the entire
+    // time, it could have changed state in between.  This will result in us
+    // making inconsistent decisions.
+    bool forced_flush = false;
+    pthread_mutex_lock(&mutex_);
+    if (forceFlush_) {
+      if (!enqueueBuffer_->isEmpty()) {
+        // If forceFlush_ is true, we need to flush all available data.
+        // If enqueueBuffer_ is not empty, go back to the start of the loop to
+        // write it out.
+        //
+        // We know the main thread is waiting on forceFlush_ to be cleared,
+        // so no new events will be added to enqueueBuffer_ until we clear
+        // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
+        // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
+        // and clear forceFlush_ the next time around the loop.)
+        pthread_mutex_unlock(&mutex_);
+        continue;
+      }
+      forced_flush = true;
     }
+    pthread_mutex_unlock(&mutex_);
 
-    // couple of cases from which a flush could be triggered
-    if ((flushTimeElapsed && unflushed > 0) ||
-       unflushed > flushMaxBytes_ ||
-       forceFlush_) {
+    // determine if we need to perform an fsync
+    bool flush = false;
+    if (forced_flush || unflushed > flushMaxBytes_) {
+      flush = true;
+    } else {
+      struct timespec current_time;
+      clock_gettime(CLOCK_REALTIME, &current_time);
+      if (current_time.tv_sec > ts_next_flush.tv_sec ||
+          (current_time.tv_sec == ts_next_flush.tv_sec &&
+           current_time.tv_nsec > ts_next_flush.tv_nsec)) {
+        if (unflushed > 0) {
+          flush = true;
+        } else {
+          // If there is no new data since the last fsync,
+          // don't perform the fsync, but do reset the timer.
+          getNextFlushTime(&ts_next_flush);
+        }
+      }
+    }
 
+    if (flush) {
       // sync (force flush) file to disk
       fsync(fd_);
       unflushed = 0;
+      getNextFlushTime(&ts_next_flush);
 
       // notify anybody waiting for flush completion
-      forceFlush_ = false;
-      pthread_cond_broadcast(&flushed_);
+      if (forced_flush) {
+        pthread_mutex_lock(&mutex_);
+        forceFlush_ = false;
+        assert(enqueueBuffer_->isEmpty());
+        assert(dequeueBuffer_->isEmpty());
+        pthread_cond_broadcast(&flushed_);
+        pthread_mutex_unlock(&mutex_);
+      }
     }
   }
 }
@@ -483,7 +523,10 @@ void TFileTransport::flush() {
   // wait for flush to take place
   pthread_mutex_lock(&mutex_);
 
+  // Indicate that we are requesting a flush
   forceFlush_ = true;
+  // Wake up the writer thread so it will perform the flush immediately
+  pthread_cond_signal(&notEmpty_);
 
   while (forceFlush_) {
     pthread_cond_wait(&flushed_, &mutex_);

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h?rev=1005156&r1=1005155&r2=1005156&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h Wed Oct  6 17:10:33 2010
@@ -290,7 +290,7 @@ class TFileTransport : public TFileReade
 
  private:
   // helper functions for writing to a file
-  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+  void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
   bool swapEventBuffers(struct timespec* deadline);
   bool initBufferAndWriteThread();
 

Modified: incubator/thrift/trunk/lib/cpp/test/TFileTransportTest.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/test/TFileTransportTest.cpp?rev=1005156&r1=1005155&r2=1005156&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/test/TFileTransportTest.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/test/TFileTransportTest.cpp Wed Oct  6 17:10:33 2010
@@ -45,14 +45,15 @@ FsyncLog* fsync_log;
 // Provide BOOST_CHECK_LT() and BOOST_CHECK_GT(), in case we're compiled
 // with an older version of boost
 #ifndef BOOST_CHECK_LT
-#define BOOST_CHECK_CMP(a, b, op) \
-  BOOST_CHECK_MESSAGE((a) op (b), \
-                      "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \
-                      BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" <<\
-                      (a) << " " BOOST_STRINGIZE(b) "=" << (b))
-
-#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <)
-#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >)
+#define BOOST_CHECK_CMP(a, b, op, check_fn) \
+  check_fn((a) op (b), \
+           "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \
+           BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" << (a) << \
+           " " BOOST_STRINGIZE(b) "=" << (b))
+
+#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_CHECK_MESSAGE)
+#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >, BOOST_CHECK_MESSAGE)
+#define BOOST_REQUIRE_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_REQUIRE_MESSAGE)
 #endif // BOOST_CHECK_LT
 
 /**
@@ -144,7 +145,6 @@ int fsync(int fd) {
   return 0;
 }
 
-
 int time_diff(const struct timeval* t1, const struct timeval* t2) {
   return (t2->tv_usec - t1->tv_usec) + (t2->tv_sec - t1->tv_sec) * 1000000;
 }
@@ -301,6 +301,39 @@ void test_flush_max_us3() {
   test_flush_max_us_impl(400000, 300000, 1000000);
 }
 
+/**
+ * Make sure flush() is fast when there is nothing to do.
+ *
+ * TFileTransport used to have a bug where flush() would wait for the fsync
+ * timeout to expire.
+ */
+void test_noop_flush() {
+  TempFile f(tmp_dir, "thrift.TFileTransportTest.");
+  TFileTransport transport(f.getPath());
+
+  // Write something to start the writer thread.
+  uint8_t buf[] = "a";
+  transport.write(buf, 1);
+
+  struct timeval start;
+  gettimeofday(&start, NULL);
+
+  for (unsigned int n = 0; n < 10; ++n) {
+    transport.flush();
+
+    struct timeval now;
+    gettimeofday(&now, NULL);
+
+    // Fail if at any point we've been running for longer than half a second.
+    // (With the buggy code, TFileTransport used to take 3 seconds per flush())
+    //
+    // Use a fatal fail so we break out early, rather than continuing to make
+    // many more slow flush() calls.
+    int delta = time_diff(&start, &now);
+    BOOST_REQUIRE_LT(delta, 500000);
+  }
+}
+
 /**************************************************************************
  * General Initialization
  **************************************************************************/
@@ -358,6 +391,7 @@ boost::unit_test::test_suite* init_unit_
   suite->add(BOOST_TEST_CASE(test_flush_max_us1));
   suite->add(BOOST_TEST_CASE(test_flush_max_us2));
   suite->add(BOOST_TEST_CASE(test_flush_max_us3));
+  suite->add(BOOST_TEST_CASE(test_noop_flush));
 
   return suite;
 }

Modified: incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp?rev=1005156&r1=1005155&r2=1005156&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/test/TransportTest.cpp Wed Oct  6 17:10:33 2010
@@ -450,13 +450,13 @@ void test_rw(uint32_t totalSize,
   TEST_RW_BUF(CoupledTransports, 1024*1024*30, 0, 0); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, rand4k, rand4k); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163); \
-  TEST_RW_BUF(CoupledTransports, 1024*1024, 1, 1); \
+  TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1); \
   \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, 0, 0, rand4k, rand4k); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, \
               rand4k, rand4k, rand4k, rand4k); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163, rand4k, rand4k); \
-  TEST_RW_BUF(CoupledTransports, 1024*1024*2, 1, 1, rand4k, rand4k);
+  TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1, rand4k, rand4k);
 
 class TransportTestGen {
  public: