You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/14 06:35:56 UTC

[15/50] [abbrv] git commit: ACCUMULO-2899 Correctly handle older internals.

ACCUMULO-2899 Correctly handle older internals.

Make sure that we behave correctly when our cluster has been upgraded 1.4 -> 1.5 -> 1.6

* DfsLoggers need to track the column qualifier (cq) used to create them so that it can be used to issue putDeletes
* rewrite 1.4 WAL paths to the top level wal directory
* adjust recovery path to compensate for 1.4 recovered wals
* fix GC to properly handle upgraded WALs

Author: Mike Drob <md...@cloudera.com>
Author: Sean Busbey <bu...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/31aea2ad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/31aea2ad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/31aea2ad

Branch: refs/heads/ACCUMULO-378
Commit: 31aea2ad89b9b988995a9b8f9096091788eead74
Parents: 1b170e8
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed May 14 15:57:27 2014 -0400
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Thu Jun 12 14:46:34 2014 -0400

----------------------------------------------------------------------
 .../accumulo/server/fs/VolumeManagerImpl.java   | 11 ++++++++--
 .../server/master/recovery/RecoveryPath.java    |  7 +++++--
 .../gc/GarbageCollectWriteAheadLogs.java        |  8 ++++++--
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 13 +++++++++---
 .../org/apache/accumulo/tserver/Tablet.java     | 13 +++++++++---
 .../apache/accumulo/tserver/log/DfsLogger.java  | 21 +++++++++++++++++++-
 6 files changed, 60 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 5c1194a..d4a2d4f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -529,8 +529,15 @@ public class VolumeManagerImpl implements VolumeManager {
 
   @Override
   public Path getFullPath(FileType fileType, String path) {
-    if (path.contains(":"))
-      return new Path(path);
+    int colon = path.indexOf(':');
+    if (colon > -1) {
+      // Check if this is really an absolute path or if this is a 1.4 style relative path for a WAL
+      if (fileType == FileType.WAL && path.charAt(colon + 1) != '/') {
+        path = path.substring(path.indexOf('/'));
+      } else {
+        return new Path(path);
+      }
+    }
 
     // normalize the path
     Path fullPath = new Path(defaultVolume.getBasePath(), fileType.getDirectory());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
index 1da945d..4a6638a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryPath.java
@@ -34,8 +34,11 @@ public class RecoveryPath {
       String uuid = walPath.getName();
       // drop uuid
       walPath = walPath.getParent();
-      // drop server
-      walPath = walPath.getParent();
+      // recovered 1.4 WALs won't have a server component
+      if (!walPath.getName().equals(FileType.WAL.getDirectory())) {
+        // drop server
+        walPath = walPath.getParent();
+      }
   
       if (!walPath.getName().equals(FileType.WAL.getDirectory()))
         throw new IllegalArgumentException("Bad path " + walPath);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index ae850af..56a0fd5 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -281,7 +281,9 @@ public class GarbageCollectWriteAheadLogs {
 
     while (iterator.hasNext()) {
       for (String entry : iterator.next().logSet) {
-        String uuid = new Path(entry).getName();
+        // old style WALs will have the IP:Port of their logger and new style will be a Path, either absolute or relative; in all cases
+        // the last "/" will mark a UUID file name.
+        String uuid = entry.substring(entry.lastIndexOf("/") + 1);
         if (!isUUID(uuid)) {
           // fully expect this to be a uuid, if its not then something is wrong and walog GC should not proceed!
           throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry);
@@ -327,8 +329,8 @@ public class GarbageCollectWriteAheadLogs {
         continue;
       for (FileStatus status : listing) {
         String server = status.getPath().getName();
-        servers.add(server);
         if (status.isDir()) {
+          servers.add(server);
           for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
             if (isUUID(file.getPath().getName())) {
               fileToServerMap.put(file.getPath(), server);
@@ -339,7 +341,9 @@ public class GarbageCollectWriteAheadLogs {
           }
         } else if (isUUID(server)) {
           // old-style WAL are not under a directory
+          servers.add("");
           fileToServerMap.put(status.getPath(), "");
+          nameToFileMap.put(server, status.getPath());
         } else {
           log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
         }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index f90b965..ce1f026 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -205,18 +205,25 @@ public class GarbageCollectWriteAheadLogsTest {
     Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
     int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
     /*
+     * Expect only a single server, the non-server entry for upgrade WALs
+     */
+    assertEquals(1, count);
+    /*
      * Expected fileToServerMap:
      * /dir1/uuid1 -> ""
      * /dir3/uuid3 -> ""
      */
-    assertEquals(2, count);
     assertEquals(2, fileToServerMap.size());
     assertEquals("", fileToServerMap.get(serverFile1Path));
     assertEquals("", fileToServerMap.get(serverFile3Path));
     /*
-     * Expected nameToFileMap: empty
+     * Expected nameToFileMap:
+     * uuid1 -> /dir1/uuid1
+     * uuid3 -> /dir3/uuid3
      */
-    assertEquals(0, nameToFileMap.size());
+    assertEquals(2, nameToFileMap.size());
+    assertEquals(serverFile1Path, nameToFileMap.get(UUID1));
+    assertEquals(serverFile3Path, nameToFileMap.get(UUID3));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index f73d4ca..36b2289 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -1357,6 +1357,8 @@ public class Tablet {
     tabletResources.setTablet(this, acuTableConf);
     if (!logEntries.isEmpty()) {
       log.info("Starting Write-Ahead Log recovery for " + this.extent);
+      // count[0] = entries used on tablet
+      // count[1] = track max time from walog entries without timestamps
       final long[] count = new long[2];
       final CommitSession commitSession = tabletMemory.getCommitSession();
       count[1] = Long.MIN_VALUE;
@@ -1388,6 +1390,7 @@ public class Tablet {
         commitSession.updateMaxCommittedTime(tabletTime.getTime());
 
         if (count[0] == 0) {
+          log.debug("No replayed mutations applied, removing unused entries for " + extent);
           MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
           logEntries.clear();
         }
@@ -1403,7 +1406,7 @@ public class Tablet {
       currentLogs = new HashSet<DfsLogger>();
       for (LogEntry logEntry : logEntries) {
         for (String log : logEntry.logSet) {
-          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log));
+          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString()));
         }
       }
 
@@ -3661,12 +3664,12 @@ public class Tablet {
 
       for (DfsLogger logger : otherLogs) {
         otherLogsCopy.add(logger.toString());
-        doomed.add(logger.toString());
+        doomed.add(logger.getMeta());
       }
 
       for (DfsLogger logger : currentLogs) {
         currentLogsCopy.add(logger.toString());
-        doomed.remove(logger.toString());
+        doomed.remove(logger.getMeta());
       }
 
       otherLogs = Collections.emptySet();
@@ -3684,6 +3687,10 @@ public class Tablet {
       log.debug("Logs for current memory: " + getExtent() + " " + logger);
     }
 
+    for (String logger : doomed) {
+      log.debug("Logs to be destroyed: " + getExtent() + " " + logger);
+    }
+
     return doomed;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31aea2ad/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cca2953..b152380 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -220,13 +220,21 @@ public class DfsLogger {
   private String logPath;
   private Daemon syncThread;
 
+  /* Track what's actually in +r/!0 for this logger ref */
+  private String metaReference;
+
   public DfsLogger(ServerResources conf) throws IOException {
     this.conf = conf;
   }
 
-  public DfsLogger(ServerResources conf, String filename) throws IOException {
+  /**
+   * Reference a pre-existing log file.
+   * @param meta the cq for the "log" entry in +r/!0
+   */
+  public DfsLogger(ServerResources conf, String filename, String meta) throws IOException {
     this.conf = conf;
     this.logPath = filename;
+    metaReference = meta;
   }
 
   public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
@@ -315,6 +323,7 @@ public class DfsLogger {
     VolumeManager fs = conf.getFileSystem();
 
     logPath = fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename;
+    metaReference = toString();
     try {
       short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
       if (replication == 0)
@@ -400,6 +409,16 @@ public class DfsLogger {
     return fileName;
   }
 
+  /**
+   * get the cq needed to reference this logger's entry in +r/!0
+   */
+  public String getMeta() {
+    if (null == metaReference) {
+      throw new IllegalStateException("logger doesn't have meta reference. " + this);
+    }
+    return metaReference;
+  }
+
   public String getFileName() {
     return logPath.toString();
   }