You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/07 19:58:30 UTC
[1/3] accumulo git commit: ACCUMULO-3775 always sync the OPEN record
Repository: accumulo
Updated Branches:
refs/heads/1.7 f55751bc0 -> 931bf897e
refs/heads/master 134300382 -> 6acfd4149
ACCUMULO-3775 always sync the OPEN record
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/931bf897
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/931bf897
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/931bf897
Branch: refs/heads/1.7
Commit: 931bf897e337756a26064039ca86b848fec556bc
Parents: f55751b
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed May 6 10:39:57 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 7 13:18:51 2015 -0400
----------------------------------------------------------------------
.../apache/accumulo/tserver/log/DfsLogger.java | 14 +++++++--
.../tserver/log/TabletServerLogger.java | 32 +++++++++++++++++---
2 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/931bf897/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cd7ce08..bf60d45 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -369,6 +369,14 @@ public class DfsLogger implements Comparable<DfsLogger> {
return new DFSLoggerInputStreams(input, decryptingInput);
}
+ /**
+ * Opens a Write-Ahead Log file and writes the necessary header information and OPEN entry to the file.
+ * The file is ready to be used for ingest if this method returns successfully. If an exception is thrown
+ * from this method, it is the callers responsibility to ensure that {@link #close()} is called to prevent
+ * leaking the file handle and/or syncing thread.
+ *
+ * @param address The address of the host using this WAL
+ */
public synchronized void open(String address) throws IOException {
String filename = UUID.randomUUID().toString();
log.debug("Address is " + address);
@@ -381,6 +389,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
+ Path.SEPARATOR + filename;
metaReference = toString();
+ LoggerOperation op = null;
try {
short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
if (replication == 0)
@@ -430,8 +439,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
key.event = OPEN;
key.tserverSession = filename;
key.filename = filename;
- write(key, EMPTY);
- log.debug("Got new write-ahead log: " + this);
+ op = logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), Durability.SYNC);
} catch (Exception ex) {
if (logFile != null)
logFile.close();
@@ -443,6 +451,8 @@ public class DfsLogger implements Comparable<DfsLogger> {
syncThread = new Daemon(new LoggingRunnable(log, new LogSyncingTask()));
syncThread.setName("Accumulo WALog thread " + toString());
syncThread.start();
+ op.await();
+ log.debug("Got new write-ahead log: " + this);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/931bf897/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index a2ab551..d468695 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.tserver.Mutations;
import org.apache.accumulo.tserver.TabletMutations;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
+import org.apache.accumulo.tserver.log.DfsLogger.ServerResources;
import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -237,17 +238,40 @@ public class TabletServerLogger {
nextLogMaker.submit(new Runnable() {
@Override
public void run() {
+ final ServerResources conf = tserver.getServerConfig();
+ final VolumeManager fs = conf.getFileSystem();
while (!nextLogMaker.isShutdown()) {
+ DfsLogger alog = null;
try {
log.debug("Creating next WAL");
- DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
+ alog = new DfsLogger(conf, syncCounter, flushCounter);
alog.open(tserver.getClientAddressString());
- log.debug("Created next WAL " + alog.getFileName());
+ String fileName = alog.getFileName();
+ log.debug("Created next WAL " + fileName);
while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
- log.info("Our WAL was not used for 12 hours: " + alog.getFileName());
+ log.info("Our WAL was not used for 12 hours: " + fileName);
}
} catch (Exception t) {
- log.error("{}", t.getMessage(), t);
+ log.error("Failed to open WAL", t);
+ if (null != alog) {
+ // It's possible that the sync of the header and OPEN record to the WAL failed
+ // We want to make sure that clean up the resources/thread inside the DfsLogger
+ // object before trying to create a new one.
+ try {
+ alog.close();
+ } catch (IOException e) {
+ log.error("Failed to close WAL after it failed to open", e);
+ }
+ // Try to avoid leaving a bunch of empty WALs lying around
+ try {
+ Path path = alog.getPath();
+ if (fs.exists(path)) {
+ fs.delete(path);
+ }
+ } catch (IOException e) {
+ log.warn("Failed to delete a WAL that failed to open", e);
+ }
+ }
try {
nextLog.offer(t, 12, TimeUnit.HOURS);
} catch (InterruptedException ex) {
[2/3] accumulo git commit: ACCUMULO-3775 always sync the OPEN record
Posted by el...@apache.org.
ACCUMULO-3775 always sync the OPEN record
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/931bf897
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/931bf897
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/931bf897
Branch: refs/heads/master
Commit: 931bf897e337756a26064039ca86b848fec556bc
Parents: f55751b
Author: Eric C. Newton <er...@gmail.com>
Authored: Wed May 6 10:39:57 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 7 13:18:51 2015 -0400
----------------------------------------------------------------------
.../apache/accumulo/tserver/log/DfsLogger.java | 14 +++++++--
.../tserver/log/TabletServerLogger.java | 32 +++++++++++++++++---
2 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/931bf897/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cd7ce08..bf60d45 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -369,6 +369,14 @@ public class DfsLogger implements Comparable<DfsLogger> {
return new DFSLoggerInputStreams(input, decryptingInput);
}
+ /**
+ * Opens a Write-Ahead Log file and writes the necessary header information and OPEN entry to the file.
+ * The file is ready to be used for ingest if this method returns successfully. If an exception is thrown
+ * from this method, it is the callers responsibility to ensure that {@link #close()} is called to prevent
+ * leaking the file handle and/or syncing thread.
+ *
+ * @param address The address of the host using this WAL
+ */
public synchronized void open(String address) throws IOException {
String filename = UUID.randomUUID().toString();
log.debug("Address is " + address);
@@ -381,6 +389,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
+ Path.SEPARATOR + filename;
metaReference = toString();
+ LoggerOperation op = null;
try {
short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
if (replication == 0)
@@ -430,8 +439,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
key.event = OPEN;
key.tserverSession = filename;
key.filename = filename;
- write(key, EMPTY);
- log.debug("Got new write-ahead log: " + this);
+ op = logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), Durability.SYNC);
} catch (Exception ex) {
if (logFile != null)
logFile.close();
@@ -443,6 +451,8 @@ public class DfsLogger implements Comparable<DfsLogger> {
syncThread = new Daemon(new LoggingRunnable(log, new LogSyncingTask()));
syncThread.setName("Accumulo WALog thread " + toString());
syncThread.start();
+ op.await();
+ log.debug("Got new write-ahead log: " + this);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/931bf897/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index a2ab551..d468695 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.tserver.Mutations;
import org.apache.accumulo.tserver.TabletMutations;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
+import org.apache.accumulo.tserver.log.DfsLogger.ServerResources;
import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -237,17 +238,40 @@ public class TabletServerLogger {
nextLogMaker.submit(new Runnable() {
@Override
public void run() {
+ final ServerResources conf = tserver.getServerConfig();
+ final VolumeManager fs = conf.getFileSystem();
while (!nextLogMaker.isShutdown()) {
+ DfsLogger alog = null;
try {
log.debug("Creating next WAL");
- DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
+ alog = new DfsLogger(conf, syncCounter, flushCounter);
alog.open(tserver.getClientAddressString());
- log.debug("Created next WAL " + alog.getFileName());
+ String fileName = alog.getFileName();
+ log.debug("Created next WAL " + fileName);
while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
- log.info("Our WAL was not used for 12 hours: " + alog.getFileName());
+ log.info("Our WAL was not used for 12 hours: " + fileName);
}
} catch (Exception t) {
- log.error("{}", t.getMessage(), t);
+ log.error("Failed to open WAL", t);
+ if (null != alog) {
+ // It's possible that the sync of the header and OPEN record to the WAL failed
+ // We want to make sure that clean up the resources/thread inside the DfsLogger
+ // object before trying to create a new one.
+ try {
+ alog.close();
+ } catch (IOException e) {
+ log.error("Failed to close WAL after it failed to open", e);
+ }
+ // Try to avoid leaving a bunch of empty WALs lying around
+ try {
+ Path path = alog.getPath();
+ if (fs.exists(path)) {
+ fs.delete(path);
+ }
+ } catch (IOException e) {
+ log.warn("Failed to delete a WAL that failed to open", e);
+ }
+ }
try {
nextLog.offer(t, 12, TimeUnit.HOURS);
} catch (InterruptedException ex) {
[3/3] accumulo git commit: Merge branch '1.7'
Posted by el...@apache.org.
Merge branch '1.7'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6acfd414
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6acfd414
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6acfd414
Branch: refs/heads/master
Commit: 6acfd4149bcc3bebdd2245771f6ceffa9b3d601f
Parents: 1343003 931bf89
Author: Josh Elser <el...@apache.org>
Authored: Thu May 7 13:57:52 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 7 13:57:52 2015 -0400
----------------------------------------------------------------------
.../apache/accumulo/tserver/log/DfsLogger.java | 14 +++++++--
.../tserver/log/TabletServerLogger.java | 32 +++++++++++++++++---
2 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------