You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that is not preserved in this copy.)
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:29:22 UTC
[44/50] [abbrv] git commit: ACCUMULO-2575 Also check metadata table to account for status updates that haven't made it to replication table yet
ACCUMULO-2575 Also check metadata table to account for status updates that haven't made it to replication table yet
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2372c71d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2372c71d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2372c71d
Branch: refs/heads/ACCUMULO-378
Commit: 2372c71df8ea7aa5299a93481d6b84088cc85049
Parents: b5cd35a
Author: Josh Elser <el...@apache.org>
Authored: Thu May 8 13:26:09 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 8 13:26:09 2014 -0400
----------------------------------------------------------------------
.../gc/GarbageCollectWriteAheadLogs.java | 18 +++--
.../gc/GarbageCollectWriteAheadLogsTest.java | 76 ++++++++++++++++++--
2 files changed, 86 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2372c71d/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index c373dd7..6e34e43 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -41,9 +41,12 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
@@ -67,6 +70,7 @@ import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.zookeeper.KeeperException;
+import com.google.common.collect.Iterables;
import com.google.common.net.HostAndPort;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -392,14 +396,20 @@ public class GarbageCollectWriteAheadLogs {
}
protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) throws TableNotFoundException {
- Scanner s = ReplicationTable.getScanner(conn);
+ Scanner replScanner = ReplicationTable.getScanner(conn);
// Scan only the Status records
- StatusSection.limit(s);
+ StatusSection.limit(replScanner);
// Only look for this specific WAL
- s.setRange(Range.exact(wal));
+ replScanner.setRange(Range.exact(wal));
- return s;
+ Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ // Need to add in the replication section prefix
+ metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal));
+ // Limit the column family to be sure
+ metaScanner.fetchColumnFamily(ReplicationSection.COLF);
+
+ return Iterables.concat(replScanner, metaScanner);
}
private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2372c71d/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 11390f5..749fc96 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -46,6 +46,9 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.gc.thrift.GCStatus;
+import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.StatusUtil;
@@ -56,9 +59,12 @@ import org.apache.accumulo.server.replication.ReplicationTable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
import com.google.common.collect.Maps;
@@ -76,7 +82,9 @@ public class GarbageCollectWriteAheadLogsTest {
private VolumeManager volMgr;
private GarbageCollectWriteAheadLogs gcwal;
private long modTime;
- private GCStatus status;
+
+ @Rule
+ public TestName testName = new TestName();
@Before
public void setUp() throws Exception {
@@ -84,7 +92,6 @@ public class GarbageCollectWriteAheadLogsTest {
volMgr = createMock(VolumeManager.class);
gcwal = new GarbageCollectWriteAheadLogs(instance, volMgr, false);
modTime = System.currentTimeMillis();
- status = createMock(GCStatus.class);
}
@Test
@@ -290,6 +297,7 @@ public class GarbageCollectWriteAheadLogsTest {
assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
}
+ // It was easier to do this than get the mocking working for me
private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
private List<Entry<Key,Value>> replData;
@@ -345,9 +353,11 @@ public class GarbageCollectWriteAheadLogsTest {
public void removeReplicationEntries() throws Exception {
String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
- Instance inst = new MockInstance("removeReplicationEntries");
+ Instance inst = new MockInstance(testName.getMethodName());
Credentials creds = new Credentials("root", new PasswordToken(""));
Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
ReplicationTable.create(conn);
@@ -359,11 +369,69 @@ public class GarbageCollectWriteAheadLogsTest {
StatusSection.add(m, new Text("1"), StatusUtil.newFileValue());
bw.addMutation(m);
+ // These WALs are potential candidates for deletion from fs
Map<String,Path> nameToFileMap = new HashMap<>();
+ nameToFileMap.put(file1, new Path("/wals/" + file1));
+ nameToFileMap.put(file2, new Path("/wals/" + file2));
+
Map<String,Path> sortedWALogs = Collections.emptyMap();
- gcwal.removeReplicationEntries(nameToFileMap, sortedWALogs, this.status, creds);
+ // Make the GCStatus and GcCycleStats
+ GCStatus status = new GCStatus();
+ GcCycleStats cycleStats = new GcCycleStats();
+ status.currentLog = cycleStats;
+
+ // We should iterate over two entries
+ Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status, creds));
+
+ // We should have noted that two files were still in use
+ Assert.assertEquals(2l, cycleStats.inUse);
+
+ // Both should have been deleted
+ Assert.assertEquals(0, nameToFileMap.size());
+ }
+
+ @Test
+ public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
+ String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+
+ Instance inst = new MockInstance(testName.getMethodName());
+ Credentials creds = new Credentials("root", new PasswordToken(""));
+ Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
+
+ ReplicationTable.create(conn);
+
+ // Write some records to the metadata table, we haven't yet written status records to the replication table
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+ bw.addMutation(m);
+
+ m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+ bw.addMutation(m);
+
+ // These WALs are potential candidates for deletion from fs
+ Map<String,Path> nameToFileMap = new HashMap<>();
+ nameToFileMap.put(file1, new Path("/wals/" + file1));
+ nameToFileMap.put(file2, new Path("/wals/" + file2));
+
+ Map<String,Path> sortedWALogs = Collections.emptyMap();
+
+ // Make the GCStatus and GcCycleStats objects
+ GCStatus status = new GCStatus();
+ GcCycleStats cycleStats = new GcCycleStats();
+ status.currentLog = cycleStats;
+
+ // We should iterate over two entries
+ Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status, creds));
+
+ // We should have noted that two files were still in use
+ Assert.assertEquals(2l, cycleStats.inUse);
+ // Both should have been deleted
Assert.assertEquals(0, nameToFileMap.size());
}
}