You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:20:49 UTC
[03/34] accumulo git commit: ACCUMULO-3625 use log markers against tservers, not tablets
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index ed7626e..a95cffa 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -23,8 +23,6 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -60,6 +58,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
@@ -86,6 +85,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -120,7 +120,7 @@ public class MetadataTableUtil {
return metadataTable;
}
- private synchronized static Writer getRootTable(ClientContext context) {
+ public synchronized static Writer getRootTable(ClientContext context) {
Credentials credentials = context.getCredentials();
Writer rootTable = root_tables.get(credentials);
if (rootTable == null) {
@@ -227,7 +227,7 @@ public class MetadataTableUtil {
// add before removing in case of process death
for (LogEntry logEntry : logsToAdd)
- addLogEntry(context, logEntry, zooLock);
+ addRootLogEntry(context, zooLock, logEntry);
removeUnusedWALEntries(context, extent, logsToRemove, zooLock);
} else {
@@ -252,6 +252,39 @@ public class MetadataTableUtil {
}
}
+ private static interface ZooOperation {
+ void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException;
+ }
+
+ private static void retryZooKeeperUpdate(ClientContext context, ZooLock zooLock, ZooOperation op) {
+ while (true) {
+ try {
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ if (zoo.isLockHeld(zooLock.getLockID())) {
+ op.run(zoo);
+ }
+ break;
+ } catch (KeeperException e) {
+ log.error(e, e);
+ } catch (InterruptedException e) {
+ log.error(e, e);
+ } catch (IOException e) {
+ log.error(e, e);
+ }
+ UtilWaitThread.sleep(1000);
+ }
+ }
+
+ private static void addRootLogEntry(AccumuloServerContext context, ZooLock zooLock, final LogEntry entry) {
+ retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = getZookeeperLogLocation();
+ rw.putPersistentData(root + "/" + entry.getUniqueID(), entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+ }
+ });
+ }
+
public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
@@ -451,34 +484,6 @@ public class MetadataTableUtil {
return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
}
- public static void addLogEntry(ClientContext context, LogEntry entry, ZooLock zooLock) {
- if (entry.extent.isRootTablet()) {
- String root = getZookeeperLogLocation();
- while (true) {
- try {
- IZooReaderWriter zoo = ZooReaderWriter.getInstance();
- if (zoo.isLockHeld(zooLock.getLockID())) {
- String[] parts = entry.filename.split("/");
- String uniqueId = parts[parts.length - 1];
- zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
- }
- break;
- } catch (KeeperException e) {
- log.error(e, e);
- } catch (InterruptedException e) {
- log.error(e, e);
- } catch (IOException e) {
- log.error(e, e);
- }
- UtilWaitThread.sleep(1000);
- }
- } else {
- Mutation m = new Mutation(entry.getRow());
- m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
- update(context, zooLock, m, entry.extent);
- }
- }
-
public static void setRootTabletDir(String dir) throws IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH;
@@ -569,22 +574,11 @@ public class MetadataTableUtil {
}
}
- Collections.sort(result, new Comparator<LogEntry>() {
- @Override
- public int compare(LogEntry o1, LogEntry o2) {
- long diff = o1.timestamp - o2.timestamp;
- if (diff < 0)
- return -1;
- if (diff > 0)
- return 1;
- return 0;
- }
- });
log.info("Returning logs " + result + " for extent " + extent);
return result;
}
- static void getRootLogEntries(ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
+ static void getRootLogEntries(final ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String root = getZookeeperLogLocation();
// there's a little race between getting the children and fetching
@@ -592,11 +586,10 @@ public class MetadataTableUtil {
while (true) {
result.clear();
for (String child : zoo.getChildren(root)) {
- LogEntry e = new LogEntry();
try {
- e.fromBytes(zoo.getData(root + "/" + child, null));
+ LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child, null));
// upgrade from !0;!0<< -> +r<<
- e.extent = RootTable.EXTENT;
+ e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename);
result.add(e);
} catch (KeeperException.NoNodeException ex) {
continue;
@@ -666,28 +659,23 @@ public class MetadataTableUtil {
return new LogEntryIterator(context);
}
- public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, List<LogEntry> logEntries, ZooLock zooLock) {
+ public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, final List<LogEntry> entries, ZooLock zooLock) {
if (extent.isRootTablet()) {
- for (LogEntry entry : logEntries) {
- String root = getZookeeperLogLocation();
- while (true) {
- try {
- IZooReaderWriter zoo = ZooReaderWriter.getInstance();
- if (zoo.isLockHeld(zooLock.getLockID())) {
- String parts[] = entry.filename.split("/");
- zoo.recursiveDelete(root + "/" + parts[parts.length - 1], NodeMissingPolicy.SKIP);
- }
- break;
- } catch (Exception e) {
- log.error(e, e);
+ retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = getZookeeperLogLocation();
+ for (LogEntry entry : entries) {
+ String path = root + "/" + entry.getUniqueID();
+ log.debug("Removing " + path + " from zookeeper");
+ rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
}
- UtilWaitThread.sleep(1000);
}
- }
+ });
} else {
Mutation m = new Mutation(extent.getMetadataEntry());
- for (LogEntry entry : logEntries) {
- m.putDelete(LogColumnFamily.NAME, new Text(entry.getName()));
+ for (LogEntry entry : entries) {
+ m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier());
}
update(context, zooLock, m, extent);
}
@@ -1072,4 +1060,106 @@ public class MetadataTableUtil {
return tabletEntries;
}
+ public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final String filename, KeyExtent extent) {
+ log.debug("Adding log entry " + filename);
+ if (extent.isRootTablet()) {
+ retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+ String[] parts = filename.split("/");
+ String uniqueId = parts[parts.length - 1];
+ String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
+ rw.putPersistentData(path, filename.getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+ }
+ });
+ } else {
+ Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+ m.put("log", filename, new Value(EMPTY_BYTES));
+ String tableName = MetadataTable.NAME;
+ if (extent.isMeta()) {
+ tableName = RootTable.NAME;
+ }
+ try {
+ BatchWriter bw = context.getConnector().createBatchWriter(tableName, null);
+ bw.addMutation(m);
+ bw.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final String filename) {
+ retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+ String[] parts = filename.split("/");
+ String uniqueId = parts[parts.length - 1];
+ String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
+ log.debug("Removing entry " + path + " from zookeeper");
+ rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
+ }
+ });
+ }
+
+ public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<String> all) throws AccumuloException {
+ try {
+ BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+ BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+ for (String fname : all) {
+ Text tname = new Text(fname.getBytes(UTF_8));
+ Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+ m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
+ root.addMutation(m);
+ log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
+ m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+ m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
+ meta.addMutation(m);
+ removeCurrentRootLogMarker(context, lock, tabletSession, fname);
+ }
+ root.close();
+ meta.close();
+ } catch (Exception ex) {
+ throw new AccumuloException(ex);
+ }
+ }
+
+ public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server, Map<TServerInstance,List<String>> logsForDeadServers)
+ throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+ // already cached
+ if (logsForDeadServers.containsKey(server)) {
+ return;
+ }
+ if (extent.isRootTablet()) {
+ final List<String> logs = new ArrayList<>();
+ retryZooKeeperUpdate(context, lock, new ZooOperation() {
+ @Override
+ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
+ String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
+ logs.clear();
+ for (String child : rw.getChildren(root)) {
+ logs.add(new String(rw.getData(root + "/" + child, null), UTF_8));
+ }
+ }
+ });
+ logsForDeadServers.put(server, logs);
+ } else {
+ // use the correct meta table
+ String table = MetadataTable.NAME;
+ if (extent.isMeta()) {
+ table = RootTable.NAME;
+ }
+ // fetch the current logs in use, and put them in the cache
+ Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
+ scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
+ List<String> logs = new ArrayList<>();
+ for (Entry<Key,Value> entry : scanner) {
+ logs.add(entry.getKey().getColumnQualifier().toString());
+ }
+ logsForDeadServers.put(server, logs);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index 344e245..0de0b0e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.server.util;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -176,20 +175,14 @@ public class ReplicationTableUtil {
/**
* Write replication ingest entries for each provided file with the given {@link Status}.
*/
- public static void updateFiles(ClientContext context, KeyExtent extent, Collection<String> files, Status stat) {
+ public static void updateFiles(ClientContext context, KeyExtent extent, String file, Status stat) {
if (log.isDebugEnabled()) {
- log.debug("Updating replication status for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat));
+ log.debug("Updating replication status for " + extent + " with " + file + " using " + ProtobufUtil.toString(stat));
}
// TODO could use batch writer, would need to handle failure and retry like update does - ACCUMULO-1294
- if (files.isEmpty()) {
- return;
- }
Value v = ProtobufUtil.toValue(stat);
- for (String file : files) {
- // TODO Can preclude this addition if the extent is for a table we don't need to replicate
- update(context, createUpdateMutation(new Path(file), v, extent), extent);
- }
+ update(context, createUpdateMutation(new Path(file), v, extent), extent);
}
static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 355fa42..375e263 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -94,7 +94,7 @@ public class ReplicationTableUtilTest {
String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid;
long createdTime = System.currentTimeMillis();
- ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.fileCreated(createdTime));
+ ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), myFile, StatusUtil.fileCreated(createdTime));
verify(writer);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 35c60d6..2561eec 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -18,7 +18,7 @@ package org.apache.accumulo.gc;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -26,21 +26,22 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.UUID;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
@@ -48,29 +49,29 @@ import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
-import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.master.LiveTServerSet;
+import org.apache.accumulo.server.master.LiveTServerSet.Listener;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.RootTabletStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.thrift.TException;
+import org.apache.hadoop.io.Text;
+import org.apache.htrace.Span;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
import com.google.common.net.HostAndPort;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -79,8 +80,7 @@ public class GarbageCollectWriteAheadLogs {
private final AccumuloServerContext context;
private final VolumeManager fs;
-
- private boolean useTrash;
+ private final boolean useTrash;
/**
* Creates a new GC WAL object.
@@ -98,54 +98,33 @@ public class GarbageCollectWriteAheadLogs {
this.useTrash = useTrash;
}
- /**
- * Gets the instance used by this object.
- *
- * @return instance
- */
- Instance getInstance() {
- return context.getInstance();
- }
-
- /**
- * Gets the volume manager used by this object.
- *
- * @return volume manager
- */
- VolumeManager getVolumeManager() {
- return fs;
- }
-
- /**
- * Checks if the volume manager should move files to the trash rather than delete them.
- *
- * @return true if trash is used
- */
- boolean isUsingTrash() {
- return useTrash;
- }
-
public void collect(GCStatus status) {
- Span span = Trace.start("scanServers");
+ Span span = Trace.start("getCandidates");
try {
-
- Map<String,Path> sortedWALogs = getSortedWALogs();
+ LiveTServerSet liveServers = new LiveTServerSet(context, new Listener() {
+ @Override
+ public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
+ log.debug("New tablet server noticed: " + added);
+ log.debug("Tablet server removed: " + deleted);
+ }
+ });
+ Set<TServerInstance> currentServers = liveServers.getCurrentServers();
status.currentLog.started = System.currentTimeMillis();
- Map<Path,String> fileToServerMap = new HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new HashMap<String,Path>();
- int count = scanServers(fileToServerMap, nameToFileMap);
+ Map<TServerInstance, Set<String> > candidates = new HashMap<>();
+ long count = getCurrent(candidates, currentServers);
long fileScanStop = System.currentTimeMillis();
- log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count,
+
+ log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(),
(fileScanStop - status.currentLog.started) / 1000.));
- status.currentLog.candidates = fileToServerMap.size();
+ status.currentLog.candidates = count;
span.stop();
span = Trace.start("removeMetadataEntries");
try {
- count = removeMetadataEntries(nameToFileMap, sortedWALogs, status);
+ count = removeMetadataEntries(candidates, status, currentServers);
} catch (Exception ex) {
log.error("Unable to scan metadata table", ex);
return;
@@ -158,7 +137,7 @@ public class GarbageCollectWriteAheadLogs {
span = Trace.start("removeReplicationEntries");
try {
- count = removeReplicationEntries(nameToFileMap, sortedWALogs, status);
+ count = removeReplicationEntries(candidates, status);
} catch (Exception ex) {
log.error("Unable to scan replication table", ex);
return;
@@ -170,16 +149,23 @@ public class GarbageCollectWriteAheadLogs {
log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.));
span = Trace.start("removeFiles");
- Map<String,ArrayList<Path>> serverToFileMap = mapServersToFiles(fileToServerMap, nameToFileMap);
- count = removeFiles(nameToFileMap, serverToFileMap, sortedWALogs, status);
+ count = removeFiles(candidates, status);
long removeStop = System.currentTimeMillis();
- log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.));
+ log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.));
+ span.stop();
+
+ span = Trace.start("removeMarkers");
+ count = removeMarkers(candidates);
+ long removeMarkersStop = System.currentTimeMillis();
+ log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
+ span.stop();
+
+
status.currentLog.finished = removeStop;
status.lastLog = status.currentLog;
status.currentLog = new GcCycleStats();
- span.stop();
} catch (Exception e) {
log.error("exception occured while garbage collecting write ahead logs", e);
@@ -188,161 +174,82 @@ public class GarbageCollectWriteAheadLogs {
}
}
- boolean holdsLock(HostAndPort addr) {
+ private long removeMarkers(Map<TServerInstance,Set<String>> candidates) {
+ long result = 0;
try {
- String zpath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addr.toString();
- List<String> children = ZooReaderWriter.getInstance().getChildren(zpath);
- return !(children == null || children.isEmpty());
- } catch (KeeperException.NoNodeException ex) {
- return false;
- } catch (Exception ex) {
- log.debug(ex.toString(), ex);
- return true;
- }
- }
-
- private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
- for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
- if (entry.getKey().isEmpty()) {
- // old-style log entry, just remove it
- for (Path path : entry.getValue()) {
- log.debug("Removing old-style WAL " + path);
- try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
- }
- }
- } else {
- HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
- if (!holdsLock(address)) {
- for (Path path : entry.getValue()) {
- log.debug("Removing WAL for offline server " + path);
- try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
- }
- }
- continue;
- } else {
- Client tserver = null;
- try {
- tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
- tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
- log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
- status.currentLog.deleted += entry.getValue().size();
- } catch (TException e) {
- log.warn("Error talking to " + address + ": " + e);
- } finally {
- if (tserver != null)
- ThriftUtil.returnClient(tserver);
- }
+ BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+ BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+ for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) {
+ Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.toString());
+ for (String wal : entry.getValue()) {
+ m.putDelete(CurrentLogsSection.COLF, new Text(wal));
+ result++;
}
+ root.addMutation(m);
+ meta.addMutation(m);
}
+ meta.close();
+ root.close();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
}
+ return result;
+ }
- for (Path swalog : sortedWALogs.values()) {
- log.debug("Removing sorted WAL " + swalog);
- try {
- if (!useTrash || !fs.moveToTrash(swalog)) {
- fs.deleteRecursively(swalog);
- }
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ioe) {
+ private long removeFiles(Map<TServerInstance, Set<String> > candidates, final GCStatus status) {
+ for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) {
+ for (String walog : entry.getValue()) {
+ log.debug("Removing WAL for offline server " + entry.getKey() + " log " + walog);
+ Path path = new Path(walog);
try {
- if (fs.exists(swalog)) {
- log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
- }
+ if (!useTrash || !fs.moveToTrash(path))
+ fs.deleteRecursively(path);
+ status.currentLog.deleted++;
+ } catch (FileNotFoundException ex) {
+ // ignored
} catch (IOException ex) {
- log.error("Unable to check for the existence of " + swalog, ex);
+ log.error("Unable to delete wal " + path + ": " + ex);
}
}
}
-
- return 0;
+ return status.currentLog.deleted;
}
- /**
- * Converts a list of paths to their corresponding strings.
- *
- * @param paths
- * list of paths
- * @return string forms of paths
- */
- static List<String> paths2strings(List<Path> paths) {
- List<String> result = new ArrayList<String>(paths.size());
- for (Path path : paths)
- result.add(path.toString());
- return result;
- }
+ private long removeMetadataEntries(Map<TServerInstance, Set<String> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException,
+ InterruptedException {
- /**
- * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the
- * mapping of file names to paths is skipped.
- *
- * @param fileToServerMap
- * map of file paths to servers
- * @param nameToFileMap
- * map of file names to paths
- * @return map of servers to lists of file paths
- */
- static Map<String,ArrayList<Path>> mapServersToFiles(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) {
- Map<String,ArrayList<Path>> result = new HashMap<String,ArrayList<Path>>();
- for (Entry<Path,String> fileServer : fileToServerMap.entrySet()) {
- if (!nameToFileMap.containsKey(fileServer.getKey().getName()))
- continue;
- ArrayList<Path> files = result.get(fileServer.getValue());
- if (files == null) {
- files = new ArrayList<Path>();
- result.put(fileServer.getValue(), files);
+ // remove any entries if there's a log reference, or a tablet is still assigned to the dead server
+
+ Map<String, TServerInstance> walToDeadServer = new HashMap<>();
+ for (Entry<TServerInstance,Set<String>> entry : candidates.entrySet()) {
+ for (String file : entry.getValue()) {
+ walToDeadServer.put(file, entry.getKey());
}
- files.add(fileServer.getKey());
}
- return result;
- }
-
- protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
- InterruptedException {
- int count = 0;
- Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(context);
-
- // For each WAL reference in the metadata table
- while (iterator.hasNext()) {
- // Each metadata reference has at least one WAL file
- for (String entry : iterator.next().logSet) {
- // old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases
- // the last "/" will mark a UUID file name.
- String uuid = entry.substring(entry.lastIndexOf("/") + 1);
- if (!isUUID(uuid)) {
- // fully expect this to be a uuid, if its not then something is wrong and walog GC should not proceed!
- throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry);
- }
-
- Path pathFromNN = nameToFileMap.remove(uuid);
- if (pathFromNN != null) {
- status.currentLog.inUse++;
- sortedWALogs.remove(uuid);
+ long count = 0;
+ RootTabletStateStore root = new RootTabletStateStore(context);
+ MetaDataStateStore meta = new MetaDataStateStore(context);
+ Iterator<TabletLocationState> states = Iterators.concat(root.iterator(), meta.iterator());
+ while (states.hasNext()) {
+ count++;
+ TabletLocationState state = states.next();
+ if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
+ candidates.remove(state.current);
+ }
+ for (Collection<String> wals : state.walogs) {
+ for (String wal : wals) {
+ TServerInstance dead = walToDeadServer.get(wal);
+ if (dead != null) {
+ candidates.get(dead).remove(wal);
+ }
}
-
- count++;
}
}
-
return count;
}
- protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
- InterruptedException {
+ protected int removeReplicationEntries(Map<TServerInstance, Set<String> > candidates, GCStatus status) throws IOException, KeeperException,
+ InterruptedException {
Connector conn;
try {
conn = context.getConnector();
@@ -353,21 +260,25 @@ public class GarbageCollectWriteAheadLogs {
int count = 0;
- Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
+ Iterator<Entry<TServerInstance,Set<String>>> walIter = candidates.entrySet().iterator();
while (walIter.hasNext()) {
- Entry<String,Path> wal = walIter.next();
- String fullPath = wal.getValue().toString();
- if (neededByReplication(conn, fullPath)) {
- log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
- // If we haven't already removed it, check to see if this WAL is
- // "in use" by replication (needed for replication purposes)
- status.currentLog.inUse++;
-
+ Entry<TServerInstance,Set<String>> wal = walIter.next();
+ Iterator<String> paths = wal.getValue().iterator();
+ while (paths.hasNext()) {
+ String fullPath = paths.next();
+ if (neededByReplication(conn, fullPath)) {
+ log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
+ // If we haven't already removed it, check to see if this WAL is
+ // "in use" by replication (needed for replication purposes)
+ status.currentLog.inUse++;
+ paths.remove();
+ } else {
+ log.debug("WAL not needed for replication {}", fullPath);
+ }
+ }
+ if (wal.getValue().isEmpty()) {
walIter.remove();
- sortedWALogs.remove(wal.getKey());
- } else {
- log.debug("WAL not needed for replication {}", fullPath);
}
count++;
}
@@ -375,6 +286,7 @@ public class GarbageCollectWriteAheadLogs {
return count;
}
+
/**
* Determine if the given WAL is needed for replication
*
@@ -435,107 +347,54 @@ public class GarbageCollectWriteAheadLogs {
return metaScanner;
}
- private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
- return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
- }
-
- /**
- * Scans write-ahead log directories for logs. The maps passed in are populated with scan information.
- *
- * @param walDirs
- * write-ahead log directories
- * @param fileToServerMap
- * map of file paths to servers
- * @param nameToFileMap
- * map of file names to paths
- * @return number of servers located (including those with no logs present)
- */
- int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
- Set<String> servers = new HashSet<String>();
- for (String walDir : walDirs) {
- Path walRoot = new Path(walDir);
- FileStatus[] listing = null;
- try {
- listing = fs.listStatus(walRoot);
- } catch (FileNotFoundException e) {
- // ignore dir
- }
- if (listing == null)
- continue;
- for (FileStatus status : listing) {
- String server = status.getPath().getName();
- if (status.isDirectory()) {
- servers.add(server);
- for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
- if (isUUID(file.getPath().getName())) {
- fileToServerMap.put(file.getPath(), server);
- nameToFileMap.put(file.getPath().getName(), file.getPath());
- } else {
- log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
- }
- }
- } else if (isUUID(server)) {
- // old-style WAL are not under a directory
- servers.add("");
- fileToServerMap.put(status.getPath(), "");
- nameToFileMap.put(server, status.getPath());
- } else {
- log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
- }
- }
- }
- return servers.size();
- }
- private Map<String,Path> getSortedWALogs() throws IOException {
- return getSortedWALogs(ServerConstants.getRecoveryDirs());
- }
/**
- * Looks for write-ahead logs in recovery directories.
+ * Scans log markers. The map passed in is populated with the logs for dead servers.
*
- * @param recoveryDirs
- * recovery directories
- * @return map of log file names to paths
+ * @param logsForDeadServers
+ * map of dead server to log file entries
+ * @return total number of log files
*/
- Map<String,Path> getSortedWALogs(String[] recoveryDirs) throws IOException {
- Map<String,Path> result = new HashMap<String,Path>();
-
- for (String dir : recoveryDirs) {
- Path recoveryDir = new Path(dir);
-
- if (fs.exists(recoveryDir)) {
- for (FileStatus status : fs.listStatus(recoveryDir)) {
- String name = status.getPath().getName();
- if (isUUID(name)) {
- result.put(name, status.getPath());
- } else {
- log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
- }
+ private long getCurrent(Map<TServerInstance, Set<String> > logsForDeadServers, Set<TServerInstance> currentServers) throws Exception {
+ Set<String> rootWALs = new HashSet<String>();
+ // Get entries in zookeeper (stored as plain String filenames, so lookups against this set must use Strings as well):
+ String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
+ ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ List<String> children = zoo.getChildren(zpath);
+ for (String child : children) {
+ LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null));
+ rootWALs.add(entry.filename);
+ }
+ long count = 0;
+
+ // get all the WAL markers that are not in zookeeper for dead servers
+ Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY);
+ rootScanner.setRange(CurrentLogsSection.getRange());
+ Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.setRange(CurrentLogsSection.getRange());
+ Iterator<Entry<Key,Value>> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator());
+ Text hostAndPort = new Text();
+ Text sessionId = new Text();
+ Text filename = new Text();
+ while (entries.hasNext()) {
+ Entry<Key,Value> entry = entries.next();
+ CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
+ CurrentLogsSection.getPath(entry.getKey(), filename);
+ TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
+ if ((!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED)) && !rootWALs.contains(filename.toString())) {
+ Set<String> logs = logsForDeadServers.get(tsi);
+ if (logs == null) {
+ logsForDeadServers.put(tsi, logs = new HashSet<String>());
+ }
+ if (logs.add(new Path(filename.toString()).toString())) {
+ count++;
}
}
}
- return result;
- }
- /**
- * Checks if a string is a valid UUID.
- *
- * @param name
- * string to check
- * @return true if string is a UUID
- */
- static boolean isUUID(String name) {
- if (name == null || name.length() != 36) {
- return false;
- }
- try {
- UUID.fromString(name);
- return true;
- } catch (IllegalArgumentException ex) {
- return false;
- }
+ return count;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 35005d8..9328225 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -568,7 +568,6 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
replSpan.stop();
}
- // Clean up any unused write-ahead logs
Span waLogs = Trace.start("walogs");
try {
GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 9b60c88..8185f23 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -37,15 +37,13 @@ import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
@@ -186,20 +184,21 @@ public class CloseWriteAheadLogReferences implements Runnable {
try {
// TODO Configurable number of threads
bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
// For each log key/value in the metadata table
for (Entry<Key,Value> entry : bs) {
- // The value may contain multiple WALs
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
- log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet);
+ if (entry.getValue().equals(CurrentLogsSection.UNUSED)) {
+ continue;
+ }
+ Text tpath = new Text();
+ CurrentLogsSection.getPath(entry.getKey(), tpath);
+ String path = new Path(tpath.toString()).toString();
+ log.debug("Found WAL " + path.toString());
// Normalize each log file (using Path) and add it to the set
- for (String logFile : logEntry.logSet) {
- referencedWals.add(normalizedWalPaths.get(logFile));
- }
+ referencedWals.add(normalizedWalPaths.get(path));
}
} catch (TableNotFoundException e) {
// uhhhh
@@ -248,6 +247,8 @@ public class CloseWriteAheadLogReferences implements Runnable {
MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText);
String replFile = replFileText.toString();
boolean isReferenced = referencedWals.contains(replFile);
+ log.debug("replFile " + replFile);
+ log.debug("referencedWals " + referencedWals);
// We only want to clean up WALs (which is everything but rfiles) and only when
// metadata doesn't have a reference to the given WAL
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
deleted file mode 100644
index 5224f28..0000000
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.gc;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.gc.thrift.GCStatus;
-import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-public class GarbageCollectWriteAheadLogsTest {
- private static final long BLOCK_SIZE = 64000000L;
-
- private static final Path DIR_1_PATH = new Path("/dir1");
- private static final Path DIR_2_PATH = new Path("/dir2");
- private static final Path DIR_3_PATH = new Path("/dir3");
- private static final String UUID1 = UUID.randomUUID().toString();
- private static final String UUID2 = UUID.randomUUID().toString();
- private static final String UUID3 = UUID.randomUUID().toString();
-
- private Instance instance;
- private AccumuloConfiguration systemConfig;
- private VolumeManager volMgr;
- private GarbageCollectWriteAheadLogs gcwal;
- private AccumuloServerContext context;
- private long modTime;
-
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void setUp() throws Exception {
- SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class);
- instance = createMock(Instance.class);
- expect(instance.getInstanceID()).andReturn("mock").anyTimes();
- expect(instance.getZooKeepers()).andReturn("localhost").anyTimes();
- expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes();
- systemConfig = new ConfigurationCopy(new HashMap<String,String>());
- volMgr = createMock(VolumeManager.class);
- ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class);
- expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes();
- expect(factory.getInstance()).andReturn(instance).anyTimes();
- expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
-
- // Just make the SiteConfiguration delegate to our AccumuloConfiguration
- // Presently, we only need get(Property) and iterator().
- EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() {
- @Override
- public String answer() {
- Object[] args = EasyMock.getCurrentArguments();
- return systemConfig.get((Property) args[0]);
- }
- }).anyTimes();
- EasyMock.expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<Boolean>() {
- @Override
- public Boolean answer() {
- Object[] args = EasyMock.getCurrentArguments();
- return systemConfig.getBoolean((Property) args[0]);
- }
- }).anyTimes();
-
- EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() {
- @Override
- public Iterator<Entry<String,String>> answer() {
- return systemConfig.iterator();
- }
- }).anyTimes();
-
- replay(instance, factory, siteConfig);
- AccumuloServerContext context = new AccumuloServerContext(factory);
- gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
- modTime = System.currentTimeMillis();
- }
-
- @Test
- public void testGetters() {
- assertSame(instance, gcwal.getInstance());
- assertSame(volMgr, gcwal.getVolumeManager());
- assertFalse(gcwal.isUsingTrash());
- }
-
- @Test
- public void testPathsToStrings() {
- ArrayList<Path> paths = new ArrayList<Path>();
- paths.add(new Path(DIR_1_PATH, "file1"));
- paths.add(DIR_2_PATH);
- paths.add(new Path(DIR_3_PATH, "file3"));
- List<String> strings = GarbageCollectWriteAheadLogs.paths2strings(paths);
- int len = 3;
- assertEquals(len, strings.size());
- for (int i = 0; i < len; i++) {
- assertEquals(paths.get(i).toString(), strings.get(i));
- }
- }
-
- @Test
- public void testMapServersToFiles() {
- // @formatter:off
- /*
- * Test fileToServerMap:
- * /dir1/server1/uuid1 -> server1 (new-style)
- * /dir1/uuid2 -> "" (old-style)
- * /dir3/server3/uuid3 -> server3 (new-style)
- */
- // @formatter:on
- Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
- Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1);
- fileToServerMap.put(path1, "server1"); // new-style
- Path path2 = new Path(DIR_1_PATH, UUID2);
- fileToServerMap.put(path2, ""); // old-style
- Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3);
- fileToServerMap.put(path3, "server3"); // old-style
- // @formatter:off
- /*
- * Test nameToFileMap:
- * uuid1 -> /dir1/server1/uuid1
- * uuid3 -> /dir3/server3/uuid3
- */
- // @formatter:on
- Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
- nameToFileMap.put(UUID1, path1);
- nameToFileMap.put(UUID3, path3);
-
- // @formatter:off
- /*
- * Expected map:
- * server1 -> [ /dir1/server1/uuid1 ]
- * server3 -> [ /dir3/server3/uuid3 ]
- */
- // @formatter:on
- Map<String,ArrayList<Path>> result = GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap);
- assertEquals(2, result.size());
- ArrayList<Path> list1 = result.get("server1");
- assertEquals(1, list1.size());
- assertTrue(list1.contains(path1));
- ArrayList<Path> list3 = result.get("server3");
- assertEquals(1, list3.size());
- assertTrue(list3.contains(path3));
- }
-
- private FileStatus makeFileStatus(int size, Path path) {
- boolean isDir = (size == 0);
- return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path);
- }
-
- private void mockListStatus(Path dir, FileStatus... fileStatuses) throws Exception {
- expect(volMgr.listStatus(dir)).andReturn(fileStatuses);
- }
-
- @Test
- public void testScanServers_NewStyle() throws Exception {
- String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"};
- // @formatter:off
- /*
- * Test directory layout:
- * /dir1/
- * server1/
- * uuid1
- * file2
- * subdir2/
- * /dir2/ missing
- * /dir3/
- * server3/
- * uuid3
- */
- // @formatter:on
- Path serverDir1Path = new Path(DIR_1_PATH, "server1");
- FileStatus serverDir1 = makeFileStatus(0, serverDir1Path);
- Path subDir2Path = new Path(DIR_1_PATH, "subdir2");
- FileStatus serverDir2 = makeFileStatus(0, subDir2Path);
- mockListStatus(DIR_1_PATH, serverDir1, serverDir2);
- Path path1 = new Path(serverDir1Path, UUID1);
- FileStatus file1 = makeFileStatus(100, path1);
- FileStatus file2 = makeFileStatus(200, new Path(serverDir1Path, "file2"));
- mockListStatus(serverDir1Path, file1, file2);
- mockListStatus(subDir2Path);
- expect(volMgr.listStatus(DIR_2_PATH)).andThrow(new FileNotFoundException());
- Path serverDir3Path = new Path(DIR_3_PATH, "server3");
- FileStatus serverDir3 = makeFileStatus(0, serverDir3Path);
- mockListStatus(DIR_3_PATH, serverDir3);
- Path path3 = new Path(serverDir3Path, UUID3);
- FileStatus file3 = makeFileStatus(300, path3);
- mockListStatus(serverDir3Path, file3);
- replay(volMgr);
-
- Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
- int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
- assertEquals(3, count);
- // @formatter:off
- /*
- * Expected fileToServerMap:
- * /dir1/server1/uuid1 -> server1
- * /dir3/server3/uuid3 -> server3
- */
- // @formatter:on
- assertEquals(2, fileToServerMap.size());
- assertEquals("server1", fileToServerMap.get(path1));
- assertEquals("server3", fileToServerMap.get(path3));
- // @formatter:off
- /*
- * Expected nameToFileMap:
- * uuid1 -> /dir1/server1/uuid1
- * uuid3 -> /dir3/server3/uuid3
- */
- // @formatter:on
- assertEquals(2, nameToFileMap.size());
- assertEquals(path1, nameToFileMap.get(UUID1));
- assertEquals(path3, nameToFileMap.get(UUID3));
- }
-
- @Test
- public void testScanServers_OldStyle() throws Exception {
- // @formatter:off
- /*
- * Test directory layout:
- * /dir1/
- * uuid1
- * /dir3/
- * uuid3
- */
- // @formatter:on
- String[] walDirs = new String[] {"/dir1", "/dir3"};
- Path serverFile1Path = new Path(DIR_1_PATH, UUID1);
- FileStatus serverFile1 = makeFileStatus(100, serverFile1Path);
- mockListStatus(DIR_1_PATH, serverFile1);
- Path serverFile3Path = new Path(DIR_3_PATH, UUID3);
- FileStatus serverFile3 = makeFileStatus(300, serverFile3Path);
- mockListStatus(DIR_3_PATH, serverFile3);
- replay(volMgr);
-
- Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
- Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
- int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
- /*
- * Expect only a single server, the non-server entry for upgrade WALs
- */
- assertEquals(1, count);
- // @formatter:off
- /*
- * Expected fileToServerMap:
- * /dir1/uuid1 -> ""
- * /dir3/uuid3 -> ""
- */
- // @formatter:on
- assertEquals(2, fileToServerMap.size());
- assertEquals("", fileToServerMap.get(serverFile1Path));
- assertEquals("", fileToServerMap.get(serverFile3Path));
- // @formatter:off
- /*
- * Expected nameToFileMap:
- * uuid1 -> /dir1/uuid1
- * uuid3 -> /dir3/uuid3
- */
- // @formatter:on
- assertEquals(2, nameToFileMap.size());
- assertEquals(serverFile1Path, nameToFileMap.get(UUID1));
- assertEquals(serverFile3Path, nameToFileMap.get(UUID3));
- }
-
- @Test
- public void testGetSortedWALogs() throws Exception {
- String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"};
- // @formatter:off
- /*
- * Test directory layout:
- * /dir1/
- * uuid1
- * file2
- * /dir2/ missing
- * /dir3/
- * uuid3
- */
- // @formatter:on
- expect(volMgr.exists(DIR_1_PATH)).andReturn(true);
- expect(volMgr.exists(DIR_2_PATH)).andReturn(false);
- expect(volMgr.exists(DIR_3_PATH)).andReturn(true);
- Path path1 = new Path(DIR_1_PATH, UUID1);
- FileStatus file1 = makeFileStatus(100, path1);
- FileStatus file2 = makeFileStatus(200, new Path(DIR_1_PATH, "file2"));
- mockListStatus(DIR_1_PATH, file1, file2);
- Path path3 = new Path(DIR_3_PATH, UUID3);
- FileStatus file3 = makeFileStatus(300, path3);
- mockListStatus(DIR_3_PATH, file3);
- replay(volMgr);
-
- Map<String,Path> sortedWalogs = gcwal.getSortedWALogs(recoveryDirs);
- // @formatter:off
- /*
- * Expected map:
- * uuid1 -> /dir1/uuid1
- * uuid3 -> /dir3/uuid3
- */
- // @formatter:on
- assertEquals(2, sortedWalogs.size());
- assertEquals(path1, sortedWalogs.get(UUID1));
- assertEquals(path3, sortedWalogs.get(UUID3));
- }
-
- @Test
- public void testIsUUID() {
- assertTrue(GarbageCollectWriteAheadLogs.isUUID(UUID.randomUUID().toString()));
- assertFalse(GarbageCollectWriteAheadLogs.isUUID("foo"));
- assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString()));
- assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
- }
-
- // It was easier to do this than get the mocking working for me
- private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
-
- private List<Entry<Key,Value>> replData;
-
- ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
- super(context, fs, useTrash);
- this.replData = replData;
- }
-
- @Override
- protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
- return this.replData;
- }
- }
-
- @Test
- public void replicationEntriesAffectGC() throws Exception {
- String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
- Connector conn = createMock(Connector.class);
-
- // Write a Status record which should prevent file1 from being deleted
- LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
-
- ReplicationGCWAL replGC = new ReplicationGCWAL(context, volMgr, false, replData);
-
- replay(conn);
-
- // Open (not-closed) file must be retained
- assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
-
- // No replication data, not needed
- replData.clear();
- assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
-
- // The file is closed but not replicated, must be retained
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
- assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
-
- // File is closed and fully replicated, can be deleted
- replData.clear();
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"),
- ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build())));
- assertFalse(replGC.neededByReplication(conn, "/wals/" + file1));
- }
-
- @Test
- public void removeReplicationEntries() throws Exception {
- String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
-
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- long file1CreateTime = System.currentTimeMillis();
- long file2CreateTime = file1CreateTime + 50;
- BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
- Mutation m = new Mutation("/wals/" + file1);
- StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
- bw.addMutation(m);
- m = new Mutation("/wals/" + file2);
- StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
- bw.addMutation(m);
-
- // These WALs are potential candidates for deletion from fs
- Map<String,Path> nameToFileMap = new HashMap<>();
- nameToFileMap.put(file1, new Path("/wals/" + file1));
- nameToFileMap.put(file2, new Path("/wals/" + file2));
-
- Map<String,Path> sortedWALogs = Collections.emptyMap();
-
- // Make the GCStatus and GcCycleStats
- GCStatus status = new GCStatus();
- GcCycleStats cycleStats = new GcCycleStats();
- status.currentLog = cycleStats;
-
- // We should iterate over two entries
- Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
-
- // We should have noted that two files were still in use
- Assert.assertEquals(2l, cycleStats.inUse);
-
- // Both should have been deleted
- Assert.assertEquals(0, nameToFileMap.size());
- }
-
- @Test
- public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
- String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
-
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
-
- Connector conn = context.getConnector();
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- long file1CreateTime = System.currentTimeMillis();
- long file2CreateTime = file1CreateTime + 50;
- // Write some records to the metadata table, we haven't yet written status records to the replication table
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
- bw.addMutation(m);
-
- m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
- bw.addMutation(m);
-
- // These WALs are potential candidates for deletion from fs
- Map<String,Path> nameToFileMap = new HashMap<>();
- nameToFileMap.put(file1, new Path("/wals/" + file1));
- nameToFileMap.put(file2, new Path("/wals/" + file2));
-
- Map<String,Path> sortedWALogs = Collections.emptyMap();
-
- // Make the GCStatus and GcCycleStats objects
- GCStatus status = new GCStatus();
- GcCycleStats cycleStats = new GcCycleStats();
- status.currentLog = cycleStats;
-
- // We should iterate over two entries
- Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
-
- // We should have noted that two files were still in use
- Assert.assertEquals(2l, cycleStats.inUse);
-
- // Both should have been deleted
- Assert.assertEquals(0, nameToFileMap.size());
- }
-
- @Test
- public void noReplicationTableDoesntLimitMetatdataResults() throws Exception {
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
- Connector conn = context.getConnector();
-
- String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
- bw.addMutation(m);
- bw.close();
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
- Entry<Key,Value> entry = Iterables.getOnlyElement(data);
-
- Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
- }
-
- @Test
- public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
- Instance inst = new MockInstance(testName.getMethodName());
- AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
- Connector conn = context.getConnector();
-
- long walCreateTime = System.currentTimeMillis();
- String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
- bw.addMutation(m);
- bw.close();
-
- bw = ReplicationTable.getBatchWriter(conn);
- m = new Mutation(wal);
- StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
- bw.addMutation(m);
- bw.close();
-
- GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
-
- Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
- Map<Key,Value> data = new HashMap<>();
- for (Entry<Key,Value> e : iter) {
- data.put(e.getKey(), e.getValue());
- }
-
- Assert.assertEquals(2, data.size());
-
- // Should get one element from each table (metadata and replication)
- for (Key k : data.keySet()) {
- String row = k.getRow().toString();
- if (row.startsWith(ReplicationSection.getRowPrefix())) {
- Assert.assertTrue(row.endsWith(wal));
- } else {
- Assert.assertEquals(wal, row);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index ba68890..f47f14b 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -46,20 +46,17 @@ import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
@@ -130,22 +127,16 @@ public class CloseWriteAheadLogReferencesTest {
public void findOneWalFromMetadata() throws Exception {
Connector conn = createMock(Connector.class);
BatchScanner bs = createMock(BatchScanner.class);
-
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+ String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
+ data.add(entry("tserver1:9997[1234567890]", file));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -163,54 +154,12 @@ public class CloseWriteAheadLogReferencesTest {
// Validate
Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
-
- verify(conn, bs);
- }
-
- @Test
- public void findManyWalFromSingleMetadata() throws Exception {
- Connector conn = createMock(Connector.class);
- BatchScanner bs = createMock(BatchScanner.class);
-
- // Fake out some data
- final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID();
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- // Multiple DFSLoggers
- logEntry.logSet = Sets.newHashSet(logEntry.filename, "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID());
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- // Get a batchscanner, scan the tablets section, fetch only the logs
- expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
- expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
- expectLastCall().once();
- expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
-
- @Override
- public Iterator<Entry<Key,Value>> answer() throws Throwable {
- return data.iterator();
- }
-
- });
- // Close the bs
- bs.close();
- expectLastCall().once();
-
- replay(conn, bs);
-
- // Validate
- Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(logEntry.logSet, wals);
+ Assert.assertEquals(Collections.singleton(file), wals);
verify(conn, bs);
}
+ // This is a silly test now
@Test
public void findManyRefsToSingleWalFromMetadata() throws Exception {
Connector conn = createMock(Connector.class);
@@ -220,31 +169,14 @@ public class CloseWriteAheadLogReferencesTest {
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + uuid;
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("1"), new Text("c"), new Text("b"));
- logEntry.server = "tserver1";
- logEntry.tabletId = 2;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("1"), null, new Text("c"));
- logEntry.server = "tserver1";
- logEntry.tabletId = 3;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+ String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid;
+ data.add(entry("tserver1:9997[0123456789]", filename));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -262,7 +194,7 @@ public class CloseWriteAheadLogReferencesTest {
// Validate
Set<String> wals = refs.getReferencedWals(conn);
- Assert.assertEquals(Collections.singleton(logEntry.filename), wals);
+ Assert.assertEquals(Collections.singleton(filename), wals);
verify(conn, bs);
}
@@ -272,59 +204,22 @@ public class CloseWriteAheadLogReferencesTest {
Connector conn = createMock(Connector.class);
BatchScanner bs = createMock(BatchScanner.class);
- String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/"
- + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID();
+ String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
+ String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID();
+ String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID();
// Fake out some data
final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
- LogEntry logEntry = new LogEntry();
- logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a"));
- logEntry.filename = file1;
- logEntry.server = "tserver1";
- logEntry.tabletId = 1;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("5"), null, null);
- logEntry.tabletId = 2;
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("3"), new Text("b"), new Text("a"));
- logEntry.filename = file2;
- logEntry.server = "tserver2";
- logEntry.tabletId = 3;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("3"), new Text("c"), new Text("b"));
- logEntry.tabletId = 4;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("4"), new Text("5"), new Text("0"));
- logEntry.filename = file3;
- logEntry.server = "tserver3";
- logEntry.tabletId = 5;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("4"), new Text("8"), new Text("5"));
- logEntry.server = "tserver3";
- logEntry.tabletId = 7;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
-
- logEntry.extent = new KeyExtent(new Text("4"), null, new Text("8"));
- logEntry.server = "tserver3";
- logEntry.tabletId = 15;
- logEntry.logSet = Collections.singleton(logEntry.filename);
- data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue())));
+
+ data.add(entry("tserver1:9997[1234567890]", file1));
+ data.add(entry("tserver2:9997[1234567891]", file2));
+ data.add(entry("tserver3:9997[1234567891]", file3));
// Get a batchscanner, scan the tablets section, fetch only the logs
expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
- bs.setRanges(Collections.singleton(TabletsSection.getRange()));
+ bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
expectLastCall().once();
- bs.fetchColumnFamily(LogColumnFamily.NAME);
+ bs.fetchColumnFamily(CurrentLogsSection.COLF);
expectLastCall().once();
expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
@@ -347,6 +242,11 @@ public class CloseWriteAheadLogReferencesTest {
verify(conn, bs);
}
+ private static Entry<Key,Value> entry(String session, String file) {
+ Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF, new Text(file));
+ return Maps.immutableEntry(key, new Value());
+ }
+
@Test
public void unusedWalsAreClosed() throws Exception {
Set<String> wals = Collections.emptySet();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 5e6dcfb..2434487 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -421,6 +421,9 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ);
}
perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE);
+
+ // add the currlog location for root tablet current logs
+ zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP);
haveUpgradedZooKeeper = true;
} catch (Exception ex) {
log.fatal("Error performing upgrade", ex);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
index 3809a29..43939d2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -162,7 +163,8 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli
scanner.setRange(MetadataSchema.TabletsSection.getRange());
} else {
scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
- scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange());
+ Range range = new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange();
+ scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange()));
}
TabletsSection.ServerColumnFamily.FLUSH_COLUMN.fetch(scanner);
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);