You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/06/12 22:06:52 UTC
[2/4] git commit: ACCUMULO-2899 Correctly handle older internals.
ACCUMULO-2899 Correctly handle older internals.
Make sure that we behave correctly when our cluster has been upgraded 1.4 -> 1.5 -> 1.6
* dfsloggers need to track the cq used to create them so they can be used to issue putDeletes
* rewrite 1.4 WAL paths to the top level wal directory
* adjust recovery path to compensate for 1.4 recovered wals
* fix GC to properly handle upgraded WALs
Author: Mike Drob <md...@cloudera.com>
Author: Sean Busbey <bu...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/31aea2ad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/31aea2ad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/31aea2ad
Branch: refs/heads/master
Commit: 31aea2ad89b9b988995a9b8f9096091788eead74
Parents: 1b170e8
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed May 14 15:57:27 2014 -0400
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Thu Jun 12 14:46:34 2014 -0400
----------------------------------------------------------------------
.../accumulo/server/fs/VolumeManagerImpl.java | 11 ++++++++--
.../server/master/recovery/RecoveryPath.java | 7 +++++--
.../gc/GarbageCollectWriteAheadLogs.java | 8 ++++++--
.../gc/GarbageCollectWriteAheadLogsTest.java | 13 +++++++++---
.../org/apache/accumulo/tserver/Tablet.java | 13 +++++++++---
.../apache/accumulo/tserver/log/DfsLogger.java | 21 +++++++++++++++++++-
6 files changed, 60 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 5c1194a..d4a2d4f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -529,8 +529,15 @@ public class VolumeManagerImpl implements VolumeManager {
@Override
public Path getFullPath(FileType fileType, String path) {
- if (path.contains(":"))
- return new Path(path);
+ int colon = path.indexOf(':');
+ if (colon > -1) {
+ // Check if this is really an absolute path or if this is a 1.4 style relative path for a WAL
+ if (fileType == FileType.WAL && path.charAt(colon + 1) != '/') {
+ path = path.substring(path.indexOf('/'));
+ } else {
+ return new Path(path);
+ }
+ }
// normalize the path
Path fullPath = new Path(defaultVolume.getBasePath(), fileType.getDirectory());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
index 1da945d..4a6638a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
@@ -34,8 +34,11 @@ public class RecoveryPath {
String uuid = walPath.getName();
// drop uuid
walPath = walPath.getParent();
- // drop server
- walPath = walPath.getParent();
+ // recovered 1.4 WALs won't have a server component
+ if (!walPath.getName().equals(FileType.WAL.getDirectory())) {
+ // drop server
+ walPath = walPath.getParent();
+ }
if (!walPath.getName().equals(FileType.WAL.getDirectory()))
throw new IllegalArgumentException("Bad path " + walPath);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index ae850af..56a0fd5 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -281,7 +281,9 @@ public class GarbageCollectWriteAheadLogs {
while (iterator.hasNext()) {
for (String entry : iterator.next().logSet) {
- String uuid = new Path(entry).getName();
+ // old-style WALs will have the IP:Port of their logger, and new-style ones will be a Path, either absolute or relative; in all cases
+ // the last "/" will mark a UUID file name.
+ String uuid = entry.substring(entry.lastIndexOf("/") + 1);
if (!isUUID(uuid)) {
// fully expect this to be a uuid, if its not then something is wrong and walog GC should not proceed!
throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry);
@@ -327,8 +329,8 @@ public class GarbageCollectWriteAheadLogs {
continue;
for (FileStatus status : listing) {
String server = status.getPath().getName();
- servers.add(server);
if (status.isDir()) {
+ servers.add(server);
for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
if (isUUID(file.getPath().getName())) {
fileToServerMap.put(file.getPath(), server);
@@ -339,7 +341,9 @@ public class GarbageCollectWriteAheadLogs {
}
} else if (isUUID(server)) {
// old-style WAL are not under a directory
+ servers.add("");
fileToServerMap.put(status.getPath(), "");
+ nameToFileMap.put(server, status.getPath());
} else {
log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index f90b965..ce1f026 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -205,18 +205,25 @@ public class GarbageCollectWriteAheadLogsTest {
Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
/*
+ * Expect only a single server, the non-server entry for upgrade WALs
+ */
+ assertEquals(1, count);
+ /*
* Expected fileToServerMap:
* /dir1/uuid1 -> ""
* /dir3/uuid3 -> ""
*/
- assertEquals(2, count);
assertEquals(2, fileToServerMap.size());
assertEquals("", fileToServerMap.get(serverFile1Path));
assertEquals("", fileToServerMap.get(serverFile3Path));
/*
- * Expected nameToFileMap: empty
+ * Expected nameToFileMap:
+ * uuid1 -> /dir1/uuid1
+ * uuid3 -> /dir3/uuid3
*/
- assertEquals(0, nameToFileMap.size());
+ assertEquals(2, nameToFileMap.size());
+ assertEquals(serverFile1Path, nameToFileMap.get(UUID1));
+ assertEquals(serverFile3Path, nameToFileMap.get(UUID3));
}
@Test
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index f73d4ca..36b2289 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -1357,6 +1357,8 @@ public class Tablet {
tabletResources.setTablet(this, acuTableConf);
if (!logEntries.isEmpty()) {
log.info("Starting Write-Ahead Log recovery for " + this.extent);
+ // count[0] = entries used on tablet
+ // count[1] = track max time from walog entries without timestamps
final long[] count = new long[2];
final CommitSession commitSession = tabletMemory.getCommitSession();
count[1] = Long.MIN_VALUE;
@@ -1388,6 +1390,7 @@ public class Tablet {
commitSession.updateMaxCommittedTime(tabletTime.getTime());
if (count[0] == 0) {
+ log.debug("No replayed mutations applied, removing unused entries for " + extent);
MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
logEntries.clear();
}
@@ -1403,7 +1406,7 @@ public class Tablet {
currentLogs = new HashSet<DfsLogger>();
for (LogEntry logEntry : logEntries) {
for (String log : logEntry.logSet) {
- currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log));
+ currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString()));
}
}
@@ -3661,12 +3664,12 @@ public class Tablet {
for (DfsLogger logger : otherLogs) {
otherLogsCopy.add(logger.toString());
- doomed.add(logger.toString());
+ doomed.add(logger.getMeta());
}
for (DfsLogger logger : currentLogs) {
currentLogsCopy.add(logger.toString());
- doomed.remove(logger.toString());
+ doomed.remove(logger.getMeta());
}
otherLogs = Collections.emptySet();
@@ -3684,6 +3687,10 @@ public class Tablet {
log.debug("Logs for current memory: " + getExtent() + " " + logger);
}
+ for (String logger : doomed) {
+ log.debug("Logs to be destroyed: " + getExtent() + " " + logger);
+ }
+
return doomed;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cca2953..b152380 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -220,13 +220,21 @@ public class DfsLogger {
private String logPath;
private Daemon syncThread;
+ /* Track what's actually in +r/!0 for this logger ref */
+ private String metaReference;
+
public DfsLogger(ServerResources conf) throws IOException {
this.conf = conf;
}
- public DfsLogger(ServerResources conf, String filename) throws IOException {
+ /**
+ * Reference a pre-existing log file.
+ * @param meta the cq for the "log" entry in +r/!0
+ */
+ public DfsLogger(ServerResources conf, String filename, String meta) throws IOException {
this.conf = conf;
this.logPath = filename;
+ metaReference = meta;
}
public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
@@ -315,6 +323,7 @@ public class DfsLogger {
VolumeManager fs = conf.getFileSystem();
logPath = fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename;
+ metaReference = toString();
try {
short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
if (replication == 0)
@@ -400,6 +409,16 @@ public class DfsLogger {
return fileName;
}
+ /**
+ * get the cq needed to reference this logger's entry in +r/!0
+ */
+ public String getMeta() {
+ if (null == metaReference) {
+ throw new IllegalStateException("logger doesn't have meta reference. " + this);
+ }
+ return metaReference;
+ }
+
public String getFileName() {
return logPath.toString();
}