You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/07/06 20:18:39 UTC

svn commit: r1358330 - /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java

Author: jxiang
Date: Fri Jul  6 18:18:38 2012
New Revision: 1358330

URL: http://svn.apache.org/viewvc?rev=1358330&view=rev
Log:
HBASE-6318 SplitLogWorker exited due to ConcurrentModificationException

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1358330&r1=1358329&r2=1358330&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Fri Jul  6 18:18:38 2012
@@ -1293,7 +1293,7 @@ public class HLogSplitter {
 
       boolean progress_failed = false;
       try {
-        for (int i = 0; i < logWriters.size(); i++) {
+        for (int i = 0, n = logWriters.size(); i < n; i++) {
           Future<Void> future = completionService.take();
           future.get();
           if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) {
@@ -1327,18 +1327,36 @@ public class HLogSplitter {
         if (thrown == null) {
           thrown = Lists.newArrayList();
         }
-        for (WriterAndPath wap : logWriters.values()) {
-          try {
-            wap.w.close();
-          } catch (IOException ioe) {
-            LOG.error("Couldn't close log at " + wap.p, ioe);
-            thrown.add(ioe);
-            continue;
+        try {
+          for (WriterThread t : writerThreads) {
+            while (t.isAlive()) {
+              t.shouldStop = true;
+              t.interrupt();
+              try {
+                t.join(10);
+              } catch (InterruptedException e) {
+                IOException iie = new InterruptedIOException();
+                iie.initCause(e);
+                throw iie;
+              }
+            }
           }
-          LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
-              + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+        } finally {
+          synchronized (logWriters) {
+            for (WriterAndPath wap : logWriters.values()) {
+              try {
+                wap.w.close();
+              } catch (IOException ioe) {
+                LOG.error("Couldn't close log at " + wap.p, ioe);
+                thrown.add(ioe);
+                continue;
+              }
+              LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
+                  + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+            }
+          }
+          logWritersClosed = true;
         }
-        logWritersClosed = true;
       }
       return thrown;
     }
@@ -1423,11 +1441,6 @@ public class HLogSplitter {
     long editsWritten = 0;
     /* Number of nanos spent writing to this log */
     long nanosSpent = 0;
-    
-    /* To check whether a close has already been tried on the
-     * writer
-     */
-    boolean writerClosed = false;
 
     WriterAndPath(final Path p, final Writer w) {
       this.p = p;