You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/06/03 19:16:48 UTC

git commit: ACCUMULO-2766 fix wal group commit

Repository: accumulo
Updated Branches:
  refs/heads/1.5.2-SNAPSHOT 05a64c13d -> 1e16159cd


ACCUMULO-2766 fix wal group commit


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1e16159c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1e16159c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1e16159c

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 1e16159cdb25121123a3ea64df8a47c0eda54709
Parents: 05a64c1
Author: Keith Turner <kt...@apache.org>
Authored: Thu May 15 08:52:04 2014 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Jun 3 12:52:19 2014 -0400

----------------------------------------------------------------------
 .../server/tabletserver/log/DfsLogger.java      | 60 +++++++++-----------
 1 file changed, 28 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e16159c/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
index 01c2448..fb21ba5 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.server.logger.LogFileKey;
@@ -100,7 +101,8 @@ public class DfsLogger {
     @Override
     public void run() {
       ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
-      while (true) {
+      boolean sawClosedMarker = false;
+      while (!sawClosedMarker) {
         work.clear();
 
         try {
@@ -110,36 +112,20 @@ public class DfsLogger {
         }
         workQueue.drainTo(work);
 
-        synchronized (closeLock) {
-          if (!closed) {
-            try {
-              sync.invoke(logFile);
-            } catch (Exception ex) {
-              log.warn("Exception syncing " + ex);
-              for (DfsLogger.LogWork logWork : work) {
-                logWork.exception = ex;
-              }
-            }
-          } else {
-            for (DfsLogger.LogWork logWork : work) {
-              logWork.exception = new LogClosedException();
-            }
+        try {
+          sync.invoke(logFile);
+        } catch (Exception ex) {
+          log.warn("Exception syncing " + ex);
+          for (DfsLogger.LogWork logWork : work) {
+            logWork.exception = ex;
           }
         }
 
-        boolean sawClosedMarker = false;
         for (DfsLogger.LogWork logWork : work)
           if (logWork == CLOSED_MARKER)
             sawClosedMarker = true;
           else
             logWork.latch.countDown();
-
-        if (sawClosedMarker) {
-          synchronized (closeLock) {
-            closeLock.notifyAll();
-          }
-          break;
-        }
       }
     }
   }
@@ -200,6 +186,7 @@ public class DfsLogger {
   private Method sync;
   private Path logPath;
   private String logger;
+  private Daemon syncThread;
 
   public DfsLogger(ServerResources conf) throws IOException {
     this.conf = conf;
@@ -319,9 +306,9 @@ public class DfsLogger {
       throw ex;
     }
 
-    Thread t = new Daemon(new LogSyncingTask());
-    t.setName("Accumulo WALog thread " + toString());
-    t.start();
+    syncThread = new Daemon(new LoggingRunnable(log, new LogSyncingTask()));
+    syncThread.setName("Accumulo WALog thread " + toString());
+    syncThread.start();
   }
 
   private FSDataOutputStream create(FileSystem fs, Path logPath, boolean b, int buffersize, short replication, long blockSize) throws IOException {
@@ -386,14 +373,23 @@ public class DfsLogger {
       // thread to do work
       closed = true;
       workQueue.add(CLOSED_MARKER);
-      while (!workQueue.isEmpty())
-        try {
-          closeLock.wait();
-        } catch (InterruptedException e) {
-          log.info("Interrupted");
-        }
     }
 
+    // wait for background thread to finish before closing log file
+    if(syncThread != null){
+      try {
+        syncThread.join();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    // the work queue is expected to be empty at this point
+    if(workQueue.size() != 0){
+      log.error("WAL work queue not empty after sync thread exited");
+      throw new IllegalStateException("WAL work queue not empty after sync thread exited");
+    }
+    
     if (logFile != null)
       try {
         logFile.close();