You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2011/11/15 22:48:57 UTC

svn commit: r1202435 - in /incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log: RemoteLogger.java TabletServerLogger.java

Author: kturner
Date: Tue Nov 15 21:48:57 2011
New Revision: 1202435

URL: http://svn.apache.org/viewvc?rev=1202435&view=rev
Log:
ACCUMULO-119 initial checkin of group commit

Modified:
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java?rev=1202435&r1=1202434&r2=1202435&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java Tue Nov 15 21:48:57 2011
@@ -17,8 +17,12 @@
 package org.apache.accumulo.server.tabletserver.log;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -29,6 +33,7 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
+import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.security.SecurityConstants;
@@ -44,6 +49,99 @@ import org.apache.thrift.transport.TTran
 public class RemoteLogger {
   private static Logger log = Logger.getLogger(RemoteLogger.class);
   
+  private LinkedBlockingQueue<LogWork> workQueue = new LinkedBlockingQueue<LogWork>();
+  
+  private String closeLock = new String("foo");
+  
+  private static final LogWork CLOSED_MARKER = new LogWork(null, null);
+  
+  private boolean closed = false;
+
+  public static class LoggerOperation { // handle for a queued log write; await() blocks until the writer thread has processed it
+    private LogWork work;
+    // wraps the queued work item whose latch and recorded exception await() inspects
+    public LoggerOperation(LogWork work) {
+      this.work = work;
+    }
+    // waits on the work item's latch, then rethrows any exception recorded on it (types not declared here are wrapped in RuntimeException)
+    public void await() throws NoSuchLogIDException, LoggerClosedException, TException {
+      try {
+        work.latch.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt status so callers can still observe it
+        throw new RuntimeException(e);
+      }
+      if (work.exception != null) {
+        if (work.exception instanceof NoSuchLogIDException)
+          throw (NoSuchLogIDException) work.exception;
+        else if (work.exception instanceof LoggerClosedException)
+          throw (LoggerClosedException) work.exception;
+        else if (work.exception instanceof TException)
+          throw (TException) work.exception;
+        else if (work.exception instanceof RuntimeException)
+          throw (RuntimeException) work.exception;
+        else
+          throw new RuntimeException(work.exception);
+      }
+    }
+  }
+
+  private static class LogWork { // one queued batch: its mutations, a completion latch, and any failure recorded for it
+    volatile Exception exception;
+    CountDownLatch latch;
+    List<TabletMutations> mutations;
+    // stores both arguments as given; CLOSED_MARKER is built with null for each
+    public LogWork(List<TabletMutations> mutations, CountDownLatch latch) {
+      this.latch = latch;
+      this.mutations = mutations;
+    }
+  }
+  
+  private class LogWriterTask implements Runnable {
+    // Background writer: drains queued LogWork, sends each batch with one logManyTablets call, then releases the waiters.
+    @Override
+    public void run() {
+      try {
+        ArrayList<LogWork> work = new ArrayList<LogWork>();
+        ArrayList<TabletMutations> mutations = new ArrayList<TabletMutations>();
+        while (true) {
+          // both buffers are reused from one batch to the next
+          work.clear();
+          mutations.clear();
+          // block until at least one item is queued, then also grab everything else already waiting
+          work.add(workQueue.take());
+          workQueue.drainTo(work);
+          // combine the mutations of every real work item; CLOSED_MARKER is skipped
+          for (LogWork logWork : work)
+            if (logWork != CLOSED_MARKER)
+              mutations.addAll(logWork.mutations);
+          // a single call covers the whole batch; on failure the same exception is recorded on each work item
+          synchronized (RemoteLogger.this) { // same monitor as RemoteLogger's synchronized methods
+            try {
+              client.logManyTablets(null, logFile.id, mutations);
+            } catch (Exception e) {
+              for (LogWork logWork : work)
+                if (logWork != CLOSED_MARKER)
+                  logWork.exception = e;
+            }
+          }
+          // release the waiters only after any exception has been stored, so LoggerOperation.await() can rethrow it
+          boolean sawClosedMarker = false;
+          for (LogWork logWork : work)
+            if (logWork == CLOSED_MARKER)
+              sawClosedMarker = true;
+            else
+              logWork.latch.countDown();
+          // the batch containing the close marker is the last one processed
+          if (sawClosedMarker)
+            break;
+        }
+      } catch (Exception e) { // NOTE(review): if the loop exits here (e.g. take() interrupted), work queued afterwards is never counted down - confirm waiters cannot hang
+        log.error(e.getMessage(), e);
+      }
+    }
+  }
+
   @Override
   public boolean equals(Object obj) {
     // filename is unique
@@ -87,6 +185,10 @@ public class RemoteLogger {
       client = null;
       throw te;
     }
+    
+    Thread t = new Daemon(new LogWriterTask());
+    t.setName("Accumulo WALog thread " + toString());
+    t.start();
   }
   
   public RemoteLogger(String address) throws IOException {
@@ -123,6 +225,19 @@ public class RemoteLogger {
   }
   
   public synchronized void close() throws NoSuchLogIDException, LoggerClosedException, TException {
+    
+    synchronized (closeLock) {
+      if (closed)
+        return;
+      // after closed is set to true, nothing else should be added to the queue
+      // CLOSED_MARKER should be the last thing on the queue, therefore when the
+      // background thread sees the marker and exits there should be nothing else
+      // to process... so nothing should be left waiting for the background
+      // thread to do work
+      closed = true;
+      workQueue.add(CLOSED_MARKER);
+    }
+
     try {
       client.close(null, logFile.id);
     } finally {
@@ -135,12 +250,23 @@ public class RemoteLogger {
     client.defineTablet(null, logFile.id, seq, tid, tablet.toThrift());
   }
   
-  public synchronized void log(int seq, int tid, Mutation mutation) throws NoSuchLogIDException, LoggerClosedException, TException {
-    client.log(null, logFile.id, seq, tid, mutation.toThrift());
+  public LoggerOperation log(int seq, int tid, Mutation mutation) throws NoSuchLogIDException, LoggerClosedException, TException { // queues one mutation as a single-element batch via logManyTablets
+    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation.toThrift()))));
+  }
   
-  public synchronized void logManyTablets(List<TabletMutations> mutations) throws NoSuchLogIDException, LoggerClosedException, TException {
-    client.logManyTablets(null, logFile.id, mutations);
+  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws NoSuchLogIDException, LoggerClosedException, TException {
+    final LogWork queued = new LogWork(mutations, new CountDownLatch(1));
+    // enqueue only; the returned operation is what the caller awaits for completion
+    synchronized (closeLock) {
+      // the closed check uses a different lock from the one held during walog I/O,
+      // so adding to the work queue never has to wait on an in-progress write
+      if (closed) {
+        throw new NoSuchLogIDException();
+      }
+      workQueue.add(queued);
+    }
+    // hand back a handle wrapping the queued work item
+    return new LoggerOperation(queued);
+  }
   
   public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws NoSuchLogIDException, LoggerClosedException, TException {

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1202435&r1=1202434&r2=1202435&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Tue Nov 15 21:48:57 2011
@@ -40,8 +40,9 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.tabletserver.Tablet;
-import org.apache.accumulo.server.tabletserver.TabletServer;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
+import org.apache.accumulo.server.tabletserver.TabletServer;
+import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation;
 import org.apache.log4j.Logger;
 
 /**
@@ -247,7 +248,7 @@ public class TabletServerLogger {
   }
   
   interface Writer {
-    void write(RemoteLogger logger, int seq) throws Exception;
+    LoggerOperation write(RemoteLogger logger, int seq) throws Exception; // returns a pending operation for the caller to await, or null when there is nothing to wait for
   }
   
   private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
@@ -294,8 +295,15 @@ public class TabletServerLogger {
           seq = seqGen.incrementAndGet();
           if (seq < 0)
             throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
+          ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
           for (RemoteLogger wal : copy) {
-            writer.write(wal, seq);
+            LoggerOperation lop = writer.write(wal, seq);
+            if (lop != null)
+              queuedOperations.add(lop);
+          }
+          
+          for (LoggerOperation lop : queuedOperations) {
+            lop.await();
           }
           
           // double-check: did the log set change?
@@ -350,8 +358,9 @@ public class TabletServerLogger {
       return -1;
     return write(commitSession, false, new Writer() {
       @Override
-      public void write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
         logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
+        return null;
       }
     });
   }
@@ -361,8 +370,8 @@ public class TabletServerLogger {
       return -1;
     int seq = write(commitSession, false, new Writer() {
       @Override
-      public void write(RemoteLogger logger, int ignored) throws Exception {
-        logger.log(tabletSeq, commitSession.getLogId(), m);
+      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+        return logger.log(tabletSeq, commitSession.getLogId(), m);
       }
     });
     logSizeEstimate.addAndGet(m.numBytes());
@@ -381,7 +390,7 @@ public class TabletServerLogger {
     
     int seq = write(loggables.keySet(), false, new Writer() {
       @Override
-      public void write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
         List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
         for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
           CommitSession cs = entry.getKey();
@@ -390,7 +399,7 @@ public class TabletServerLogger {
             tmutations.add(m.toThrift());
           copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), tmutations));
         }
-        logger.logManyTablets(copy);
+        return logger.logManyTablets(copy);
       }
     });
     for (List<Mutation> entry : loggables.values()) {
@@ -412,8 +421,9 @@ public class TabletServerLogger {
     
     int seq = write(commitSession, true, new Writer() {
       @Override
-      public void write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
         logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
+        return null;
       }
     });
     
@@ -427,8 +437,9 @@ public class TabletServerLogger {
       return -1;
     write(commitSession, false, new Writer() {
       @Override
-      public void write(RemoteLogger logger, int ignored) throws Exception {
+      public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
         logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
+        return null;
       }
     });
     return seq;