You are viewing a plain-text version of this content. The link to the canonical version (a hyperlink in the original page) was not preserved in this plain-text rendering.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:21:15 UTC
[29/34] accumulo git commit: ACCUMULO-3423 optimize WAL metadata table updates
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 1735c0d..9f537af 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -18,7 +18,7 @@ package org.apache.accumulo.gc;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -28,49 +28,56 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
-import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
-import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.LiveTServerSet;
+import org.apache.accumulo.server.master.LiveTServerSet.Listener;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.RootTabletStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.thrift.TException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.htrace.Span;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.net.HostAndPort;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -79,8 +86,8 @@ public class GarbageCollectWriteAheadLogs {
private final AccumuloServerContext context;
private final VolumeManager fs;
-
- private boolean useTrash;
+ private final boolean useTrash;
+ private final LiveTServerSet liveServers;
/**
* Creates a new GC WAL object.
@@ -96,56 +103,37 @@ public class GarbageCollectWriteAheadLogs {
this.context = context;
this.fs = fs;
this.useTrash = useTrash;
- }
-
- /**
- * Gets the instance used by this object.
- *
- * @return instance
- */
- Instance getInstance() {
- return context.getInstance();
- }
-
- /**
- * Gets the volume manager used by this object.
- *
- * @return volume manager
- */
- VolumeManager getVolumeManager() {
- return fs;
- }
-
- /**
- * Checks if the volume manager should move files to the trash rather than delete them.
- *
- * @return true if trash is used
- */
- boolean isUsingTrash() {
- return useTrash;
+ this.liveServers = new LiveTServerSet(context, new Listener() {
+ @Override
+ public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
+ log.debug("New tablet servers noticed: " + added);
+ log.debug("Tablet servers removed: " + deleted);
+ }
+ });
+ liveServers.startListeningForTabletServerChanges();
}
public void collect(GCStatus status) {
- Span span = Trace.start("scanServers");
+ Span span = Trace.start("getCandidates");
try {
+ Set<TServerInstance> currentServers = liveServers.getCurrentServers();
- Map<String,Path> sortedWALogs = getSortedWALogs();
status.currentLog.started = System.currentTimeMillis();
- Map<Path,String> fileToServerMap = new HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new HashMap<String,Path>();
- int count = scanServers(fileToServerMap, nameToFileMap);
+ Map<TServerInstance, Set<Path> > candidates = new HashMap<>();
+ long count = getCurrent(candidates, currentServers);
long fileScanStop = System.currentTimeMillis();
- log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count,
+
+ log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(),
(fileScanStop - status.currentLog.started) / 1000.));
- status.currentLog.candidates = fileToServerMap.size();
+ status.currentLog.candidates = count;
span.stop();
- span = Trace.start("removeMetadataEntries");
+ span = Trace.start("removeEntriesInUse");
try {
- count = removeMetadataEntries(nameToFileMap, sortedWALogs, status);
+ count = removeEntriesInUse(candidates, status, currentServers);
} catch (Exception ex) {
log.error("Unable to scan metadata table", ex);
return;
@@ -158,7 +146,7 @@ public class GarbageCollectWriteAheadLogs {
span = Trace.start("removeReplicationEntries");
try {
- count = removeReplicationEntries(nameToFileMap, sortedWALogs, status);
+ count = removeReplicationEntries(candidates, status);
} catch (Exception ex) {
log.error("Unable to scan replication table", ex);
return;
@@ -170,16 +158,23 @@ public class GarbageCollectWriteAheadLogs {
log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.));
span = Trace.start("removeFiles");
- Map<String,ArrayList<Path>> serverToFileMap = mapServersToFiles(fileToServerMap, nameToFileMap);
- count = removeFiles(nameToFileMap, serverToFileMap, sortedWALogs, status);
+ count = removeFiles(candidates, status);
long removeStop = System.currentTimeMillis();
- log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.));
+ log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.));
+ span.stop();
+
+ span = Trace.start("removeMarkers");
+ count = removeTabletServerMarkers(candidates);
+ long removeMarkersStop = System.currentTimeMillis();
+ log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
+ span.stop();
+
+
status.currentLog.finished = removeStop;
status.lastLog = status.currentLog;
status.currentLog = new GcCycleStats();
- span.stop();
} catch (Exception e) {
log.error("exception occured while garbage collecting write ahead logs", e);
@@ -188,161 +183,101 @@ public class GarbageCollectWriteAheadLogs {
}
}
- boolean holdsLock(HostAndPort addr) {
+ private long removeTabletServerMarkers(Map<TServerInstance,Set<Path>> candidates) {
+ long result = 0;
try {
- String zpath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addr.toString();
- List<String> children = ZooReaderWriter.getInstance().getChildren(zpath);
- return !(children == null || children.isEmpty());
- } catch (KeeperException.NoNodeException ex) {
- return false;
- } catch (Exception ex) {
- log.debug(ex.toString(), ex);
- return true;
- }
- }
-
- private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
- for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
- if (entry.getKey().isEmpty()) {
- // old-style log entry, just remove it
- for (Path path : entry.getValue()) {
- log.debug("Removing old-style WAL " + path);
- try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
- }
- }
- } else {
- HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
- if (!holdsLock(address)) {
+ BatchWriter root = null;
+ BatchWriter meta = null;
+ try {
+ root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+ meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+ for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+ Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
for (Path path : entry.getValue()) {
- log.debug("Removing WAL for offline server " + path);
- try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
- }
- }
- continue;
- } else {
- Client tserver = null;
- try {
- tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
- tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
- log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
- status.currentLog.deleted += entry.getValue().size();
- } catch (TException e) {
- log.warn("Error talking to " + address + ": " + e);
- } finally {
- if (tserver != null)
- ThriftUtil.returnClient(tserver);
+ m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
+ result++;
}
+ root.addMutation(m);
+ meta.addMutation(m);
+ }
+ } finally {
+ if (meta != null) {
+ meta.close();
+ }
+ if (root != null) {
+ root.close();
}
}
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
+ return result;
+ }
- for (Path swalog : sortedWALogs.values()) {
- log.debug("Removing sorted WAL " + swalog);
- try {
- if (!useTrash || !fs.moveToTrash(swalog)) {
- fs.deleteRecursively(swalog);
- }
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ioe) {
+ private long removeFiles(Map<TServerInstance, Set<Path> > candidates, final GCStatus status) {
+ for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+ for (Path path : entry.getValue()) {
+ log.debug("Removing unused WAL for server " + entry.getKey() + " log " + path);
try {
- if (fs.exists(swalog)) {
- log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
- }
+ if (!useTrash || !fs.moveToTrash(path))
+ fs.deleteRecursively(path);
+ status.currentLog.deleted++;
+ } catch (FileNotFoundException ex) {
+ // ignored
} catch (IOException ex) {
- log.error("Unable to check for the existence of " + swalog, ex);
+ log.error("Unable to delete wal " + path + ": " + ex);
}
}
}
-
- return 0;
+ return status.currentLog.deleted;
}
- /**
- * Converts a list of paths to their corresponding strings.
- *
- * @param paths
- * list of paths
- * @return string forms of paths
- */
- static List<String> paths2strings(List<Path> paths) {
- List<String> result = new ArrayList<String>(paths.size());
- for (Path path : paths)
- result.add(path.toString());
- return result;
+ private UUID path2uuid(Path path) {
+ return UUID.fromString(path.getName());
}
- /**
- * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the
- * mapping of file names to paths is skipped.
- *
- * @param fileToServerMap
- * map of file paths to servers
- * @param nameToFileMap
- * map of file names to paths
- * @return map of servers to lists of file paths
- */
- static Map<String,ArrayList<Path>> mapServersToFiles(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) {
- Map<String,ArrayList<Path>> result = new HashMap<String,ArrayList<Path>>();
- for (Entry<Path,String> fileServer : fileToServerMap.entrySet()) {
- if (!nameToFileMap.containsKey(fileServer.getKey().getName()))
- continue;
- ArrayList<Path> files = result.get(fileServer.getValue());
- if (files == null) {
- files = new ArrayList<Path>();
- result.put(fileServer.getValue(), files);
- }
- files.add(fileServer.getKey());
- }
- return result;
- }
-
- protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+ private long removeEntriesInUse(Map<TServerInstance, Set<Path> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException,
InterruptedException {
- int count = 0;
- Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(context);
-
- // For each WAL reference in the metadata table
- while (iterator.hasNext()) {
- // Each metadata reference has at least one WAL file
- for (String entry : iterator.next().logSet) {
- // old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases
- // the last "/" will mark a UUID file name.
- String uuid = entry.substring(entry.lastIndexOf("/") + 1);
- if (!isUUID(uuid)) {
- // fully expect this to be a uuid, if its not then something is wrong and walog GC should not proceed!
- throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry);
- }
- Path pathFromNN = nameToFileMap.remove(uuid);
- if (pathFromNN != null) {
- status.currentLog.inUse++;
- sortedWALogs.remove(uuid);
- }
+ // remove any entries if there's a log reference, or a tablet is still assigned to the dead server
- count++;
+ Map<UUID, TServerInstance> walToDeadServer = new HashMap<>();
+ for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+ for (Path file : entry.getValue()) {
+ walToDeadServer.put(path2uuid(file), entry.getKey());
+ }
+ }
+ long count = 0;
+ RootTabletStateStore root = new RootTabletStateStore(context);
+ MetaDataStateStore meta = new MetaDataStateStore(context);
+ Iterator<TabletLocationState> states = Iterators.concat(root.iterator(), meta.iterator());
+ while (states.hasNext()) {
+ count++;
+ TabletLocationState state = states.next();
+ if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
+ candidates.remove(state.current);
+ }
+ for (Collection<String> wals : state.walogs) {
+ for (String wal : wals) {
+ UUID walUUID = path2uuid(new Path(wal));
+ TServerInstance dead = walToDeadServer.get(walUUID);
+ if (dead != null) {
+ Iterator<Path> iter = candidates.get(dead).iterator();
+ while (iter.hasNext()) {
+ if (path2uuid(iter.next()).equals(walUUID)) {
+ iter.remove();
+ break;
+ }
+ }
+ }
+ }
}
}
-
return count;
}
- protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
- InterruptedException {
+ protected int removeReplicationEntries(Map<TServerInstance, Set<Path> > candidates, GCStatus status) throws IOException, KeeperException,
+ InterruptedException {
Connector conn;
try {
conn = context.getConnector();
@@ -353,21 +288,25 @@ public class GarbageCollectWriteAheadLogs {
int count = 0;
- Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
+ Iterator<Entry<TServerInstance,Set<Path>>> walIter = candidates.entrySet().iterator();
while (walIter.hasNext()) {
- Entry<String,Path> wal = walIter.next();
- String fullPath = wal.getValue().toString();
- if (neededByReplication(conn, fullPath)) {
- log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
- // If we haven't already removed it, check to see if this WAL is
- // "in use" by replication (needed for replication purposes)
- status.currentLog.inUse++;
-
+ Entry<TServerInstance,Set<Path>> wal = walIter.next();
+ Iterator<Path> paths = wal.getValue().iterator();
+ while (paths.hasNext()) {
+ Path fullPath = paths.next();
+ if (neededByReplication(conn, fullPath)) {
+ log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
+ // If we haven't already removed it, check to see if this WAL is
+ // "in use" by replication (needed for replication purposes)
+ status.currentLog.inUse++;
+ paths.remove();
+ } else {
+ log.debug("WAL not needed for replication {}", fullPath);
+ }
+ }
+ if (wal.getValue().isEmpty()) {
walIter.remove();
- sortedWALogs.remove(wal.getKey());
- } else {
- log.debug("WAL not needed for replication {}", fullPath);
}
count++;
}
@@ -375,6 +314,7 @@ public class GarbageCollectWriteAheadLogs {
return count;
}
+
/**
* Determine if the given WAL is needed for replication
*
@@ -382,7 +322,7 @@ public class GarbageCollectWriteAheadLogs {
* The full path (URI)
* @return True if the WAL is still needed by replication (not a candidate for deletion)
*/
- protected boolean neededByReplication(Connector conn, String wal) {
+ protected boolean neededByReplication(Connector conn, Path wal) {
log.info("Checking replication table for " + wal);
Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
@@ -405,7 +345,7 @@ public class GarbageCollectWriteAheadLogs {
return false;
}
- protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
+ protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, Path wal) {
Scanner metaScanner;
try {
metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
@@ -425,7 +365,7 @@ public class GarbageCollectWriteAheadLogs {
StatusSection.limit(replScanner);
// Only look for this specific WAL
- replScanner.setRange(Range.exact(wal));
+ replScanner.setRange(Range.exact(wal.toString()));
return Iterables.concat(metaScanner, replScanner);
} catch (ReplicationTableOfflineException e) {
@@ -435,107 +375,84 @@ public class GarbageCollectWriteAheadLogs {
return metaScanner;
}
- private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
- return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
- }
-
- /**
- * Scans write-ahead log directories for logs. The maps passed in are populated with scan information.
- *
- * @param walDirs
- * write-ahead log directories
- * @param fileToServerMap
- * map of file paths to servers
- * @param nameToFileMap
- * map of file names to paths
- * @return number of servers located (including those with no logs present)
- */
- int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
- Set<String> servers = new HashSet<String>();
- for (String walDir : walDirs) {
- Path walRoot = new Path(walDir);
- FileStatus[] listing = null;
- try {
- listing = fs.listStatus(walRoot);
- } catch (FileNotFoundException e) {
- // ignore dir
- }
- if (listing == null)
- continue;
- for (FileStatus status : listing) {
- String server = status.getPath().getName();
- if (status.isDirectory()) {
- servers.add(server);
- for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
- if (isUUID(file.getPath().getName())) {
- fileToServerMap.put(file.getPath(), server);
- nameToFileMap.put(file.getPath().getName(), file.getPath());
- } else {
- log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
- }
- }
- } else if (isUUID(server)) {
- // old-style WAL are not under a directory
- servers.add("");
- fileToServerMap.put(status.getPath(), "");
- nameToFileMap.put(server, status.getPath());
- } else {
- log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
- }
- }
- }
- return servers.size();
- }
- private Map<String,Path> getSortedWALogs() throws IOException {
- return getSortedWALogs(ServerConstants.getRecoveryDirs());
- }
/**
- * Looks for write-ahead logs in recovery directories.
+ * Scans log markers. The map passed in is populated with the logs for dead servers.
*
- * @param recoveryDirs
- * recovery directories
- * @return map of log file names to paths
+ * @param unusedLogs
+ * map of dead server to log file entries
+ * @return total number of log files
*/
- Map<String,Path> getSortedWALogs(String[] recoveryDirs) throws IOException {
- Map<String,Path> result = new HashMap<String,Path>();
-
- for (String dir : recoveryDirs) {
- Path recoveryDir = new Path(dir);
-
- if (fs.exists(recoveryDir)) {
- for (FileStatus status : fs.listStatus(recoveryDir)) {
- String name = status.getPath().getName();
- if (isUUID(name)) {
- result.put(name, status.getPath());
- } else {
- log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
- }
+ private long getCurrent(Map<TServerInstance, Set<Path> > unusedLogs, Set<TServerInstance> currentServers) throws Exception {
+ Set<Path> rootWALs = new HashSet<>();
+ // Get entries in zookeeper:
+ String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
+ ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ List<String> children = zoo.getChildren(zpath);
+ for (String child : children) {
+ LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null));
+ rootWALs.add(new Path(entry.filename));
+ }
+ long count = 0;
+
+ // get all the WAL markers that are not in zookeeper for dead servers
+ Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY);
+ rootScanner.setRange(CurrentLogsSection.getRange());
+ Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.setRange(CurrentLogsSection.getRange());
+ Iterator<Entry<Key,Value>> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator());
+ Text hostAndPort = new Text();
+ Text sessionId = new Text();
+ Text filename = new Text();
+ while (entries.hasNext()) {
+ Entry<Key,Value> entry = entries.next();
+ CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
+ CurrentLogsSection.getPath(entry.getKey(), filename);
+ TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
+ Path path = new Path(filename.toString());
+ if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
+ Set<Path> logs = unusedLogs.get(tsi);
+ if (logs == null) {
+ unusedLogs.put(tsi, logs = new HashSet<Path>());
+ }
+ if (logs.add(path)) {
+ count++;
}
}
}
- return result;
- }
- /**
- * Checks if a string is a valid UUID.
- *
- * @param name
- * string to check
- * @return true if string is a UUID
- */
- static boolean isUUID(String name) {
- if (name == null || name.length() != 36) {
- return false;
- }
- try {
- UUID.fromString(name);
- return true;
- } catch (IllegalArgumentException ex) {
- return false;
+ // scan HDFS for logs for dead servers
+ for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
+ RemoteIterator<LocatedFileStatus> iter = volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true);
+ while (iter.hasNext()) {
+ LocatedFileStatus next = iter.next();
+ // recursive listing returns directories, too
+ if (next.isDirectory()) {
+ continue;
+ }
+ // make sure we've waited long enough for zookeeper propagation
+ if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) {
+ continue;
+ }
+ Path path = next.getPath();
+ String hostPlusPort = path.getParent().getName();
+ // server is still alive, or has a replacement
+ TServerInstance instance = liveServers.find(hostPlusPort);
+ if (instance != null) {
+ continue;
+ }
+ TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
+ Set<Path> paths = unusedLogs.get(fake);
+ if (paths == null) {
+ paths = new HashSet<>();
+ }
+ paths.add(path);
+ unusedLogs.put(fake, paths);
+ }
}
+ return count;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 037023a..4f64c15 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -569,7 +569,6 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
replSpan.stop();
}
- // Clean up any unused write-ahead logs
Span waLogs = Trace.start("walogs");
try {
GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 3a32727..455aaee 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -37,13 +37,11 @@ import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
@@ -186,20 +184,21 @@ public class CloseWriteAheadLogReferences implements Runnable {
try {
// TODO Configurable number of threads
bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
// For each log key/value in the metadata table
for (Entry<Key,Value> entry : bs) {
- // The value may contain multiple WALs
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
- log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet);
+ if (entry.getValue().equals(CurrentLogsSection.UNUSED)) {
+ continue;
+ }
+ Text tpath = new Text();
+ CurrentLogsSection.getPath(entry.getKey(), tpath);
+ String path = new Path(tpath.toString()).toString();
+ log.debug("Found WAL " + path.toString());
// Normalize each log file (using Path) and add it to the set
- for (String logFile : logEntry.logSet) {
- referencedWals.add(normalizedWalPaths.get(logFile));
- }
+ referencedWals.add(normalizedWalPaths.get(path));
}
} catch (TableNotFoundException e) {
// uhhhh
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
deleted file mode 100644
index 5801faa..0000000
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ /dev/null
@@ -1,567 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.gc;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.gc.thrift.GCStatus;
-import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-public class GarbageCollectWriteAheadLogsTest {
- private static final long BLOCK_SIZE = 64000000L;
-
- private static final Path DIR_1_PATH = new Path("/dir1");
- private static final Path DIR_2_PATH = new Path("/dir2");
- private static final Path DIR_3_PATH = new Path("/dir3");
- private static final String UUID1 = UUID.randomUUID().toString();
- private static final String UUID2 = UUID.randomUUID().toString();
- private static final String UUID3 = UUID.randomUUID().toString();
-
- private Instance instance;
- private AccumuloConfiguration systemConfig;
- private VolumeManager volMgr;
- private GarbageCollectWriteAheadLogs gcwal;
- private long modTime;
-
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void setUp() throws Exception {
- SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
- instance = createMock(Instance.class);
- expect(instance.getInstanceID()).andReturn("mock").anyTimes();
- expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
- expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
- systemConfig = new ConfigurationCopy(new HashMap<String,String>());
- volMgr = createMock(VolumeManager.class);
- ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
- expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
- expect(factory.getInstance()).andReturn(instance).anyTimes();
- expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
-
- // Just make the SiteConfiguration delegate to our AccumuloConfiguration
- // Presently, we only need get(Property) and iterator().
- EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
- @Override
- public String answer() {
- Object[] args = EasyMock.getCurrentArguments();
- return systemConfig.get((Property) args[0]);
- }
- }).anyTimes();
- EasyMock.expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<Boolean>() {
- @Override
- public Boolean answer() {
- Object[] args = EasyMock.getCurrentArguments();
- return systemConfig.getBoolean((Property) args[0]);
- }
- }).anyTimes();
-
- EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
- @Override
- public Iterator<Entry<String,String>> answer() {
- return systemConfig.iterator();
- }
- }).anyTimes();
-
- replay(instance, factory, siteConfig);
- AccumuloServerContext context = new AccumuloServerContext(factory);
- gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
- modTime = System.currentTimeMillis();
- }
-
- @Test
- public void testGetters() {
- assertSame(instance, gcwal.getInstance());
- assertSame(volMgr, gcwal.getVolumeManager());
- assertFalse(gcwal.isUsingTrash());
- }
-
- @Test
- public void testPathsToStrings() {
- ArrayList<Path> paths = new ArrayList<Path>();
- paths.add(new Path(DIR_1_PATH, "file1"));
- paths.add(DIR_2_PATH);
- paths.add(new Path(DIR_3_PATH, "file3"));
- List<String> strings = GarbageCollectWriteAheadLogs.paths2strings(paths);
- int len = 3;
- assertEquals(len, strings.size());
- for (int i = 0; i < len; i++) {
- assertEquals(paths.get(i).toString(), strings.get(i));
- }
- }
-
- @Test
- public void testMapServersToFiles() {
- // @formatter:off
- /*
- * Test fileToServerMap:
- * /dir1/server1/uuid1 -> server1 (new-style)
- * /dir1/uuid2 -> "" (old-style)
- * /dir3/server3/uuid3 -> server3 (new-style)
- */
- // @formatter:on
- Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
- Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1);
- fileToServerMap.put(path1, "server1"); // new-style
- Path path2 = new Path(DIR_1_PATH, UUID2);
- fileToServerMap.put(path2, ""); // old-style
- Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3);
- fileToServerMap.put(path3, "server3"); // old-style
- // @formatter:off
- /*
- * Test nameToFileMap:
- * uuid1 -> /dir1/server1/uuid1
- * uuid3 -> /dir3/server3/uuid3
- */
- // @formatter:on
- Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
- nameToFileMap.put(UUID1, path1);
- nameToFileMap.put(UUID3, path3);
-
- // @formatter:off
- /*
- * Expected map:
- * server1 -> [ /dir1/server1/uuid1 ]
- * server3 -> [ /dir3/server3/uuid3 ]
- */
- // @formatter:on
- Map<String,ArrayList<Path>> result = GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap);
- assertEquals(2, result.size());
- ArrayList<Path> list1 = result.get("server1");
- assertEquals(1, list1.size());
- assertTrue(list1.contains(path1));
- ArrayList<Path> list3 = result.get("server3");
- assertEquals(1, list3.size());
- assertTrue(list3.contains(path3));
- }
-
- private FileStatus makeFileStatus(int size, Path path) {
- boolean isDir = (size == 0);
- return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path);
- }
-
- private void mockListStatus(Path dir, FileStatus... fileStatuses) throws Exception {
- expect(volMgr.listStatus(dir)).andReturn(fileStatuses);
- }
-
- @Test
- public void testScanServers_NewStyle() throws Exception {
- String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"};
- // @formatter:off
- /*
- * Test directory layout:
- * /dir1/
- * server1/
- * uuid1
- * file2
- * subdir2/
- * /dir2/ missing
- * /dir3/
- * server3/
- * uuid3
- */
- // @formatter:on
- Path serverDir1Path = new Path(DIR_1_PATH, "server1");
- FileStatus serverDir1 = makeFileStatus(0, serverDir1Path);
- Path subDir2Path = new Path(DIR_1_PATH, "subdir2");
- FileStatus serverDir2 = makeFileStatus(0, subDir2Path);
- mockListStatus(DIR_1_PATH, serverDir1, serverDir2);
- Path path1 = new Path(serverDir1Path, UUID1);
- FileStatus file1 = makeFileStatus(100, path1);
- FileStatus file2 = makeFileStatus(200, new Path(serverDir1Path, "file2"));
- mockListStatus(serverDir1Path, file1, file2);
- mockListStatus(subDir2Path);
- expect(volMgr.listStatus(DIR_2_PATH)).andThrow(new FileNotFoundException());
- Path serverDir3Path = new Path(DIR_3_PATH, "server3");
- FileStatus serverDir3 = makeFileStatus(0, serverDir3Path);
- mockListStatus(DIR_3_PATH, serverDir3);
- Path path3 = new Path(serverDir3Path, UUID3);
- FileStatus file3 = makeFileStatus(300, path3);
- mockListStatus(serverDir3Path, file3);
- replay(volMgr);
-
- Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
- int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
- assertEquals(3, count);
- // @formatter:off
- /*
- * Expected fileToServerMap:
- * /dir1/server1/uuid1 -> server1
- * /dir3/server3/uuid3 -> server3
- */
- // @formatter:on
- assertEquals(2, fileToServerMap.size());
- assertEquals("server1", fileToServerMap.get(path1));
- assertEquals("server3", fileToServerMap.get(path3));
- // @formatter:off
- /*
- * Expected nameToFileMap:
- * uuid1 -> /dir1/server1/uuid1
- * uuid3 -> /dir3/server3/uuid3
- */
- // @formatter:on
- assertEquals(2, nameToFileMap.size());
- assertEquals(path1, nameToFileMap.get(UUID1));
- assertEquals(path3, nameToFileMap.get(UUID3));
- }
-
- @Test
- public void testScanServers_OldStyle() throws Exception {
- // @formatter:off
- /*
- * Test directory layout:
- * /dir1/
- * uuid1
- * /dir3/
- * uuid3
- */
- // @formatter:on
- String[] walDirs = new String[] {"/dir1", "/dir3"};
- Path serverFile1Path = new Path(DIR_1_PATH, UUID1);
- FileStatus serverFile1 = makeFileStatus(100, serverFile1Path);
- mockListStatus(DIR_1_PATH, serverFile1);
- Path serverFile3Path = new Path(DIR_3_PATH, UUID3);
- FileStatus serverFile3 = makeFileStatus(300, serverFile3Path);
- mockListStatus(DIR_3_PATH, serverFile3);
- replay(volMgr);
-
- Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
- int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
- /*
- * Expect only a single server, the non-server entry for upgrade WALs
- */
- assertEquals(1, count);
- // @formatter:off
- /*
- * Expected fileToServerMap:
- * /dir1/uuid1 -> ""
- * /dir3/uuid3 -> ""
- */
- // @formatter:on
- assertEquals(2, fileToServerMap.size());
- assertEquals("", fileToServerMap.get(serverFile1Path));
- assertEquals("", fileToServerMap.get(serverFile3Path));
- // @formatter:off
- /*
- * Expected nameToFileMap:
- * uuid1 -> /dir1/uuid1
- * uuid3 -> /dir3/uuid3
- */
- // @formatter:on
- assertEquals(2, nameToFileMap.size());
- assertEquals(serverFile1Path, nameToFileMap.get(UUID1));
- assertEquals(serverFile3Path, nameToFileMap.get(UUID3));
- }
-
- @Test
- public void testGetSortedWALogs() throws Exception {
- String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"};
- // @formatter:off
- /*
- * Test directory layout:
- * /dir1/
- * uuid1
- * file2
- * /dir2/ missing
- * /dir3/
- * uuid3
- */
- // @formatter:on
- expect(volMgr.exists(DIR_1_PATH)).andReturn(true);
- expect(volMgr.exists(DIR_2_PATH)).andReturn(false);
- expect(volMgr.exists(DIR_3_PATH)).andReturn(true);
- Path path1 = new Path(DIR_1_PATH, UUID1);
- FileStatus file1 = makeFileStatus(100, path1);
- FileStatus file2 = makeFileStatus(200, new Path(DIR_1_PATH, "file2"));
- mockListStatus(DIR_1_PATH, file1, file2);
- Path path3 = new Path(DIR_3_PATH, UUID3);
- FileStatus file3 = makeFileStatus(300, path3);
- mockListStatus(DIR_3_PATH, file3);
- replay(volMgr);
-
- Map<String,Path> sortedWalogs = gcwal.getSortedWALogs(recoveryDirs);
- // @formatter:off
- /*
- * Expected map:
- * uuid1 -> /dir1/uuid1
- * uuid3 -> /dir3/uuid3
- */
- // @formatter:on
- assertEquals(2, sortedWalogs.size());
- assertEquals(path1, sortedWalogs.get(UUID1));
- assertEquals(path3, sortedWalogs.get(UUID3));
- }
-
- @Test
- public void testIsUUID() {
- assertTrue(GarbageCollectWriteAheadLogs.isUUID(UUID.randomUUID().toString()));
- assertFalse(GarbageCollectWriteAheadLogs.isUUID("foo"));
- assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString()));
- assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
- }
-
- // It was easier to do this than get the mocking working for me
- private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
-
- private List<Entry<Key,Value>> replData;
-
- ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
- super(context, fs, useTrash);
- this.replData = replData;
- }
-
- @Override
- protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
- return this.replData;
- }
- }
-
- @Test
- public void replicationEntriesAffectGC() throws Exception {
- String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
- Connector conn = createMock(Connector.class);
-
- // Write a Status record which should prevent file1 from being deleted
- LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
-
- ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
-
- replay(conn);
-
- // Open (not-closed) file must be retained
- assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
-
- // No replication data, not needed
- replData.clear();
- assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
-
- // The file is closed but not replicated, must be retained
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
- assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
-
- // File is closed and fully replicated, can be deleted
- replData.clear();
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"),
- ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build())));
- assertFalse(replGC.neededByReplication(conn, "/wals/" + file1));
- }
-
- @Test
- public void removeReplicationEntries() throws Exception {
- String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
-
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- long file1CreateTime = System.currentTimeMillis();
- long file2CreateTime = file1CreateTime + 50;
- BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
- Mutation m = new Mutation("/wals/" + file1);
- StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
- bw.addMutation(m);
- m = new Mutation("/wals/" + file2);
- StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
- bw.addMutation(m);
-
- // These WALs are potential candidates for deletion from fs
- Map<String,Path> nameToFileMap = new HashMap<>();
- nameToFileMap.put(file1, new Path("/wals/" + file1));
- nameToFileMap.put(file2, new Path("/wals/" + file2));
-
- Map<String,Path> sortedWALogs = Collections.emptyMap();
-
- // Make the GCStatus and GcCycleStats
- GCStatus status = new GCStatus();
- GcCycleStats cycleStats = new GcCycleStats();
- status.currentLog = cycleStats;
-
- // We should iterate over two entries
- Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
-
- // We should have noted that two files were still in use
- Assert.assertEquals(2l, cycleStats.inUse);
-
- // Both should have been deleted
- Assert.assertEquals(0, nameToFileMap.size());
- }
-
- @Test
- public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
- String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
-
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-
- Connector conn = context.getConnector();
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- long file1CreateTime = System.currentTimeMillis();
- long file2CreateTime = file1CreateTime + 50;
- // Write some records to the metadata table, we haven't yet written status records to the replication table
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
- bw.addMutation(m);
-
- m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
- bw.addMutation(m);
-
- // These WALs are potential candidates for deletion from fs
- Map<String,Path> nameToFileMap = new HashMap<>();
- nameToFileMap.put(file1, new Path("/wals/" + file1));
- nameToFileMap.put(file2, new Path("/wals/" + file2));
-
- Map<String,Path> sortedWALogs = Collections.emptyMap();
-
- // Make the GCStatus and GcCycleStats objects
- GCStatus status = new GCStatus();
- GcCycleStats cycleStats = new GcCycleStats();
- status.currentLog = cycleStats;
-
- // We should iterate over two entries
- Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
-
- // We should have noted that two files were still in use
- Assert.assertEquals(2l, cycleStats.inUse);
-
- // Both should have been deleted
- Assert.assertEquals(0, nameToFileMap.size());
- }
-
- @Test
- public void noReplicationTableDoesntLimitMetatdataResults() throws Exception {
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
- Connector conn = context.getConnector();
-
- String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
- bw.addMutation(m);
- bw.close();
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
- Entry<Key,Value> entry = Iterables.getOnlyElement(data);
-
- Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
- }
-
- @Test
- public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
- Connector conn = context.getConnector();
-
- long walCreateTime = System.currentTimeMillis();
- String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
- bw.addMutation(m);
- bw.close();
-
- bw = ReplicationTable.getBatchWriter(conn);
- m = new Mutation(wal);
- StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
- bw.addMutation(m);
- bw.close();
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
- Map<Key,Value> data = new HashMap<>();
- for (Entry<Key,Value> e : iter) {
- data.put(e.getKey(), e.getValue());
- }
-
- Assert.assertEquals(2, data.size());
-
- // Should get one element from each table (metadata and replication)
- for (Key k : data.keySet()) {
- String row = k.getRow().toString();
- if (row.startsWith(ReplicationSection.getRowPrefix())) {
- Assert.assertTrue(row.endsWith(wal));
- } else {
- Assert.assertEquals(wal, row);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 3115de1..78a5bd5 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -50,14 +50,12 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -130,22 +128,16 @@ public class CloseWriteAheadLogReferencesTest {
public void findOneWalFromMetadata() throws Exception {
Connector conn = createMock(Connector.class);
BatchScanner bs = createMock(BatchScanner.class);
-
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+ String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
+ data.add(entry("tserver1:9997[1234567890]", file));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -163,54 +155,12 @@ public class CloseWriteAheadLogReferencesTest {
// Validate
Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
-
- verify(conn, bs);
- }
-
- @Test
- public void findManyWalFromSingleMetadata() throws Exception {
- Connector conn = createMock(Connector.class);
- BatchScanner bs = createMock(BatchScanner.class);
-
- // Fake out some data
- final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- // Multiple DFSLoggers
- logEntry.logSet = Sets.newHashSet(logEntry.filename, "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID());
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- // Get a batchscanner, scan the tablets section, fetch only the logs
- expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
- expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
- expectLastCall().once();
- expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
-
- @Override
- public Iterator<Entry<Key,Value>> answer() throws Throwable {
- return data.iterator();
- }
-
- });
- // Close the bs
- bs.close();
- expectLastCall().once();
-
- replay(conn, bs);
-
- // Validate
- Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(logEntry.logSet, wals);
+ Assert.assertEquals(Collections.singleton(file), wals);
verify(conn, bs);
}
+ // This is a silly test now
@Test
public void findManyRefsToSingleWalFromMetadata() throws Exception {
Connector conn = createMock(Connector.class);
@@ -220,31 +170,14 @@ public class CloseWriteAheadLogReferencesTest {
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + uuid;
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("1"), new Text("c"), new Text("b"));
- logEntry.server = "tserver1";
- logEntry.tabletId = 2;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("1"), null, new Text("c"));
- logEntry.server = "tserver1";
- logEntry.tabletId = 3;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+ String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid;
+ data.add(entry("tserver1:9997[0123456789]", filename));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -262,7 +195,7 @@ public class CloseWriteAheadLogReferencesTest {
// Validate
Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
+ Assert.assertEquals(Collections.singleton(filename), wals);
verify(conn, bs);
}
@@ -272,59 +205,22 @@ public class CloseWriteAheadLogReferencesTest {
Connector conn = createMock(Connector.class);
BatchScanner bs = createMock(BatchScanner.class);
- String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/"
- + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID();
+ String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
+ String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID();
+ String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID();
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = file1;
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("5"), null, null);
- logEntry.tabletId = 2;
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("3"), new Text("b"), new Text("a"));
- logEntry.filename = file2;
- logEntry.server = "tserver2";
- logEntry.tabletId = 3;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("3"), new Text("c"), new Text("b"));
- logEntry.tabletId = 4;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("4"), new Text("5"), new Text("0"));
- logEntry.filename = file3;
- logEntry.server = "tserver3";
- logEntry.tabletId = 5;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("4"), new Text("8"), new Text("5"));
- logEntry.server = "tserver3";
- logEntry.tabletId = 7;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("4"), null, new Text("8"));
- logEntry.server = "tserver3";
- logEntry.tabletId = 15;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ data.add(entry("tserver1:9997[1234567890]", file1));
+ data.add(entry("tserver2:9997[1234567891]", file2));
+ data.add(entry("tserver3:9997[1234567891]", file3));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -347,6 +243,11 @@ public class CloseWriteAheadLogReferencesTest {
verify(conn, bs);
}
+ private static Entry<Key,Value> entry(String session, String file) {
+ Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF, new Text(file));
+ return Maps.immutableEntry(key, new Value());
+ }
+
@Test
public void unusedWalsAreClosed() throws Exception {
Set<String> wals = Collections.emptySet();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 4d52cc5..810134b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -422,6 +422,9 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
}
perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
+
+ // add the currlog location for root tablet current logs
+ zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP);
haveUpgradedZooKeeper = true;
} catch (Exception ex) {
// ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 592d9ae..3f15b39 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -173,7 +173,8 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
scanner.setRange(MetadataSchema.TabletsSection.getRange());
} else {
scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange());
+ Range range = new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange();
+ scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
}
TabletsSection.ServerColumnFamily.FLUSH_COLUMN.fetch(scanner);
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 4c47953..20917a4 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -163,6 +163,7 @@ class TabletGroupWatcher extends Daemon {
List<Assignment> assigned = new ArrayList<Assignment>();
List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
+ Map<TServerInstance, List<Path>> logsForDeadServers = new TreeMap<>();
MasterState masterState = master.getMasterState();
int[] counts = new int[TabletState.values().length];
@@ -175,6 +176,7 @@ class TabletGroupWatcher extends Daemon {
if (tls == null) {
continue;
}
+ Master.log.debug(store.name() + " location State: " + tls);
// ignore entries for tables that do not exist in zookeeper
if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
continue;
@@ -184,7 +186,7 @@ class TabletGroupWatcher extends Daemon {
// Don't overwhelm the tablet servers with work
if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
assignments.clear();
assigned.clear();
assignedToDeadServers.clear();
@@ -204,8 +206,9 @@ class TabletGroupWatcher extends Daemon {
TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
TServerInstance server = tls.getServer();
TabletState state = tls.getState(currentTServers.keySet());
- if (Master.log.isTraceEnabled())
- Master.log.trace("Goal state " + goal + " current " + state);
+ if (Master.log.isTraceEnabled()) {
+ Master.log.trace("Goal state " + goal + " current " + state + " for " + tls.extent);
+ }
stats.update(tableId, state);
mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
sendChopRequest(mergeStats.getMergeInfo(), state, tls);
@@ -239,7 +242,7 @@ class TabletGroupWatcher extends Daemon {
assignedToDeadServers.add(tls);
if (server.equals(this.master.migrations.get(tls.extent)))
this.master.migrations.remove(tls.extent);
- // log.info("Current servers " + currentTServers.keySet());
+ MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
break;
case UNASSIGNED:
// maybe it's a finishing migration
@@ -273,7 +276,7 @@ class TabletGroupWatcher extends Daemon {
break;
case ASSIGNED_TO_DEAD_SERVER:
assignedToDeadServers.add(tls);
- // log.info("Current servers " + currentTServers.keySet());
+ MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
break;
case HOSTED:
TServerConnection conn = this.master.tserverSet.getConnection(server);
@@ -292,7 +295,8 @@ class TabletGroupWatcher extends Daemon {
counts[state.ordinal()]++;
}
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
+ store.markLogsAsUnused(master, logsForDeadServers);
// provide stats after flushing changes to avoid race conditions w/ delete table
stats.end(masterState);
@@ -312,8 +316,12 @@ class TabletGroupWatcher extends Daemon {
updateMergeState(mergeStatsCache);
- Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
- eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
+ Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+ eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+ } else {
+ Master.log.info("Detected change in current tserver set, re-running state machine.");
+ }
} catch (Exception ex) {
Master.log.error("Error processing table state for store " + store.name(), ex);
if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) {
@@ -730,12 +738,19 @@ class TabletGroupWatcher extends Daemon {
}
}
- private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
- List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
+ private void flushChanges(
+ SortedMap<TServerInstance,TabletServerStatus> currentTServers,
+ List<Assignment> assignments,
+ List<Assignment> assigned,
+ List<TabletLocationState> assignedToDeadServers,
+ Map<TServerInstance, List<Path>> logsForDeadServers,
+ Map<KeyExtent,TServerInstance> unassigned)
+ throws DistributedStoreException, TException {
if (!assignedToDeadServers.isEmpty()) {
int maxServersToShow = min(assignedToDeadServers.size(), 100);
Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
- store.unassign(assignedToDeadServers);
+ Master.log.debug("logs for dead servers: " + logsForDeadServers);
+ store.unassign(assignedToDeadServers, logsForDeadServers);
this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 8532e1b..74e9b78 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -107,6 +107,7 @@ public class WorkMaker {
// Don't create the record if we have nothing to do.
// TODO put this into a filter on serverside
if (!shouldCreateWork(status)) {
+ log.debug("Not creating work: " + status.toString());
continue;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index a3c7e46..f1763be 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.cli.ClientOpts;
@@ -187,7 +188,7 @@ public class MergeStats {
Text tableId = extent.getTableId();
Text first = KeyExtent.getMetadataEntry(tableId, start);
Range range = new Range(first, false, null, true);
- scanner.setRange(range);
+ scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
KeyExtent prevExtent = null;
log.debug("Scanning range " + range);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
index 6790858..3c3bc37 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.master;
-import java.util.Arrays;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
@@ -306,13 +305,7 @@ public class ReplicationOperationsImplTest {
bw.addMutation(m);
bw.close();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text(tableId1), null, null);
- logEntry.server = "tserver";
- logEntry.filename = file1;
- logEntry.tabletId = 1;
- logEntry.logSet = Arrays.asList(file1);
- logEntry.timestamp = System.currentTimeMillis();
+ LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1);
bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
m = new Mutation(ReplicationSection.getRowPrefix() + file1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
index 634ee89..8cbea68 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -186,7 +186,7 @@ public class TestMergeState {
// take it offline
m = tablet.getPrevRowUpdateMutation();
Collection<Collection<String>> walogs = Collections.emptyList();
- metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)));
+ metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
// now we can split
stats = scan(state, metaDataStateStore);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
index d2cc0cf..4002da5 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
@@ -181,7 +181,7 @@ public class RootTabletStateStoreTest {
} catch (BadLocationStateException e) {
fail("Unexpected error " + e);
}
- tstore.unassign(Collections.singletonList(assigned));
+ tstore.unassign(Collections.singletonList(assigned), null);
count = 0;
for (TabletLocationState location : tstore) {
assertEquals(location.extent, root);
@@ -209,7 +209,7 @@ public class RootTabletStateStoreTest {
fail("Unexpected error " + e);
}
try {
- tstore.unassign(Collections.singletonList(broken));
+ tstore.unassign(Collections.singletonList(broken), null);
Assert.fail("should not get here");
} catch (IllegalArgumentException ex) {}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/findbugs/exclude-filter.xml
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/findbugs/exclude-filter.xml b/server/tserver/src/main/findbugs/exclude-filter.xml
index 47dd1f5..a334163 100644
--- a/server/tserver/src/main/findbugs/exclude-filter.xml
+++ b/server/tserver/src/main/findbugs/exclude-filter.xml
@@ -18,7 +18,7 @@
<Match>
<!-- locking is confusing, but probably correct -->
<Class name="org.apache.accumulo.tserver.tablet.Tablet" />
- <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,java.util.Collection,boolean" returns="boolean" />
+ <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,org.apache.accumulo.tserver.log.DfsLogger,boolean" returns="boolean" />
<Bug code="UL" pattern="UL_UNRELEASED_LOCK" />
</Match>
<Match>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
index 389a544..5fe2548 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
@@ -35,7 +35,8 @@ public class GarbageCollectionLogger {
private long gcTimeIncreasedCount = 0;
private static long lastMemoryCheckTime = 0;
- public GarbageCollectionLogger() {}
+ public GarbageCollectionLogger() {
+ }
public synchronized void logGCInfo(AccumuloConfiguration conf) {
final long now = System.currentTimeMillis();