You are viewing a plain text version of this content; a canonical HTML version is available in the mailing-list archive.
Posted to commits@hbase.apache.org by ra...@apache.org on 2012/01/22 14:20:15 UTC

svn commit: r1234509 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java

Author: ramkrishna
Date: Sun Jan 22 13:20:15 2012
New Revision: 1234509

URL: http://svn.apache.org/viewvc?rev=1234509&view=rev
Log:
HBASE-5235 HLogSplitter writer thread's streams not getting closed when any of the writer threads has exceptions. (Ram)

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1234509&r1=1234508&r2=1234509&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Sun Jan 22 13:20:15 2012
@@ -7,6 +7,8 @@ Release 0.92.1 - Unreleased
   BUG FIXES
    HBASE-5176  AssignmentManager#getRegion: logging nit  adds a redundant '+' (Karthik K)
    HBASE-5237  Addendum for HBASE-5160 and HBASE-4397 (Ram)
+   HBASE-5235  HLogSplitter writer thread's streams not getting closed when any 
+               of the writer threads has exceptions. (Ram)
 
   TESTS
    HBASE-5223  TestMetaReaderEditor is missing call to CatalogTracker.stop()

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1234509&r1=1234508&r2=1234509&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sun Jan 22 13:20:15 2012
@@ -1142,7 +1142,9 @@ public class HLogSplitter {
     private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
         new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
 
-    private boolean hasClosed = false;
+    private boolean closeAndCleanCompleted = false;
+    
+    private boolean logWritersClosed  = false;
 
     /**
      * Start the threads that will pump data from the entryBuffers
@@ -1167,20 +1169,27 @@ public class HLogSplitter {
 
     List<Path> finishWritingAndClose() throws IOException {
       LOG.info("Waiting for split writer threads to finish");
-      for (WriterThread t : writerThreads) {
-        t.finish();
-      }
-      for (WriterThread t: writerThreads) {
-        try {
-          t.join();
-        } catch (InterruptedException ie) {
-          throw new IOException(ie);
+      try {
+        for (WriterThread t : writerThreads) {
+          t.finish();
         }
-        checkForErrors();
-      }
-      LOG.info("Split writers finished");
+        for (WriterThread t : writerThreads) {
+          try {
+            t.join();
+          } catch (InterruptedException ie) {
+            throw new IOException(ie);
+          }
+          checkForErrors();
+        }
+        LOG.info("Split writers finished");
 
-      return closeStreams();
+        return closeStreams();
+      } finally {
+        List<IOException> thrown = closeLogWriters(null);
+        if (thrown != null && !thrown.isEmpty()) {
+          throw MultipleIOException.createIOException(thrown);
+        }
+      }
     }
 
     /**
@@ -1188,21 +1197,12 @@ public class HLogSplitter {
      * @return the list of paths written.
      */
     private List<Path> closeStreams() throws IOException {
-      Preconditions.checkState(!hasClosed);
+      Preconditions.checkState(!closeAndCleanCompleted);
 
       List<Path> paths = new ArrayList<Path>();
       List<IOException> thrown = Lists.newArrayList();
-
+      closeLogWriters(thrown);
       for (WriterAndPath wap : logWriters.values()) {
-        try {
-          wap.w.close();
-        } catch (IOException ioe) {
-          LOG.error("Couldn't close log at " + wap.p, ioe);
-          thrown.add(ioe);
-          continue;
-        }
-        LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in "
-            + (wap.nanosSpent / 1000/ 1000) + "ms)");
         Path dst = getCompletedRecoveredEditsFilePath(wap.p);
         try {
           if (!dst.equals(wap.p) && fs.exists(dst)) {
@@ -1233,9 +1233,31 @@ public class HLogSplitter {
         throw MultipleIOException.createIOException(thrown);
       }
 
-      hasClosed = true;
+      closeAndCleanCompleted = true;
       return paths;
     }
+    
+    private List<IOException> closeLogWriters(List<IOException> thrown)
+        throws IOException {
+      if (!logWritersClosed) {
+        if (thrown == null) {
+          thrown = Lists.newArrayList();
+        }
+        for (WriterAndPath wap : logWriters.values()) {
+          try {
+            wap.w.close();
+          } catch (IOException ioe) {
+            LOG.error("Couldn't close log at " + wap.p, ioe);
+            thrown.add(ioe);
+            continue;
+          }
+          LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
+              + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+        }
+        logWritersClosed = true;
+      }
+      return thrown;
+    }
 
     /**
      * Get a writer and path for a log starting at the given entry.