You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/05/04 14:35:51 UTC

[GitHub] keith-turner closed pull request #444: fixes #432 Made GC clean up recovery logs

keith-turner closed pull request #444: fixes #432 Made GC clean up recovery logs
URL: https://github.com/apache/accumulo/pull/444
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index d2acb58084..405a7a0f86 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -47,6 +47,7 @@
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
@@ -59,6 +60,7 @@
 import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.accumulo.server.master.state.TabletState;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
@@ -142,6 +144,8 @@ public void collect(GCStatus status) {
     try {
       status.currentLog.started = System.currentTimeMillis();
 
+      Map<UUID,Path> recoveryLogs = getSortedWALogs();
+
       Map<TServerInstance,Set<UUID>> logsByServer = new HashMap<>();
       Map<UUID,Pair<WalState,Path>> logsState = new HashMap<>();
       // Scan for log file info first: the order is important
@@ -164,7 +168,7 @@ public void collect(GCStatus status) {
       Map<UUID,TServerInstance> uuidToTServer;
       span = Trace.start("removeEntriesInUse");
       try {
-        uuidToTServer = removeEntriesInUse(logsByServer, currentServers, logsState);
+        uuidToTServer = removeEntriesInUse(logsByServer, currentServers, logsState, recoveryLogs);
         count = uuidToTServer.size();
       } catch (Exception ex) {
         log.error("Unable to scan metadata table", ex);
@@ -199,6 +203,9 @@ public void collect(GCStatus status) {
       long removeStop = System.currentTimeMillis();
       log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count,
           logsByServer.size(), (removeStop - logEntryScanStop) / 1000.));
+
+      count = removeFiles(recoveryLogs.values());
+      log.info("{} recovery logs removed", count);
       span.stop();
 
       span = Trace.start("removeMarkers");
@@ -244,36 +251,53 @@ private long removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap,
     return result;
   }
 
+  private long removeFile(Path path) {
+    try {
+      if (!useTrash || !fs.moveToTrash(path)) {
+        fs.deleteRecursively(path);
+      }
+      return 1;
+    } catch (FileNotFoundException ex) {
+      log.debug("Attempted to delete WAL {} that did not exists : {}", path, ex.getMessage());
+    } catch (IOException ex) {
+      log.error("Unable to delete wal {}", path, ex);
+    }
+
+    return 0;
+  }
+
   private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) {
     for (Pair<WalState,Path> stateFile : collection) {
       Path path = stateFile.getSecond();
-      log.debug("Removing " + stateFile.getFirst() + " WAL " + path);
-      try {
-        if (!useTrash || !fs.moveToTrash(path)) {
-          fs.deleteRecursively(path);
-        }
-        status.currentLog.deleted++;
-      } catch (FileNotFoundException ex) {
-        // ignored
-      } catch (IOException ex) {
-        log.error("Unable to delete wal " + path + ": " + ex);
-      }
+      log.debug("Removing {} WAL {}", stateFile.getFirst(), path);
+      status.currentLog.deleted += removeFile(path);
     }
     return status.currentLog.deleted;
   }
 
+  private long removeFiles(Collection<Path> values) {
+    long count = 0;
+    for (Path path : values) {
+      log.debug("Removing recovery log {}", path);
+      count += removeFile(path);
+    }
+    return count;
+  }
+
   private UUID path2uuid(Path path) {
     return UUID.fromString(path.getName());
   }
 
   private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUID>> candidates,
-      Set<TServerInstance> liveServers, Map<UUID,Pair<WalState,Path>> logsState)
-      throws IOException, KeeperException, InterruptedException {
+      Set<TServerInstance> liveServers, Map<UUID,Pair<WalState,Path>> logsState,
+      Map<UUID,Path> recoveryLogs) throws IOException, KeeperException, InterruptedException {
 
     Map<UUID,TServerInstance> result = new HashMap<>();
     for (Entry<TServerInstance,Set<UUID>> entry : candidates.entrySet()) {
       for (UUID id : entry.getValue()) {
-        result.put(id, entry.getKey());
+        if (result.put(id, entry.getKey()) != null) {
+          throw new IllegalArgumentException("WAL " + id + " owned by multiple tservers");
+        }
       }
     }
 
@@ -287,9 +311,8 @@ private UUID path2uuid(Path path) {
       if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
         Set<UUID> idsToIgnore = candidates.remove(state.current);
         if (idsToIgnore != null) {
-          for (UUID id : idsToIgnore) {
-            result.remove(id);
-          }
+          result.keySet().removeAll(idsToIgnore);
+          recoveryLogs.keySet().removeAll(idsToIgnore);
         }
       }
       // Tablet is being recovered and has WAL references, remove all the WALs for the dead server
@@ -301,9 +324,8 @@ private UUID path2uuid(Path path) {
           // There's a reference to a log file, so skip that server's logs
           Set<UUID> idsToIgnore = candidates.remove(dead);
           if (idsToIgnore != null) {
-            for (UUID id : idsToIgnore) {
-              result.remove(id);
-            }
+            result.keySet().removeAll(idsToIgnore);
+            recoveryLogs.keySet().removeAll(idsToIgnore);
           }
         }
       }
@@ -320,6 +342,8 @@ private UUID path2uuid(Path path) {
             result.remove(id);
           }
         }
+
+        recoveryLogs.keySet().removeAll(idsForServer);
       }
     }
     return result;
@@ -384,4 +408,29 @@ private long getCurrent(Map<TServerInstance,Set<UUID>> logsByServer,
     }
     return result;
   }
+
+  /**
+   * Looks for write-ahead logs in recovery directories.
+   *
+   * @return map of log uuids to paths
+   */
+  protected Map<UUID,Path> getSortedWALogs() throws IOException {
+    Map<UUID,Path> result = new HashMap<>();
+
+    for (String dir : ServerConstants.getRecoveryDirs()) {
+      Path recoveryDir = new Path(dir);
+      if (fs.exists(recoveryDir)) {
+        for (FileStatus status : fs.listStatus(recoveryDir)) {
+          try {
+            UUID logId = path2uuid(status.getPath());
+            result.put(logId, status.getPath());
+          } catch (IllegalArgumentException iae) {
+            log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
+          }
+
+        }
+      }
+    }
+    return result;
+  }
 }
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 556a33671a..58c9089587 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -106,6 +106,11 @@ protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates)
           throws IOException, KeeperException, InterruptedException {
         return 0;
       }
+
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
     };
     gc.collect(status);
     EasyMock.verify(context, fs, marker, tserverSet);
@@ -131,6 +136,11 @@ protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates)
           throws IOException, KeeperException, InterruptedException {
         return 0;
       }
+
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
     };
     gc.collect(status);
     EasyMock.verify(context, marker, tserverSet, fs);
@@ -172,7 +182,12 @@ public void deleteUnreferenceLogOnDeadServer() throws Exception {
     EasyMock.expectLastCall().once();
     EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
     GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer1List);
+        tserverSet, marker, tabletOnServer1List) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
+    };
     gc.collect(status);
     EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
   }
@@ -208,7 +223,12 @@ public void ignoreReferenceLogOnDeadServer() throws Exception {
     EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
     EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
     GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer2List);
+        tserverSet, marker, tabletOnServer2List) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
+    };
     gc.collect(status);
     EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
   }
@@ -250,7 +270,12 @@ public void replicationDelaysFileCollection() throws Exception {
     EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
     EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
     GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false,
-        tserverSet, marker, tabletOnServer1List);
+        tserverSet, marker, tabletOnServer1List) {
+      @Override
+      protected Map<UUID,Path> getSortedWALogs() throws IOException {
+        return Collections.emptyMap();
+      }
+    };
     gc.collect(status);
     EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services