You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2011/11/15 22:48:57 UTC
svn commit: r1202435 - in
/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log:
RemoteLogger.java TabletServerLogger.java
Author: kturner
Date: Tue Nov 15 21:48:57 2011
New Revision: 1202435
URL: http://svn.apache.org/viewvc?rev=1202435&view=rev
Log:
ACCUMULO-119 initial checkin of group commit
Modified:
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java?rev=1202435&r1=1202434&r2=1202435&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java Tue Nov 15 21:48:57 2011
@@ -17,8 +17,12 @@
package org.apache.accumulo.server.tabletserver.log;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.KeyExtent;
@@ -29,6 +33,7 @@ import org.apache.accumulo.core.tabletse
import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException;
import org.apache.accumulo.core.tabletserver.thrift.TabletMutations;
+import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.security.SecurityConstants;
@@ -44,6 +49,99 @@ import org.apache.thrift.transport.TTran
public class RemoteLogger {
private static Logger log = Logger.getLogger(RemoteLogger.class);
+ /** Write requests waiting to be picked up, in batches, by the background log-writer thread. */
+ private final LinkedBlockingQueue<LogWork> workQueue = new LinkedBlockingQueue<LogWork>();
+
+ /**
+  * Monitor guarding {@link #closed} and the act of adding to {@link #workQueue}. A dedicated
+  * object is used (rather than a String instance) so the lock can never be confused with, or
+  * shared through, any other value.
+  */
+ private final Object closeLock = new Object();
+
+ /** Sentinel queued by close(); recognised by identity and never written to the log. */
+ private static final LogWork CLOSED_MARKER = new LogWork(null, null);
+
+ /** Only read or written while holding {@link #closeLock}; once true no more work is queued. */
+ private boolean closed = false;
+
+ public static class LoggerOperation {
+ private LogWork work;
+
+ public LoggerOperation(LogWork work) {
+ this.work = work;
+ }
+
+ public void await() throws NoSuchLogIDException, LoggerClosedException, TException {
+ try {
+ work.latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (work.exception != null) {
+ if (work.exception instanceof NoSuchLogIDException)
+ throw (NoSuchLogIDException) work.exception;
+ else if (work.exception instanceof LoggerClosedException)
+ throw (LoggerClosedException) work.exception;
+ else if (work.exception instanceof TException)
+ throw (TException) work.exception;
+ else if (work.exception instanceof RuntimeException)
+ throw (RuntimeException) work.exception;
+ else
+ throw new RuntimeException(work.exception);
+ }
+ }
+ }
+
+ /**
+  * A single queued write request: the mutations to log, the latch released once the request
+  * has been processed, and the failure (if any) recorded for it.
+  */
+ private static class LogWork {
+   // written by the log-writer thread, read by the thread waiting on the latch
+   volatile Exception exception;
+   CountDownLatch latch;
+   List<TabletMutations> mutations;
+
+   public LogWork(List<TabletMutations> pendingMutations, CountDownLatch completionLatch) {
+     latch = completionLatch;
+     mutations = pendingMutations;
+   }
+ }
+
+ /**
+  * Background task implementing group commit: it takes every request currently on the work
+  * queue, writes all of their mutations with one logManyTablets call, records any failure on
+  * each request, and releases the requests' latches. The loop ends after a batch that contains
+  * CLOSED_MARKER.
+  */
+ private class LogWriterTask implements Runnable {
+
+   @Override
+   public void run() {
+     try {
+       ArrayList<LogWork> work = new ArrayList<LogWork>();
+       ArrayList<TabletMutations> mutations = new ArrayList<TabletMutations>();
+       while (true) {
+
+         work.clear();
+         mutations.clear();
+
+         // block for the first request, then pick up whatever else has queued behind it
+         work.add(workQueue.take());
+         workQueue.drainTo(work);
+
+         boolean sawClosedMarker = false;
+         boolean hasRealWork = false;
+         for (LogWork logWork : work) {
+           if (logWork == CLOSED_MARKER) {
+             sawClosedMarker = true;
+           } else {
+             hasRealWork = true;
+             mutations.addAll(logWork.mutations);
+           }
+         }
+
+         try {
+           // a batch holding only the close marker has nothing to write; skip the call
+           // rather than sending an empty request to a logger that is being closed
+           if (hasRealWork) {
+             synchronized (RemoteLogger.this) {
+               try {
+                 client.logManyTablets(null, logFile.id, mutations);
+               } catch (Exception e) {
+                 // every request in the batch shares the outcome of the single write
+                 for (LogWork logWork : work)
+                   if (logWork != CLOSED_MARKER)
+                     logWork.exception = e;
+               }
+             }
+           }
+         } finally {
+           // always release waiters, even if something unexpected escapes the write above
+           for (LogWork logWork : work)
+             if (logWork != CLOSED_MARKER)
+               logWork.latch.countDown();
+         }
+
+         if (sawClosedMarker)
+           break;
+       }
+     } catch (Exception e) {
+       log.error(e.getMessage(), e);
+     }
+   }
+ }
+
+
@Override
public boolean equals(Object obj) {
// filename is unique
@@ -87,6 +185,10 @@ public class RemoteLogger {
client = null;
throw te;
}
+
+ Thread t = new Daemon(new LogWriterTask());
+ t.setName("Accumulo WALog thread " + toString());
+ t.start();
}
public RemoteLogger(String address) throws IOException {
@@ -123,6 +225,19 @@ public class RemoteLogger {
}
public synchronized void close() throws NoSuchLogIDException, LoggerClosedException, TException {
+
+ synchronized (closeLock) {
+ if (closed)
+ return;
+ // after closed is set to true, nothing else should be added to the queue
+ // CLOSED_MARKER should be the last thing on the queue, therefore when the
+ // background thread sees the marker and exits there should be nothing else
+ // to process... so nothing should be left waiting for the background
+ // thread to do work
+ closed = true;
+ workQueue.add(CLOSED_MARKER);
+ }
+
try {
client.close(null, logFile.id);
} finally {
@@ -135,12 +250,23 @@ public class RemoteLogger {
client.defineTablet(null, logFile.id, seq, tid, tablet.toThrift());
}
- public synchronized void log(int seq, int tid, Mutation mutation) throws NoSuchLogIDException, LoggerClosedException, TException {
- client.log(null, logFile.id, seq, tid, mutation.toThrift());
+ public LoggerOperation log(int seq, int tid, Mutation mutation) throws NoSuchLogIDException, LoggerClosedException, TException {
+ return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation.toThrift()))));
}
- public synchronized void logManyTablets(List<TabletMutations> mutations) throws NoSuchLogIDException, LoggerClosedException, TException {
- client.logManyTablets(null, logFile.id, mutations);
+ /**
+  * Queues the given mutations for the background log-writer thread and returns immediately.
+  *
+  * @return a handle the caller can await to learn the outcome of the write
+  * @throws NoSuchLogIDException if this logger has already been closed
+  */
+ public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws NoSuchLogIDException, LoggerClosedException, TException {
+   final LogWork queued = new LogWork(mutations, new CountDownLatch(1));
+
+   // the closed check uses its own lock, separate from the RemoteLogger monitor, so that
+   // adding to the work queue never has to wait on walog I/O operations
+   synchronized (closeLock) {
+     if (!closed) {
+       workQueue.add(queued);
+       return new LoggerOperation(queued);
+     }
+   }
+
+   throw new NoSuchLogIDException();
}
public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws NoSuchLogIDException, LoggerClosedException, TException {
Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1202435&r1=1202434&r2=1202435&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Tue Nov 15 21:48:57 2011
@@ -40,8 +40,9 @@ import org.apache.accumulo.core.tabletse
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.tabletserver.Tablet;
-import org.apache.accumulo.server.tabletserver.TabletServer;
import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
+import org.apache.accumulo.server.tabletserver.TabletServer;
+import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation;
import org.apache.log4j.Logger;
/**
@@ -247,7 +248,7 @@ public class TabletServerLogger {
}
interface Writer {
- void write(RemoteLogger logger, int seq) throws Exception;
+ LoggerOperation write(RemoteLogger logger, int seq) throws Exception;
}
private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
@@ -294,8 +295,15 @@ public class TabletServerLogger {
seq = seqGen.incrementAndGet();
if (seq < 0)
throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven");
+ ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
for (RemoteLogger wal : copy) {
- writer.write(wal, seq);
+ LoggerOperation lop = writer.write(wal, seq);
+ if (lop != null)
+ queuedOperations.add(lop);
+ }
+
+ for (LoggerOperation lop : queuedOperations) {
+ lop.await();
}
// double-check: did the log set change?
@@ -350,8 +358,9 @@ public class TabletServerLogger {
return -1;
return write(commitSession, false, new Writer() {
@Override
- public void write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
+ return null;
}
});
}
@@ -361,8 +370,8 @@ public class TabletServerLogger {
return -1;
int seq = write(commitSession, false, new Writer() {
@Override
- public void write(RemoteLogger logger, int ignored) throws Exception {
- logger.log(tabletSeq, commitSession.getLogId(), m);
+ public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
+ return logger.log(tabletSeq, commitSession.getLogId(), m);
}
});
logSizeEstimate.addAndGet(m.numBytes());
@@ -381,7 +390,7 @@ public class TabletServerLogger {
int seq = write(loggables.keySet(), false, new Writer() {
@Override
- public void write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
CommitSession cs = entry.getKey();
@@ -390,7 +399,7 @@ public class TabletServerLogger {
tmutations.add(m.toThrift());
copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), tmutations));
}
- logger.logManyTablets(copy);
+ return logger.logManyTablets(copy);
}
});
for (List<Mutation> entry : loggables.values()) {
@@ -412,8 +421,9 @@ public class TabletServerLogger {
int seq = write(commitSession, true, new Writer() {
@Override
- public void write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
+ return null;
}
});
@@ -427,8 +437,9 @@ public class TabletServerLogger {
return -1;
write(commitSession, false, new Writer() {
@Override
- public void write(RemoteLogger logger, int ignored) throws Exception {
+ public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception {
logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
+ return null;
}
});
return seq;