You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by dr...@apache.org on 2010/03/09 06:20:17 UTC

svn commit: r920686 - in /incubator/thrift/trunk/lib/cpp/src/transport: TFileTransport.cpp TFileTransport.h

Author: dreiss
Date: Tue Mar  9 05:20:17 2010
New Revision: 920686

URL: http://svn.apache.org/viewvc?rev=920686&view=rev
Log:
cpp: Prevent TFileTransport from throwing uncaught exceptions

TFileTransport::writerThread throws exceptions. This function runs inside
the writer thread, where nothing catches these exceptions. When one is
thrown, the process aborts.

The fix is to first eliminate all the throw statements inside this function.
In addition, add some error recovery logic to the code: whenever an IO error
happens, the writer thread enters an error recovery mode, in which it sleeps
(for 60 seconds by default) and then tries to reopen the file.

Note: Currently, when errors happen, we drop events.

Modified:
    incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp
    incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp?rev=920686&r1=920685&r2=920686&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.cpp Tue Mar  9 05:20:17 2010
@@ -82,6 +82,7 @@ TFileTransport::TFileTransport(string pa
   , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
   , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
   , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
+  , writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US)
   , writerThreadId_(0)
   , dequeueBuffer_(NULL)
   , enqueueBuffer_(NULL)
@@ -301,52 +302,85 @@ bool TFileTransport::swapEventBuffers(st
 
 
 void TFileTransport::writerThread() {
+  bool hasIOError = false;
+
   // open file if it is not open
   if(!fd_) {
-    openLogFile();
+    try {
+      openLogFile();
+    } catch (...) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
+      fd_ = 0;
+      hasIOError = true;
+    }
   }
 
   // set the offset to the correct value (EOF)
-  try {
-    seekToEnd();
-  } catch (TException &te) {
+  if (!hasIOError) {
+    try {
+      seekToEnd();
+      // throw away any partial events
+      offset_ += readState_.lastDispatchPtr_;
+      ftruncate(fd_, offset_);
+      readState_.resetAllValues();
+    } catch (...) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
+      hasIOError = true;
+    }
   }
 
-  // throw away any partial events
-  offset_ += readState_.lastDispatchPtr_;
-  ftruncate(fd_, offset_);
-  readState_.resetAllValues();
-
   // Figure out the next time by which a flush must take place
-
   struct timespec ts_next_flush;
   getNextFlushTime(&ts_next_flush);
   uint32_t unflushed = 0;
 
-  while(1) {
+  while (1) {
     // this will only be true when the destructor is being invoked
-    if(closing_) {
-      // empty out both the buffers
+    if (closing_) {
+      if (hasIOError) {
+        pthread_exit(NULL);
+      }
+
+      // Try to empty buffers before exit 
       if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+        fsync(fd_);
         if (-1 == ::close(fd_)) {
           int errno_copy = errno;
           GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
-          throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
         }
-        // just be safe and sync to disk
-        fsync(fd_);
-        fd_ = 0;
         pthread_exit(NULL);
-        return;
       }
     }
 
     if (swapEventBuffers(&ts_next_flush)) {
       eventInfo* outEvent;
       while (NULL != (outEvent = dequeueBuffer_->getNext())) {
-        if (!outEvent) {
-          T_DEBUG_L(1, "Got an empty event");
-          return;
+        // Remove an event from the buffer and write it out to disk. If there is any IO error, for instance,
+        // the output file is unmounted or deleted, then this event is dropped. However, the writer thread
+        // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then start writing
+        // from the end.
+
+        while (hasIOError) {
+          T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
+          usleep(writerThreadIOErrorSleepTime_);
+          if (closing_) {
+            pthread_exit(NULL);
+          }
+          if (!fd_) {
+            ::close(fd_);
+            fd_ = 0;
+          }
+          try {
+            openLogFile();
+            seekToEnd();
+            unflushed = 0;
+            hasIOError = false;
+            T_LOG_OPER("TFileTransport: log file %s reopened by writer thread during error recovery", filename_.c_str());
+          } catch (...) {
+            T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", filename_.c_str());
+          }
         }
 
         // sanity check on event
@@ -357,11 +391,9 @@ void TFileTransport::writerThread() {
 
         // If chunking is required, then make sure that msg does not cross chunk boundary
         if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
-
           // event size must be less than chunk size
-          if(outEvent->eventSize_ > chunkSize_) {
-            T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
-                  outEvent->eventSize_, chunkSize_);
+          if (outEvent->eventSize_ > chunkSize_) {
+            T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", outEvent->eventSize_, chunkSize_);
             continue;
           }
 
@@ -372,14 +404,15 @@ void TFileTransport::writerThread() {
           if (chunk1 != chunk2) {
             // refetch the offset to keep in sync
             offset_ = lseek(fd_, 0, SEEK_CUR);
-            int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_);
+            int32_t padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
 
             uint8_t zeros[padding];
             bzero(zeros, padding);
             if (-1 == ::write(fd_, zeros, padding)) {
               int errno_copy = errno;
               GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
-              throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while padding zeros", errno_copy);
+              hasIOError = true;
+              continue;
             }
             unflushed += padding;
             offset_ += padding;
@@ -391,9 +424,9 @@ void TFileTransport::writerThread() {
           if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
             int errno_copy = errno;
             GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
-            throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while writing event", errno_copy);
+            hasIOError = true;
+            continue;
           }
-
           unflushed += outEvent->eventSize_;
           offset_ += outEvent->eventSize_;
         }
@@ -401,6 +434,10 @@ void TFileTransport::writerThread() {
       dequeueBuffer_->reset();
     }
 
+    if (hasIOError) {
+      continue;
+    }
+
     bool flushTimeElapsed = false;
     struct timespec current_time;
     clock_gettime(CLOCK_REALTIME, &current_time);

Modified: incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h?rev=920686&r1=920685&r2=920686&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h (original)
+++ incubator/thrift/trunk/lib/cpp/src/transport/TFileTransport.h Tue Mar  9 05:20:17 2010
@@ -338,6 +338,10 @@ class TFileTransport : public TFileReade
   uint32_t corruptedEventSleepTime_;
   static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
 
+  // sleep duration in microseconds when an IO error is encountered in the writer thread
+  uint32_t writerThreadIOErrorSleepTime_;
+  static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
+
   // writer thread id
   pthread_t writerThreadId_;