You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/10 23:06:01 UTC
[15/19] accumulo git commit: Revert "ACCUMULO-3423 optimize WAL metadata table updates"
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 40acf8b..1735c0d 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -18,7 +18,7 @@ package org.apache.accumulo.gc;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -28,56 +28,49 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.master.LiveTServerSet;
-import org.apache.accumulo.server.master.LiveTServerSet.Listener;
-import org.apache.accumulo.server.master.state.MetaDataStateStore;
-import org.apache.accumulo.server.master.state.RootTabletStateStore;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.accumulo.server.master.state.TabletState;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.net.HostAndPort;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -86,8 +79,8 @@ public class GarbageCollectWriteAheadLogs {
private final AccumuloServerContext context;
private final VolumeManager fs;
- private final boolean useTrash;
- private final LiveTServerSet liveServers;
+
+ private boolean useTrash;
/**
* Creates a new GC WAL object.
@@ -103,35 +96,56 @@ public class GarbageCollectWriteAheadLogs {
this.context = context;
this.fs = fs;
this.useTrash = useTrash;
- this.liveServers = new LiveTServerSet(context, new Listener() {
- @Override
- public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
- log.debug("New tablet servers noticed: " + added);
- log.debug("Tablet servers removed: " + deleted);
- }
- });
- liveServers.startListeningForTabletServerChanges();
+ }
+
+ /**
+ * Gets the instance used by this object.
+ *
+ * @return instance
+ */
+ Instance getInstance() {
+ return context.getInstance();
+ }
+
+ /**
+ * Gets the volume manager used by this object.
+ *
+ * @return volume manager
+ */
+ VolumeManager getVolumeManager() {
+ return fs;
+ }
+
+ /**
+ * Checks if the volume manager should move files to the trash rather than delete them.
+ *
+ * @return true if trash is used
+ */
+ boolean isUsingTrash() {
+ return useTrash;
}
public void collect(GCStatus status) {
- Span span = Trace.start("getCandidates");
+ Span span = Trace.start("scanServers");
try {
- Set<TServerInstance> currentServers = liveServers.getCurrentServers();
+
+ Map<String,Path> sortedWALogs = getSortedWALogs();
status.currentLog.started = System.currentTimeMillis();
- Map<TServerInstance,Set<Path>> candidates = new HashMap<>();
- long count = getCurrent(candidates, currentServers);
+ Map<Path,String> fileToServerMap = new HashMap<Path,String>();
+ Map<String,Path> nameToFileMap = new HashMap<String,Path>();
+ int count = scanServers(fileToServerMap, nameToFileMap);
long fileScanStop = System.currentTimeMillis();
-
- log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(), (fileScanStop - status.currentLog.started) / 1000.));
- status.currentLog.candidates = count;
+ log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count,
+ (fileScanStop - status.currentLog.started) / 1000.));
+ status.currentLog.candidates = fileToServerMap.size();
span.stop();
- span = Trace.start("removeEntriesInUse");
+ span = Trace.start("removeMetadataEntries");
try {
- count = removeEntriesInUse(candidates, status, currentServers);
+ count = removeMetadataEntries(nameToFileMap, sortedWALogs, status);
} catch (Exception ex) {
log.error("Unable to scan metadata table", ex);
return;
@@ -144,7 +158,7 @@ public class GarbageCollectWriteAheadLogs {
span = Trace.start("removeReplicationEntries");
try {
- count = removeReplicationEntries(candidates, status);
+ count = removeReplicationEntries(nameToFileMap, sortedWALogs, status);
} catch (Exception ex) {
log.error("Unable to scan replication table", ex);
return;
@@ -156,22 +170,16 @@ public class GarbageCollectWriteAheadLogs {
log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.));
span = Trace.start("removeFiles");
+ Map<String,ArrayList<Path>> serverToFileMap = mapServersToFiles(fileToServerMap, nameToFileMap);
- count = removeFiles(candidates, status);
+ count = removeFiles(nameToFileMap, serverToFileMap, sortedWALogs, status);
long removeStop = System.currentTimeMillis();
- log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.));
- span.stop();
-
- span = Trace.start("removeMarkers");
- count = removeTabletServerMarkers(candidates);
- long removeMarkersStop = System.currentTimeMillis();
- log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
- span.stop();
-
+ log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.));
status.currentLog.finished = removeStop;
status.lastLog = status.currentLog;
status.currentLog = new GcCycleStats();
+ span.stop();
} catch (Exception e) {
log.error("exception occured while garbage collecting write ahead logs", e);
@@ -180,100 +188,161 @@ public class GarbageCollectWriteAheadLogs {
}
}
- private long removeTabletServerMarkers(Map<TServerInstance,Set<Path>> candidates) {
- long result = 0;
+ boolean holdsLock(HostAndPort addr) {
try {
- BatchWriter root = null;
- BatchWriter meta = null;
- try {
- root = context.getConnector().createBatchWriter(RootTable.NAME, null);
- meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
- for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
- Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
- for (Path path : entry.getValue()) {
- m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
- result++;
+ String zpath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addr.toString();
+ List<String> children = ZooReaderWriter.getInstance().getChildren(zpath);
+ return !(children == null || children.isEmpty());
+ } catch (KeeperException.NoNodeException ex) {
+ return false;
+ } catch (Exception ex) {
+ log.debug(ex.toString(), ex);
+ return true;
+ }
+ }
+
+ private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+ for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
+ if (entry.getKey().isEmpty()) {
+ // old-style log entry, just remove it
+ for (Path path : entry.getValue()) {
+ log.debug("Removing old-style WAL " + path);
+ try {
+ if (!useTrash || !fs.moveToTrash(path))
+ fs.deleteRecursively(path);
+ status.currentLog.deleted++;
+ } catch (FileNotFoundException ex) {
+ // ignored
+ } catch (IOException ex) {
+ log.error("Unable to delete wal " + path + ": " + ex);
}
- root.addMutation(m);
- meta.addMutation(m);
}
- } finally {
- if (meta != null) {
- meta.close();
- }
- if (root != null) {
- root.close();
+ } else {
+ HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
+ if (!holdsLock(address)) {
+ for (Path path : entry.getValue()) {
+ log.debug("Removing WAL for offline server " + path);
+ try {
+ if (!useTrash || !fs.moveToTrash(path))
+ fs.deleteRecursively(path);
+ status.currentLog.deleted++;
+ } catch (FileNotFoundException ex) {
+ // ignored
+ } catch (IOException ex) {
+ log.error("Unable to delete wal " + path + ": " + ex);
+ }
+ }
+ continue;
+ } else {
+ Client tserver = null;
+ try {
+ tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
+ tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
+ log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
+ status.currentLog.deleted += entry.getValue().size();
+ } catch (TException e) {
+ log.warn("Error talking to " + address + ": " + e);
+ } finally {
+ if (tserver != null)
+ ThriftUtil.returnClient(tserver);
+ }
}
}
- } catch (Exception ex) {
- throw new RuntimeException(ex);
}
- return result;
- }
- private long removeFiles(Map<TServerInstance,Set<Path>> candidates, final GCStatus status) {
- for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
- for (Path path : entry.getValue()) {
- log.debug("Removing unused WAL for server " + entry.getKey() + " log " + path);
+ for (Path swalog : sortedWALogs.values()) {
+ log.debug("Removing sorted WAL " + swalog);
+ try {
+ if (!useTrash || !fs.moveToTrash(swalog)) {
+ fs.deleteRecursively(swalog);
+ }
+ } catch (FileNotFoundException ex) {
+ // ignored
+ } catch (IOException ioe) {
try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
+ if (fs.exists(swalog)) {
+ log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
+ }
} catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
+ log.error("Unable to check for the existence of " + swalog, ex);
}
}
}
- return status.currentLog.deleted;
- }
- private UUID path2uuid(Path path) {
- return UUID.fromString(path.getName());
+ return 0;
}
- private long removeEntriesInUse(Map<TServerInstance,Set<Path>> candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException,
- KeeperException, InterruptedException {
-
- // remove any entries if there's a log reference, or a tablet is still assigned to the dead server
+ /**
+ * Converts a list of paths to their corresponding strings.
+ *
+ * @param paths
+ * list of paths
+ * @return string forms of paths
+ */
+ static List<String> paths2strings(List<Path> paths) {
+ List<String> result = new ArrayList<String>(paths.size());
+ for (Path path : paths)
+ result.add(path.toString());
+ return result;
+ }
- Map<UUID,TServerInstance> walToDeadServer = new HashMap<>();
- for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
- for (Path file : entry.getValue()) {
- walToDeadServer.put(path2uuid(file), entry.getKey());
+ /**
+ * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the
+ * mapping of file names to paths is skipped.
+ *
+ * @param fileToServerMap
+ * map of file paths to servers
+ * @param nameToFileMap
+ * map of file names to paths
+ * @return map of servers to lists of file paths
+ */
+ static Map<String,ArrayList<Path>> mapServersToFiles(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) {
+ Map<String,ArrayList<Path>> result = new HashMap<String,ArrayList<Path>>();
+ for (Entry<Path,String> fileServer : fileToServerMap.entrySet()) {
+ if (!nameToFileMap.containsKey(fileServer.getKey().getName()))
+ continue;
+ ArrayList<Path> files = result.get(fileServer.getValue());
+ if (files == null) {
+ files = new ArrayList<Path>();
+ result.put(fileServer.getValue(), files);
}
+ files.add(fileServer.getKey());
}
- long count = 0;
- RootTabletStateStore root = new RootTabletStateStore(context);
- MetaDataStateStore meta = new MetaDataStateStore(context);
- Iterator<TabletLocationState> states = Iterators.concat(root.iterator(), meta.iterator());
- while (states.hasNext()) {
- count++;
- TabletLocationState state = states.next();
- if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
- candidates.remove(state.current);
- }
- for (Collection<String> wals : state.walogs) {
- for (String wal : wals) {
- UUID walUUID = path2uuid(new Path(wal));
- TServerInstance dead = walToDeadServer.get(walUUID);
- if (dead != null) {
- Iterator<Path> iter = candidates.get(dead).iterator();
- while (iter.hasNext()) {
- if (path2uuid(iter.next()).equals(walUUID)) {
- iter.remove();
- break;
- }
- }
- }
+ return result;
+ }
+
+ protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+ InterruptedException {
+ int count = 0;
+ Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(context);
+
+ // For each WAL reference in the metadata table
+ while (iterator.hasNext()) {
+ // Each metadata reference has at least one WAL file
+ for (String entry : iterator.next().logSet) {
+ // old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases
+ // the last "/" will mark a UUID file name.
+ String uuid = entry.substring(entry.lastIndexOf("/") + 1);
+ if (!isUUID(uuid)) {
+ // fully expect this to be a uuid, if its not then something is wrong and walog GC should not proceed!
+ throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry);
}
+
+ Path pathFromNN = nameToFileMap.remove(uuid);
+ if (pathFromNN != null) {
+ status.currentLog.inUse++;
+ sortedWALogs.remove(uuid);
+ }
+
+ count++;
}
}
+
return count;
}
- protected int removeReplicationEntries(Map<TServerInstance,Set<Path>> candidates, GCStatus status) throws IOException, KeeperException, InterruptedException {
+ protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+ InterruptedException {
Connector conn;
try {
conn = context.getConnector();
@@ -284,25 +353,21 @@ public class GarbageCollectWriteAheadLogs {
int count = 0;
- Iterator<Entry<TServerInstance,Set<Path>>> walIter = candidates.entrySet().iterator();
+ Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
while (walIter.hasNext()) {
- Entry<TServerInstance,Set<Path>> wal = walIter.next();
- Iterator<Path> paths = wal.getValue().iterator();
- while (paths.hasNext()) {
- Path fullPath = paths.next();
- if (neededByReplication(conn, fullPath)) {
- log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
- // If we haven't already removed it, check to see if this WAL is
- // "in use" by replication (needed for replication purposes)
- status.currentLog.inUse++;
- paths.remove();
- } else {
- log.debug("WAL not needed for replication {}", fullPath);
- }
- }
- if (wal.getValue().isEmpty()) {
+ Entry<String,Path> wal = walIter.next();
+ String fullPath = wal.getValue().toString();
+ if (neededByReplication(conn, fullPath)) {
+ log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
+ // If we haven't already removed it, check to see if this WAL is
+ // "in use" by replication (needed for replication purposes)
+ status.currentLog.inUse++;
+
walIter.remove();
+ sortedWALogs.remove(wal.getKey());
+ } else {
+ log.debug("WAL not needed for replication {}", fullPath);
}
count++;
}
@@ -317,7 +382,7 @@ public class GarbageCollectWriteAheadLogs {
* The full path (URI)
* @return True if the WAL is still needed by replication (not a candidate for deletion)
*/
- protected boolean neededByReplication(Connector conn, Path wal) {
+ protected boolean neededByReplication(Connector conn, String wal) {
log.info("Checking replication table for " + wal);
Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
@@ -340,7 +405,7 @@ public class GarbageCollectWriteAheadLogs {
return false;
}
- protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, Path wal) {
+ protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
Scanner metaScanner;
try {
metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
@@ -360,7 +425,7 @@ public class GarbageCollectWriteAheadLogs {
StatusSection.limit(replScanner);
// Only look for this specific WAL
- replScanner.setRange(Range.exact(wal.toString()));
+ replScanner.setRange(Range.exact(wal));
return Iterables.concat(metaScanner, replScanner);
} catch (ReplicationTableOfflineException e) {
@@ -370,84 +435,107 @@ public class GarbageCollectWriteAheadLogs {
return metaScanner;
}
-
-
+ private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+ return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
+ }
/**
- * Scans log markers. The map passed in is populated with the logs for dead servers.
+ * Scans write-ahead log directories for logs. The maps passed in are populated with scan information.
*
- * @param unusedLogs
- * map of dead server to log file entries
- * @return total number of log files
+ * @param walDirs
+ * write-ahead log directories
+ * @param fileToServerMap
+ * map of file paths to servers
+ * @param nameToFileMap
+ * map of file names to paths
+ * @return number of servers located (including those with no logs present)
*/
- private long getCurrent(Map<TServerInstance, Set<Path> > unusedLogs, Set<TServerInstance> currentServers) throws Exception {
- Set<Path> rootWALs = new HashSet<>();
- // Get entries in zookeeper:
- String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
- ZooReaderWriter zoo = ZooReaderWriter.getInstance();
- List<String> children = zoo.getChildren(zpath);
- for (String child : children) {
- LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null));
- rootWALs.add(new Path(entry.filename));
- }
- long count = 0;
-
- // get all the WAL markers that are not in zookeeper for dead servers
- Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY);
- rootScanner.setRange(CurrentLogsSection.getRange());
- Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- metaScanner.setRange(CurrentLogsSection.getRange());
- Iterator<Entry<Key,Value>> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator());
- Text hostAndPort = new Text();
- Text sessionId = new Text();
- Text filename = new Text();
- while (entries.hasNext()) {
- Entry<Key,Value> entry = entries.next();
- CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
- CurrentLogsSection.getPath(entry.getKey(), filename);
- TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
- Path path = new Path(filename.toString());
- if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
- Set<Path> logs = unusedLogs.get(tsi);
- if (logs == null) {
- unusedLogs.put(tsi, logs = new HashSet<Path>());
- }
- if (logs.add(path)) {
- count++;
+ int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+ Set<String> servers = new HashSet<String>();
+ for (String walDir : walDirs) {
+ Path walRoot = new Path(walDir);
+ FileStatus[] listing = null;
+ try {
+ listing = fs.listStatus(walRoot);
+ } catch (FileNotFoundException e) {
+ // ignore dir
+ }
+
+ if (listing == null)
+ continue;
+ for (FileStatus status : listing) {
+ String server = status.getPath().getName();
+ if (status.isDirectory()) {
+ servers.add(server);
+ for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
+ if (isUUID(file.getPath().getName())) {
+ fileToServerMap.put(file.getPath(), server);
+ nameToFileMap.put(file.getPath().getName(), file.getPath());
+ } else {
+ log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
+ }
+ }
+ } else if (isUUID(server)) {
+ // old-style WAL are not under a directory
+ servers.add("");
+ fileToServerMap.put(status.getPath(), "");
+ nameToFileMap.put(server, status.getPath());
+ } else {
+ log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
}
}
}
+ return servers.size();
+ }
- // scan HDFS for logs for dead servers
- for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
- RemoteIterator<LocatedFileStatus> iter = volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true);
- while (iter.hasNext()) {
- LocatedFileStatus next = iter.next();
- // recursive listing returns directories, too
- if (next.isDirectory()) {
- continue;
- }
- // make sure we've waited long enough for zookeeper propagation
- if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) {
- continue;
- }
- Path path = next.getPath();
- String hostPlusPort = path.getParent().getName();
- // server is still alive, or has a replacement
- TServerInstance instance = liveServers.find(hostPlusPort);
- if (instance != null) {
- continue;
- }
- TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
- Set<Path> paths = unusedLogs.get(fake);
- if (paths == null) {
- paths = new HashSet<>();
+ private Map<String,Path> getSortedWALogs() throws IOException {
+ return getSortedWALogs(ServerConstants.getRecoveryDirs());
+ }
+
+ /**
+ * Looks for write-ahead logs in recovery directories.
+ *
+ * @param recoveryDirs
+ * recovery directories
+ * @return map of log file names to paths
+ */
+ Map<String,Path> getSortedWALogs(String[] recoveryDirs) throws IOException {
+ Map<String,Path> result = new HashMap<String,Path>();
+
+ for (String dir : recoveryDirs) {
+ Path recoveryDir = new Path(dir);
+
+ if (fs.exists(recoveryDir)) {
+ for (FileStatus status : fs.listStatus(recoveryDir)) {
+ String name = status.getPath().getName();
+ if (isUUID(name)) {
+ result.put(name, status.getPath());
+ } else {
+ log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
+ }
}
- paths.add(path);
- unusedLogs.put(fake, paths);
}
}
- return count;
+ return result;
+ }
+
+ /**
+ * Checks if a string is a valid UUID.
+ *
+ * @param name
+ * string to check
+ * @return true if string is a UUID
+ */
+ static boolean isUUID(String name) {
+ if (name == null || name.length() != 36) {
+ return false;
+ }
+ try {
+ UUID.fromString(name);
+ return true;
+ } catch (IllegalArgumentException ex) {
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 4f64c15..037023a 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -569,6 +569,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
replSpan.stop();
}
+ // Clean up any unused write-ahead logs
Span waLogs = Trace.start("walogs");
try {
GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index cb4b341..78ac4ac 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -37,11 +37,13 @@ import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
@@ -182,21 +184,20 @@ public class CloseWriteAheadLogReferences implements Runnable {
try {
// TODO Configurable number of threads
bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
- bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
- bs.fetchColumnFamily(CurrentLogsSection.COLF);
+ bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.fetchColumnFamily(LogColumnFamily.NAME);
// For each log key/value in the metadata table
for (Entry<Key,Value> entry : bs) {
- if (entry.getValue().equals(CurrentLogsSection.UNUSED)) {
- continue;
- }
- Text tpath = new Text();
- CurrentLogsSection.getPath(entry.getKey(), tpath);
- String path = new Path(tpath.toString()).toString();
- log.debug("Found WAL " + path.toString());
+ // The value may contain multiple WALs
+ LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+
+ log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet);
// Normalize each log file (using Path) and add it to the set
- referencedWals.add(normalizedWalPaths.get(path));
+ for (String logFile : logEntry.logSet) {
+ referencedWals.add(normalizedWalPaths.get(logFile));
+ }
}
} catch (TableNotFoundException e) {
// uhhhh
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
new file mode 100644
index 0000000..5801faa
--- /dev/null
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.gc;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.gc.thrift.GCStatus;
+import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
+/**
+ * Unit tests for {@link GarbageCollectWriteAheadLogs}: WAL directory scanning, mapping of WALs to servers, and the
+ * replication status checks that can keep a WAL from being collected.
+ */
+public class GarbageCollectWriteAheadLogsTest {
+ private static final long BLOCK_SIZE = 64000000L;
+
+ private static final Path DIR_1_PATH = new Path("/dir1");
+ private static final Path DIR_2_PATH = new Path("/dir2");
+ private static final Path DIR_3_PATH = new Path("/dir3");
+ private static final String UUID1 = UUID.randomUUID().toString();
+ private static final String UUID2 = UUID.randomUUID().toString();
+ private static final String UUID3 = UUID.randomUUID().toString();
+
+ private Instance instance;
+ private AccumuloConfiguration systemConfig;
+ private VolumeManager volMgr;
+ private GarbageCollectWriteAheadLogs gcwal;
+ private long modTime;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ /** Wires a mocked Instance, VolumeManager and configuration into an AccumuloServerContext and builds the GC under test. */
+ @Before
+ public void setUp() throws Exception {
+ SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
+ instance = createMock(Instance.class);
+ expect(instance.getInstanceID()).andReturn("mock").anyTimes();
+ expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
+ expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
+ systemConfig = new ConfigurationCopy(new HashMap<String,String>());
+ volMgr = createMock(VolumeManager.class);
+ ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
+ expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
+ expect(factory.getInstance()).andReturn(instance).anyTimes();
+ expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
+
+ // Just make the SiteConfiguration delegate to our AccumuloConfiguration
+ // Presently, we only need get(Property), getBoolean(Property) and iterator().
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
+ @Override
+ public String answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConfig.get((Property) args[0]);
+ }
+ }).anyTimes();
+ EasyMock.expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<Boolean>() {
+ @Override
+ public Boolean answer() {
+ Object[] args = EasyMock.getCurrentArguments();
+ return systemConfig.getBoolean((Property) args[0]);
+ }
+ }).anyTimes();
+
+ EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
+ @Override
+ public Iterator<Entry<String,String>> answer() {
+ return systemConfig.iterator();
+ }
+ }).anyTimes();
+
+ replay(instance, factory, siteConfig);
+ AccumuloServerContext context = new AccumuloServerContext(factory);
+ gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+ modTime = System.currentTimeMillis();
+ }
+
+ @Test
+ public void testGetters() {
+ assertSame(instance, gcwal.getInstance());
+ assertSame(volMgr, gcwal.getVolumeManager());
+ assertFalse(gcwal.isUsingTrash());
+ }
+
+ @Test
+ public void testPathsToStrings() {
+ ArrayList<Path> paths = new ArrayList<Path>();
+ paths.add(new Path(DIR_1_PATH, "file1"));
+ paths.add(DIR_2_PATH);
+ paths.add(new Path(DIR_3_PATH, "file3"));
+ List<String> strings = GarbageCollectWriteAheadLogs.paths2strings(paths);
+ int len = 3;
+ assertEquals(len, strings.size());
+ for (int i = 0; i < len; i++) {
+ assertEquals(paths.get(i).toString(), strings.get(i));
+ }
+ }
+
+ @Test
+ public void testMapServersToFiles() {
+ // @formatter:off
+ /*
+ * Test fileToServerMap:
+ * /dir1/server1/uuid1 -> server1 (new-style)
+ * /dir1/uuid2 -> "" (old-style)
+ * /dir3/server3/uuid3 -> server3 (new-style)
+ */
+ // @formatter:on
+ Map<Path,String> fileToServerMap = new HashMap<>();
+ Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1);
+ fileToServerMap.put(path1, "server1"); // new-style
+ Path path2 = new Path(DIR_1_PATH, UUID2);
+ fileToServerMap.put(path2, ""); // old-style
+ Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3);
+ fileToServerMap.put(path3, "server3"); // new-style
+ // @formatter:off
+ /*
+ * Test nameToFileMap:
+ * uuid1 -> /dir1/server1/uuid1
+ * uuid3 -> /dir3/server3/uuid3
+ */
+ // @formatter:on
+ Map<String,Path> nameToFileMap = new HashMap<>();
+ nameToFileMap.put(UUID1, path1);
+ nameToFileMap.put(UUID3, path3);
+
+ // @formatter:off
+ /*
+ * Expected map:
+ * server1 -> [ /dir1/server1/uuid1 ]
+ * server3 -> [ /dir3/server3/uuid3 ]
+ */
+ // @formatter:on
+ Map<String,ArrayList<Path>> result = GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap);
+ assertEquals(2, result.size());
+ ArrayList<Path> list1 = result.get("server1");
+ assertEquals(1, list1.size());
+ assertTrue(list1.contains(path1));
+ ArrayList<Path> list3 = result.get("server3");
+ assertEquals(1, list3.size());
+ assertTrue(list3.contains(path3));
+ }
+
+ /** Builds a FileStatus (replication 3, BLOCK_SIZE, modTime) for path; a size of 0 marks it as a directory. */
+ private FileStatus makeFileStatus(int size, Path path) {
+ boolean isDir = (size == 0);
+ return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path);
+ }
+
+ /** Expects one {@code volMgr.listStatus(dir)} call (EasyMock's default count), answered with the given statuses. */
+ private void mockListStatus(Path dir, FileStatus... fileStatuses) throws Exception {
+ expect(volMgr.listStatus(dir)).andReturn(fileStatuses);
+ }
+
+ @Test
+ public void testScanServers_NewStyle() throws Exception {
+ String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"};
+ // @formatter:off
+ /*
+ * Test directory layout:
+ * /dir1/
+ * server1/
+ * uuid1
+ * file2
+ * subdir2/
+ * /dir2/ missing
+ * /dir3/
+ * server3/
+ * uuid3
+ */
+ // @formatter:on
+ Path serverDir1Path = new Path(DIR_1_PATH, "server1");
+ FileStatus serverDir1 = makeFileStatus(0, serverDir1Path);
+ Path subDir2Path = new Path(DIR_1_PATH, "subdir2");
+ FileStatus serverDir2 = makeFileStatus(0, subDir2Path);
+ mockListStatus(DIR_1_PATH, serverDir1, serverDir2);
+ Path path1 = new Path(serverDir1Path, UUID1);
+ FileStatus file1 = makeFileStatus(100, path1);
+ FileStatus file2 = makeFileStatus(200, new Path(serverDir1Path, "file2"));
+ mockListStatus(serverDir1Path, file1, file2);
+ mockListStatus(subDir2Path);
+ expect(volMgr.listStatus(DIR_2_PATH)).andThrow(new FileNotFoundException());
+ Path serverDir3Path = new Path(DIR_3_PATH, "server3");
+ FileStatus serverDir3 = makeFileStatus(0, serverDir3Path);
+ mockListStatus(DIR_3_PATH, serverDir3);
+ Path path3 = new Path(serverDir3Path, UUID3);
+ FileStatus file3 = makeFileStatus(300, path3);
+ mockListStatus(serverDir3Path, file3);
+ replay(volMgr);
+
+ Map<Path,String> fileToServerMap = new HashMap<>();
+ Map<String,Path> nameToFileMap = new HashMap<>();
+ int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
+ assertEquals(3, count);
+ // @formatter:off
+ /*
+ * Expected fileToServerMap:
+ * /dir1/server1/uuid1 -> server1
+ * /dir3/server3/uuid3 -> server3
+ */
+ // @formatter:on
+ assertEquals(2, fileToServerMap.size());
+ assertEquals("server1", fileToServerMap.get(path1));
+ assertEquals("server3", fileToServerMap.get(path3));
+ // @formatter:off
+ /*
+ * Expected nameToFileMap:
+ * uuid1 -> /dir1/server1/uuid1
+ * uuid3 -> /dir3/server3/uuid3
+ */
+ // @formatter:on
+ assertEquals(2, nameToFileMap.size());
+ assertEquals(path1, nameToFileMap.get(UUID1));
+ assertEquals(path3, nameToFileMap.get(UUID3));
+ }
+
+ @Test
+ public void testScanServers_OldStyle() throws Exception {
+ // @formatter:off
+ /*
+ * Test directory layout:
+ * /dir1/
+ * uuid1
+ * /dir3/
+ * uuid3
+ */
+ // @formatter:on
+ String[] walDirs = new String[] {"/dir1", "/dir3"};
+ Path serverFile1Path = new Path(DIR_1_PATH, UUID1);
+ FileStatus serverFile1 = makeFileStatus(100, serverFile1Path);
+ mockListStatus(DIR_1_PATH, serverFile1);
+ Path serverFile3Path = new Path(DIR_3_PATH, UUID3);
+ FileStatus serverFile3 = makeFileStatus(300, serverFile3Path);
+ mockListStatus(DIR_3_PATH, serverFile3);
+ replay(volMgr);
+
+ Map<Path,String> fileToServerMap = new HashMap<>();
+ Map<String,Path> nameToFileMap = new HashMap<>();
+ int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
+ /*
+ * Expect only a single server, the non-server entry for upgrade WALs
+ */
+ assertEquals(1, count);
+ // @formatter:off
+ /*
+ * Expected fileToServerMap:
+ * /dir1/uuid1 -> ""
+ * /dir3/uuid3 -> ""
+ */
+ // @formatter:on
+ assertEquals(2, fileToServerMap.size());
+ assertEquals("", fileToServerMap.get(serverFile1Path));
+ assertEquals("", fileToServerMap.get(serverFile3Path));
+ // @formatter:off
+ /*
+ * Expected nameToFileMap:
+ * uuid1 -> /dir1/uuid1
+ * uuid3 -> /dir3/uuid3
+ */
+ // @formatter:on
+ assertEquals(2, nameToFileMap.size());
+ assertEquals(serverFile1Path, nameToFileMap.get(UUID1));
+ assertEquals(serverFile3Path, nameToFileMap.get(UUID3));
+ }
+
+ @Test
+ public void testGetSortedWALogs() throws Exception {
+ String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"};
+ // @formatter:off
+ /*
+ * Test directory layout:
+ * /dir1/
+ * uuid1
+ * file2
+ * /dir2/ missing
+ * /dir3/
+ * uuid3
+ */
+ // @formatter:on
+ expect(volMgr.exists(DIR_1_PATH)).andReturn(true);
+ expect(volMgr.exists(DIR_2_PATH)).andReturn(false);
+ expect(volMgr.exists(DIR_3_PATH)).andReturn(true);
+ Path path1 = new Path(DIR_1_PATH, UUID1);
+ FileStatus file1 = makeFileStatus(100, path1);
+ FileStatus file2 = makeFileStatus(200, new Path(DIR_1_PATH, "file2"));
+ mockListStatus(DIR_1_PATH, file1, file2);
+ Path path3 = new Path(DIR_3_PATH, UUID3);
+ FileStatus file3 = makeFileStatus(300, path3);
+ mockListStatus(DIR_3_PATH, file3);
+ replay(volMgr);
+
+ Map<String,Path> sortedWalogs = gcwal.getSortedWALogs(recoveryDirs);
+ // @formatter:off
+ /*
+ * Expected map:
+ * uuid1 -> /dir1/uuid1
+ * uuid3 -> /dir3/uuid3
+ */
+ // @formatter:on
+ assertEquals(2, sortedWalogs.size());
+ assertEquals(path1, sortedWalogs.get(UUID1));
+ assertEquals(path3, sortedWalogs.get(UUID3));
+ }
+
+ @Test
+ public void testIsUUID() {
+ assertTrue(GarbageCollectWriteAheadLogs.isUUID(UUID.randomUUID().toString()));
+ assertFalse(GarbageCollectWriteAheadLogs.isUUID("foo"));
+ assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString()));
+ assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
+ }
+
+ /**
+ * Returns canned replication status entries instead of scanning tables; simpler than mocking the Connector and
+ * Scanner interactions.
+ */
+ private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
+
+ private final List<Entry<Key,Value>> replData;
+
+ ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
+ super(context, fs, useTrash);
+ this.replData = replData;
+ }
+
+ @Override
+ protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
+ return this.replData;
+ }
+ }
+
+ @Test
+ public void replicationEntriesAffectGC() throws Exception {
+ String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+ Connector conn = createMock(Connector.class);
+
+ // Write a Status record which should prevent file1 from being deleted
+ LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
+ replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
+
+ ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
+
+ replay(conn);
+
+ // Open (not-closed) file must be retained
+ assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
+
+ // No replication data, not needed
+ replData.clear();
+ assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
+
+ // The file is closed but not replicated, must be retained
+ replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
+ assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
+
+ // File is closed and fully replicated, can be deleted
+ replData.clear();
+ replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"),
+ ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build())));
+ assertFalse(replGC.neededByReplication(conn, "/wals/" + file1));
+ }
+
+ @Test
+ public void removeReplicationEntries() throws Exception {
+ String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+
+ Instance inst = new MockInstance(testName.getMethodName());
+ AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+ long file1CreateTime = System.currentTimeMillis();
+ long file2CreateTime = file1CreateTime + 50;
+ BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
+ Mutation m = new Mutation("/wals/" + file1);
+ StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
+ bw.addMutation(m);
+ m = new Mutation("/wals/" + file2);
+ StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
+ bw.addMutation(m);
+ bw.close();
+
+ // These WALs are potential candidates for deletion from fs
+ Map<String,Path> nameToFileMap = new HashMap<>();
+ nameToFileMap.put(file1, new Path("/wals/" + file1));
+ nameToFileMap.put(file2, new Path("/wals/" + file2));
+
+ Map<String,Path> sortedWALogs = Collections.emptyMap();
+
+ // Make the GCStatus and GcCycleStats
+ GCStatus status = new GCStatus();
+ GcCycleStats cycleStats = new GcCycleStats();
+ status.currentLog = cycleStats;
+
+ // We should iterate over two entries
+ Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
+
+ // We should have noted that two files were still in use
+ Assert.assertEquals(2L, cycleStats.inUse);
+
+ // Both should have been removed from the deletion candidates
+ Assert.assertEquals(0, nameToFileMap.size());
+ }
+
+ @Test
+ public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
+ String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+
+ Instance inst = new MockInstance(testName.getMethodName());
+ AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+
+ Connector conn = context.getConnector();
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+ long file1CreateTime = System.currentTimeMillis();
+ long file2CreateTime = file1CreateTime + 50;
+ // Write some records to the metadata table, we haven't yet written status records to the replication table
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
+ bw.addMutation(m);
+
+ m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
+ bw.addMutation(m);
+ bw.close();
+
+ // These WALs are potential candidates for deletion from fs
+ Map<String,Path> nameToFileMap = new HashMap<>();
+ nameToFileMap.put(file1, new Path("/wals/" + file1));
+ nameToFileMap.put(file2, new Path("/wals/" + file2));
+
+ Map<String,Path> sortedWALogs = Collections.emptyMap();
+
+ // Make the GCStatus and GcCycleStats objects
+ GCStatus status = new GCStatus();
+ GcCycleStats cycleStats = new GcCycleStats();
+ status.currentLog = cycleStats;
+
+ // We should iterate over two entries
+ Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
+
+ // We should have noted that two files were still in use
+ Assert.assertEquals(2L, cycleStats.inUse);
+
+ // Both should have been removed from the deletion candidates
+ Assert.assertEquals(0, nameToFileMap.size());
+ }
+
+ @Test
+ public void noReplicationTableDoesntLimitMetadataResults() throws Exception {
+ Instance inst = new MockInstance(testName.getMethodName());
+ AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+ Connector conn = context.getConnector();
+
+ String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
+ bw.addMutation(m);
+ bw.close();
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+ Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(data);
+
+ Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
+ }
+
+ @Test
+ public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
+ Instance inst = new MockInstance(testName.getMethodName());
+ AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+ Connector conn = context.getConnector();
+
+ long walCreateTime = System.currentTimeMillis();
+ String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
+ bw.addMutation(m);
+ bw.close();
+
+ bw = ReplicationTable.getBatchWriter(conn);
+ m = new Mutation(wal);
+ StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
+ bw.addMutation(m);
+ bw.close();
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+ Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
+ Map<Key,Value> data = new HashMap<>();
+ for (Entry<Key,Value> e : iter) {
+ data.put(e.getKey(), e.getValue());
+ }
+
+ Assert.assertEquals(2, data.size());
+
+ // Should get one element from each table (metadata and replication)
+ for (Key k : data.keySet()) {
+ String row = k.getRow().toString();
+ if (row.startsWith(ReplicationSection.getRowPrefix())) {
+ Assert.assertTrue(row.endsWith(wal));
+ } else {
+ Assert.assertEquals(wal, row);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 78a5bd5..3115de1 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -50,12 +50,14 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -128,16 +130,22 @@ public class CloseWriteAheadLogReferencesTest {
public void findOneWalFromMetadata() throws Exception {
Connector conn = createMock(Connector.class);
BatchScanner bs = createMock(BatchScanner.class);
+
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
- data.add(entry("tserver1:9997[1234567890]", file));
+ LogEntry logEntry = new LogEntry();
+ logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
+ logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ logEntry.server = "tserver1";
+ logEntry.tabletId = 1;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+ bs.setRanges(Collections.singleton(TabletsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(CurrentLogsSection.COLF);
+ bs.fetchColumnFamily(LogColumnFamily.NAME);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -155,12 +163,54 @@ public class CloseWriteAheadLogReferencesTest {
// Validate
Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(Collections.singleton(file), wals);
+ Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
+
+ verify(conn, bs);
+ }
+
+ @Test
+ public void findManyWalFromSingleMetadata() throws Exception {
+ Connector conn = createMock(Connector.class);
+ BatchScanner bs = createMock(BatchScanner.class);
+
+ // Fake out a single tablet log entry whose log set names two WALs
+ final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
+ LogEntry logEntry = new LogEntry();
+ logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
+ logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
+ logEntry.server = "tserver1";
+ logEntry.tabletId = 1;
+ // Multiple DFSLoggers: logSet holds filename plus a second, distinct WAL
+ logEntry.logSet = Sets.newHashSet(logEntry.filename, "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID());
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ // Get a batchscanner, scan the tablets section, fetch only the logs
+ expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
+ bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ expectLastCall().once();
+ bs.fetchColumnFamily(LogColumnFamily.NAME);
+ expectLastCall().once();
+ expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
+
+ @Override
+ public Iterator<Entry<Key,Value>> answer() throws Throwable {
+ return data.iterator();
+ }
+
+ });
+ // The batch scanner must be closed exactly once
+ bs.close();
+ expectLastCall().once();
+
+ replay(conn, bs);
+
+ // Validate: every WAL in the single entry's log set is reported as referenced
+ Set<String> wals = refs.getReferencedWals(conn);
+ Assert.assertEquals(logEntry.logSet, wals);
verify(conn, bs);
}
- // This is a silly test now
@Test
public void findManyRefsToSingleWalFromMetadata() throws Exception {
Connector conn = createMock(Connector.class);
@@ -170,14 +220,31 @@ public class CloseWriteAheadLogReferencesTest {
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid;
- data.add(entry("tserver1:9997[0123456789]", filename));
+ LogEntry logEntry = new LogEntry();
+ logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
+ logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + uuid;
+ logEntry.server = "tserver1";
+ logEntry.tabletId = 1;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ logEntry.extent = new KeyExtent(new Text("1"), new Text("c"), new Text("b"));
+ logEntry.server = "tserver1";
+ logEntry.tabletId = 2;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ logEntry.extent = new KeyExtent(new Text("1"), null, new Text("c"));
+ logEntry.server = "tserver1";
+ logEntry.tabletId = 3;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+ bs.setRanges(Collections.singleton(TabletsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(CurrentLogsSection.COLF);
+ bs.fetchColumnFamily(LogColumnFamily.NAME);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -195,7 +262,7 @@ public class CloseWriteAheadLogReferencesTest {
// Validate
Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(Collections.singleton(filename), wals);
+ Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
verify(conn, bs);
}
@@ -205,22 +272,59 @@ public class CloseWriteAheadLogReferencesTest {
Connector conn = createMock(Connector.class);
BatchScanner bs = createMock(BatchScanner.class);
- String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
- String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID();
- String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID();
+ String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/"
+ + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID();
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-
- data.add(entry("tserver1:9997[1234567890]", file1));
- data.add(entry("tserver2:9997[1234567891]", file2));
- data.add(entry("tserver3:9997[1234567891]", file3));
+ LogEntry logEntry = new LogEntry();
+ logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
+ logEntry.filename = file1;
+ logEntry.server = "tserver1";
+ logEntry.tabletId = 1;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ logEntry.extent = new KeyExtent(new Text("5"), null, null);
+ logEntry.tabletId = 2;
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ logEntry.extent = new KeyExtent(new Text("3"), new Text("b"), new Text("a"));
+ logEntry.filename = file2;
+ logEntry.server = "tserver2";
+ logEntry.tabletId = 3;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ logEntry.extent = new KeyExtent(new Text("3"), new Text("c"), new Text("b"));
+ logEntry.tabletId = 4;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ logEntry.extent = new KeyExtent(new Text("4"), new Text("5"), new Text("0"));
+ logEntry.filename = file3;
+ logEntry.server = "tserver3";
+ logEntry.tabletId = 5;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ logEntry.extent = new KeyExtent(new Text("4"), new Text("8"), new Text("5"));
+ logEntry.server = "tserver3";
+ logEntry.tabletId = 7;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ logEntry.extent = new KeyExtent(new Text("4"), null, new Text("8"));
+ logEntry.server = "tserver3";
+ logEntry.tabletId = 15;
+ logEntry.logSet = Collections.singleton(logEntry.filename);
+ data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+ bs.setRanges(Collections.singleton(TabletsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(CurrentLogsSection.COLF);
+ bs.fetchColumnFamily(LogColumnFamily.NAME);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -243,11 +347,6 @@ public class CloseWriteAheadLogReferencesTest {
verify(conn, bs);
}
- private static Entry<Key,Value> entry(String session, String file) {
- Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF, new Text(file));
- return Maps.immutableEntry(key, new Value());
- }
-
@Test
public void unusedWalsAreClosed() throws Exception {
Set<String> wals = Collections.emptySet();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9a324fb..d00785a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -423,9 +423,6 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
}
perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
-
- // add the currlog location for root tablet current logs
- zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP);
haveUpgradedZooKeeper = true;
} catch (Exception ex) {
// ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 3f15b39..592d9ae 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -173,8 +173,7 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
scanner.setRange(MetadataSchema.TabletsSection.getRange());
} else {
scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- Range range = new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange();
- scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
+ scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange());
}
TabletsSection.ServerColumnFamily.FLUSH_COLUMN.fetch(scanner);
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 2b874f6..4c47953 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -163,7 +163,6 @@ class TabletGroupWatcher extends Daemon {
List<Assignment> assigned = new ArrayList<Assignment>();
List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>();
Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>();
- Map<TServerInstance,List<Path>> logsForDeadServers = new TreeMap<>();
MasterState masterState = master.getMasterState();
int[] counts = new int[TabletState.values().length];
@@ -176,7 +175,6 @@ class TabletGroupWatcher extends Daemon {
if (tls == null) {
continue;
}
- Master.log.debug(store.name() + " location State: " + tls);
// ignore entries for tables that do not exist in zookeeper
if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null)
continue;
@@ -186,7 +184,7 @@ class TabletGroupWatcher extends Daemon {
// Don't overwhelm the tablet servers with work
if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
assignments.clear();
assigned.clear();
assignedToDeadServers.clear();
@@ -206,9 +204,8 @@ class TabletGroupWatcher extends Daemon {
TabletGoalState goal = this.master.getGoalState(tls, mergeStats.getMergeInfo());
TServerInstance server = tls.getServer();
TabletState state = tls.getState(currentTServers.keySet());
- if (Master.log.isTraceEnabled()) {
- Master.log.trace("Goal state " + goal + " current " + state + " for " + tls.extent);
- }
+ if (Master.log.isTraceEnabled())
+ Master.log.trace("Goal state " + goal + " current " + state);
stats.update(tableId, state);
mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty());
sendChopRequest(mergeStats.getMergeInfo(), state, tls);
@@ -242,7 +239,7 @@ class TabletGroupWatcher extends Daemon {
assignedToDeadServers.add(tls);
if (server.equals(this.master.migrations.get(tls.extent)))
this.master.migrations.remove(tls.extent);
- MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+ // log.info("Current servers " + currentTServers.keySet());
break;
case UNASSIGNED:
// maybe it's a finishing migration
@@ -276,7 +273,7 @@ class TabletGroupWatcher extends Daemon {
break;
case ASSIGNED_TO_DEAD_SERVER:
assignedToDeadServers.add(tls);
- MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+ // log.info("Current servers " + currentTServers.keySet());
break;
case HOSTED:
TServerConnection conn = this.master.tserverSet.getConnection(server);
@@ -295,8 +292,7 @@ class TabletGroupWatcher extends Daemon {
counts[state.ordinal()]++;
}
- flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
- store.markLogsAsUnused(master, logsForDeadServers);
+ flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);
// provide stats after flushing changes to avoid race conditions w/ delete table
stats.end(masterState);
@@ -316,12 +312,8 @@ class TabletGroupWatcher extends Daemon {
updateMergeState(mergeStatsCache);
- if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
- Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
- eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
- } else {
- Master.log.info("Detected change in current tserver set, re-running state machine.");
- }
+ Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
+ eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
} catch (Exception ex) {
Master.log.error("Error processing table state for store " + store.name(), ex);
if (ex.getCause() != null && ex.getCause() instanceof BadLocationStateException) {
@@ -739,13 +731,11 @@ class TabletGroupWatcher extends Daemon {
}
private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned,
- List<TabletLocationState> assignedToDeadServers, Map<TServerInstance,List<Path>> logsForDeadServers, Map<KeyExtent,TServerInstance> unassigned)
- throws DistributedStoreException, TException {
+ List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException {
if (!assignedToDeadServers.isEmpty()) {
int maxServersToShow = min(assignedToDeadServers.size(), 100);
Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");
- Master.log.debug("logs for dead servers: " + logsForDeadServers);
- store.unassign(assignedToDeadServers, logsForDeadServers);
+ store.unassign(assignedToDeadServers);
this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size());
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index 74e9b78..8532e1b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -107,7 +107,6 @@ public class WorkMaker {
// Don't create the record if we have nothing to do.
// TODO put this into a filter on serverside
if (!shouldCreateWork(status)) {
- log.debug("Not creating work: " + status.toString());
continue;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
index f1763be..a3c7e46 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java
@@ -30,7 +30,6 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.cli.ClientOpts;
@@ -188,7 +187,7 @@ public class MergeStats {
Text tableId = extent.getTableId();
Text first = KeyExtent.getMetadataEntry(tableId, start);
Range range = new Range(first, false, null, true);
- scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
+ scanner.setRange(range);
KeyExtent prevExtent = null;
log.debug("Scanning range " + range);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
index 3c3bc37..6790858 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/ReplicationOperationsImplTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.master;
+import java.util.Arrays;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
@@ -305,7 +306,13 @@ public class ReplicationOperationsImplTest {
bw.addMutation(m);
bw.close();
- LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1);
+ LogEntry logEntry = new LogEntry();
+ logEntry.extent = new KeyExtent(new Text(tableId1), null, null);
+ logEntry.server = "tserver";
+ logEntry.filename = file1;
+ logEntry.tabletId = 1;
+ logEntry.logSet = Arrays.asList(file1);
+ logEntry.timestamp = System.currentTimeMillis();
bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
m = new Mutation(ReplicationSection.getRowPrefix() + file1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
index 8cbea68..634ee89 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -186,7 +186,7 @@ public class TestMergeState {
// take it offline
m = tablet.getPrevRowUpdateMutation();
Collection<Collection<String>> walogs = Collections.emptyList();
- metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
+ metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)));
// now we can split
stats = scan(state, metaDataStateStore);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
index 4002da5..d2cc0cf 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java
@@ -181,7 +181,7 @@ public class RootTabletStateStoreTest {
} catch (BadLocationStateException e) {
fail("Unexpected error " + e);
}
- tstore.unassign(Collections.singletonList(assigned), null);
+ tstore.unassign(Collections.singletonList(assigned));
count = 0;
for (TabletLocationState location : tstore) {
assertEquals(location.extent, root);
@@ -209,7 +209,7 @@ public class RootTabletStateStoreTest {
fail("Unexpected error " + e);
}
try {
- tstore.unassign(Collections.singletonList(broken), null);
+ tstore.unassign(Collections.singletonList(broken));
Assert.fail("should not get here");
} catch (IllegalArgumentException ex) {}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/findbugs/exclude-filter.xml
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/findbugs/exclude-filter.xml b/server/tserver/src/main/findbugs/exclude-filter.xml
index a334163..47dd1f5 100644
--- a/server/tserver/src/main/findbugs/exclude-filter.xml
+++ b/server/tserver/src/main/findbugs/exclude-filter.xml
@@ -18,7 +18,7 @@
<Match>
<!-- locking is confusing, but probably correct -->
<Class name="org.apache.accumulo.tserver.tablet.Tablet" />
- <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,org.apache.accumulo.tserver.log.DfsLogger,boolean" returns="boolean" />
+ <Method name="beginUpdatingLogsUsed" params="org.apache.accumulo.tserver.InMemoryMap,java.util.Collection,boolean" returns="boolean" />
<Bug code="UL" pattern="UL_UNRELEASED_LOCK" />
</Match>
<Match>