You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/06/03 19:16:48 UTC
git commit: ACCUMULO-2766 fix wal group commit
Repository: accumulo
Updated Branches:
refs/heads/1.5.2-SNAPSHOT 05a64c13d -> 1e16159cd
ACCUMULO-2766 fix wal group commit
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1e16159c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1e16159c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1e16159c
Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 1e16159cdb25121123a3ea64df8a47c0eda54709
Parents: 05a64c1
Author: Keith Turner <kt...@apache.org>
Authored: Thu May 15 08:52:04 2014 -0400
Committer: Keith Turner <kt...@apache.org>
Committed: Tue Jun 3 12:52:19 2014 -0400
----------------------------------------------------------------------
.../server/tabletserver/log/DfsLogger.java | 60 +++++++++-----------
1 file changed, 28 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1e16159c/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
index 01c2448..fb21ba5 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.StringUtil;
import org.apache.accumulo.server.logger.LogFileKey;
@@ -100,7 +101,8 @@ public class DfsLogger {
@Override
public void run() {
ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
- while (true) {
+ boolean sawClosedMarker = false;
+ while (!sawClosedMarker) {
work.clear();
try {
@@ -110,36 +112,20 @@ public class DfsLogger {
}
workQueue.drainTo(work);
- synchronized (closeLock) {
- if (!closed) {
- try {
- sync.invoke(logFile);
- } catch (Exception ex) {
- log.warn("Exception syncing " + ex);
- for (DfsLogger.LogWork logWork : work) {
- logWork.exception = ex;
- }
- }
- } else {
- for (DfsLogger.LogWork logWork : work) {
- logWork.exception = new LogClosedException();
- }
+ try {
+ sync.invoke(logFile);
+ } catch (Exception ex) {
+ log.warn("Exception syncing " + ex);
+ for (DfsLogger.LogWork logWork : work) {
+ logWork.exception = ex;
}
}
- boolean sawClosedMarker = false;
for (DfsLogger.LogWork logWork : work)
if (logWork == CLOSED_MARKER)
sawClosedMarker = true;
else
logWork.latch.countDown();
-
- if (sawClosedMarker) {
- synchronized (closeLock) {
- closeLock.notifyAll();
- }
- break;
- }
}
}
}
@@ -200,6 +186,7 @@ public class DfsLogger {
private Method sync;
private Path logPath;
private String logger;
+ private Daemon syncThread;
public DfsLogger(ServerResources conf) throws IOException {
this.conf = conf;
@@ -319,9 +306,9 @@ public class DfsLogger {
throw ex;
}
- Thread t = new Daemon(new LogSyncingTask());
- t.setName("Accumulo WALog thread " + toString());
- t.start();
+ syncThread = new Daemon(new LoggingRunnable(log, new LogSyncingTask()));
+ syncThread.setName("Accumulo WALog thread " + toString());
+ syncThread.start();
}
private FSDataOutputStream create(FileSystem fs, Path logPath, boolean b, int buffersize, short replication, long blockSize) throws IOException {
@@ -386,14 +373,23 @@ public class DfsLogger {
// thread to do work
closed = true;
workQueue.add(CLOSED_MARKER);
- while (!workQueue.isEmpty())
- try {
- closeLock.wait();
- } catch (InterruptedException e) {
- log.info("Interrupted");
- }
}
+ // wait for background thread to finish before closing log file
+ if(syncThread != null){
+ try {
+ syncThread.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // the work queue is expected to be empty at this point
+ if(workQueue.size() != 0){
+ log.error("WAL work queue not empty after sync thread exited");
+ throw new IllegalStateException("WAL work queue not empty after sync thread exited");
+ }
+
if (logFile != null)
try {
logFile.close();