You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/02/28 23:09:44 UTC
svn commit: r1451388 - in /accumulo/trunk: ./ assemble/ core/ examples/
fate/src/main/java/org/apache/accumulo/fate/
fate/src/main/java/org/apache/accumulo/fate/zookeeper/ server/
server/src/main/java/org/apache/accumulo/server/ server/src/main/java/or...
Author: kturner
Date: Thu Feb 28 22:09:43 2013
New Revision: 1451388
URL: http://svn.apache.org/r1451388
Log:
ACCUMULO-1126 Made AGC clean up unrefed sorted walogs. Also made it delete unrefed walogs of offline servers.
Modified:
accumulo/trunk/ (props changed)
accumulo/trunk/assemble/ (props changed)
accumulo/trunk/core/ (props changed)
accumulo/trunk/examples/ (props changed)
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (props changed)
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (props changed)
accumulo/trunk/server/ (props changed)
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
accumulo/trunk/src/ (props changed)
Propchange: accumulo/trunk/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5:r1451294-1451338,1451340-1451387
Propchange: accumulo/trunk/assemble/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/assemble:r1451294-1451338,1451340-1451387
Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/core:r1451294-1451338,1451340-1451387
Propchange: accumulo/trunk/examples/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/examples:r1451294-1451338,1451340-1451387
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1451294-1451338,1451340-1451387
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1451294-1451338,1451340-1451387
Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/server:r1451294-1451338,1451340-1451387
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java?rev=1451388&r1=1451387&r2=1451388&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java Thu Feb 28 22:09:43 2013
@@ -165,7 +165,7 @@ public class Accumulo {
log.error(t, t);
}
}
- }, 1000, 10 * 1000);
+ }, 1000, 10 * 60 * 1000);
}
public static String getLocalAddress(String[] args) throws UnknownHostException {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1451388&r1=1451387&r2=1451388&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Thu Feb 28 22:09:43 2013
@@ -20,10 +20,12 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.UUID;
import org.apache.accumulo.core.Constants;
@@ -72,6 +74,9 @@ public class GarbageCollectWriteAheadLog
Span span = Trace.start("scanServers");
try {
+
+ Set<String> sortedWALogs = getSortedWALogs();
+
status.currentLog.started = System.currentTimeMillis();
Map<String,String> fileToServerMap = new HashMap<String,String>();
@@ -84,7 +89,7 @@ public class GarbageCollectWriteAheadLog
span = Trace.start("removeMetadataEntries");
try {
- count = removeMetadataEntries(fileToServerMap, status);
+ count = removeMetadataEntries(fileToServerMap, sortedWALogs, status);
} catch (Exception ex) {
log.error("Unable to scan metadata table", ex);
return;
@@ -98,7 +103,7 @@ public class GarbageCollectWriteAheadLog
span = Trace.start("removeFiles");
Map<String,ArrayList<String>> serverToFileMap = mapServersToFiles(fileToServerMap);
- count = removeFiles(serverToFileMap, status);
+ count = removeFiles(serverToFileMap, sortedWALogs, status);
long removeStop = System.currentTimeMillis();
log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.));
@@ -126,7 +131,7 @@ public class GarbageCollectWriteAheadLog
}
}
- private int removeFiles(Map<String,ArrayList<String>> serverToFileMap, final GCStatus status) {
+ private int removeFiles(Map<String,ArrayList<String>> serverToFileMap, Set<String> sortedWALogs, final GCStatus status) {
AccumuloConfiguration conf = instance.getConfiguration();
for (Entry<String,ArrayList<String>> entry : serverToFileMap.entrySet()) {
if (entry.getKey().length() == 0) {
@@ -143,22 +148,50 @@ public class GarbageCollectWriteAheadLog
}
} else {
InetSocketAddress address = AddressUtil.parseAddress(entry.getKey(), Property.TSERV_CLIENTPORT);
- if (!holdsLock(address))
+ if (!holdsLock(address)) {
+ Path serverPath = new Path(Constants.getWalDirectory(conf), entry.getKey());
+ for (String filename : entry.getValue()) {
+ log.debug("Removing WAL for offline server " + filename);
+ try {
+ Path path = new Path(serverPath, filename);
+ if (trash == null || !trash.moveToTrash(path))
+ fs.delete(path, true);
+ } catch (IOException ex) {
+ log.error("Unable to delete wal " + filename + ": " + ex);
+ }
+ }
continue;
- Client tserver = null;
- try {
- tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), entry.getValue());
- log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
- status.currentLog.deleted += entry.getValue().size();
- } catch (TException e) {
- log.warn("Error talking to " + address + ": " + e);
- } finally {
- if (tserver != null)
- ThriftUtil.returnClient(tserver);
+ } else {
+ Client tserver = null;
+ try {
+ tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+ tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), entry.getValue());
+ log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
+ status.currentLog.deleted += entry.getValue().size();
+ } catch (TException e) {
+ log.warn("Error talking to " + address + ": " + e);
+ } finally {
+ if (tserver != null)
+ ThriftUtil.returnClient(tserver);
+ }
+ }
+ }
+ }
+
+ Path recoveryDir = new Path(Constants.getRecoveryDir(conf));
+
+ for (String sortedWALog : sortedWALogs) {
+ log.debug("Removing sorted WAL " + sortedWALog);
+ try {
+ Path swalog = new Path(recoveryDir, sortedWALog);
+ if (trash == null || !trash.moveToTrash(swalog)) {
+ fs.delete(swalog, true);
}
+ } catch (IOException ioe) {
+ log.error("Unable to delete sorted walog " + sortedWALogs + ": " + ioe);
}
}
+
return 0;
}
@@ -175,7 +208,8 @@ public class GarbageCollectWriteAheadLog
return serverToFileMap;
}
- private static int removeMetadataEntries(Map<String,String> fileToServerMap, GCStatus status) throws IOException, KeeperException, InterruptedException {
+ private static int removeMetadataEntries(Map<String,String> fileToServerMap, Set<String> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+ InterruptedException {
int count = 0;
Iterator<LogEntry> iterator = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials());
while (iterator.hasNext()) {
@@ -183,6 +217,9 @@ public class GarbageCollectWriteAheadLog
filename = filename.split("/", 2)[1];
if (fileToServerMap.remove(filename) != null)
status.currentLog.inUse++;
+
+ sortedWALogs.remove(filename);
+
count++;
}
}
@@ -214,6 +251,25 @@ public class GarbageCollectWriteAheadLog
return count;
}
+ private Set<String> getSortedWALogs() throws IOException {
+ AccumuloConfiguration conf = instance.getConfiguration();
+ Path recoveryDir = new Path(Constants.getRecoveryDir(conf));
+
+ Set<String> sortedWALogs = new HashSet<String>();
+
+ if (fs.exists(recoveryDir)) {
+ for (FileStatus status : fs.listStatus(recoveryDir)) {
+ if (isUUID(status.getPath().getName())) {
+ sortedWALogs.add(status.getPath().getName());
+ } else {
+ log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
+ }
+ }
+ }
+
+ return sortedWALogs;
+ }
+
/**
* @param name
* @return
Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/src:r1451294-1451338,1451340-1451387