You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/06/12 17:46:28 UTC
[3/3] accumulo git commit: ACCUMULO-3423 fixing issues found in ITs
ACCUMULO-3423 fixing issues found in ITs
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/844166a0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/844166a0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/844166a0
Branch: refs/heads/master
Commit: 844166a05a248d13148d8f82f7f135d808994c13
Parents: 2fe0bef
Author: Eric C. Newton <er...@gmail.com>
Authored: Fri Jun 12 11:45:53 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Fri Jun 12 11:45:53 2015 -0400
----------------------------------------------------------------------
.../gc/GarbageCollectWriteAheadLogs.java | 15 +++++
.../CloseWriteAheadLogReferences.java | 10 +--
.../gc/GarbageCollectWriteAheadLogsTest.java | 65 +++++++++++++-------
...bageCollectorCommunicatesWithTServersIT.java | 4 +-
.../test/replication/ReplicationIT.java | 39 ++++++++----
5 files changed, 93 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index b8fb9fb..8803a40 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -39,6 +39,9 @@ import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
@@ -315,6 +318,18 @@ public class GarbageCollectWriteAheadLogs {
Connector conn;
try {
conn = context.getConnector();
+ try {
+ final Scanner s = ReplicationTable.getScanner(conn);
+ StatusSection.limit(s);
+ for (Entry<Key,Value> entry : s) {
+ UUID id = path2uuid(new Path(entry.getKey().getRow().toString()));
+ candidates.remove(id);
+ log.info("Ignore closed log " + id + " because it is being replicated");
+ }
+ } catch (ReplicationTableOfflineException ex) {
+ return candidates.size();
+ }
+
final Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
scanner.setRange(MetadataSchema.ReplicationSection.getRange());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 1444127..8857939 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -155,10 +155,10 @@ public class CloseWriteAheadLogReferences implements Runnable {
*
* @param conn
* Connector
- * @param referencedWals
- * {@link Set} of paths to WALs that are referenced in the tablets section of the metadata table
+ * @param closedWals
+ * {@link Set} of paths to WALs that are marked as closed or unreferenced in zookeeper
*/
- protected long updateReplicationEntries(Connector conn, Set<String> referencedWals) {
+ protected long updateReplicationEntries(Connector conn, Set<String> closedWals) {
BatchScanner bs = null;
BatchWriter bw = null;
long recordsClosed = 0;
@@ -181,11 +181,11 @@ public class CloseWriteAheadLogReferences implements Runnable {
// Ignore things that aren't completely replicated as we can't delete those anyways
MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText);
String replFile = replFileText.toString();
- boolean isReferenced = referencedWals.contains(replFile);
+ boolean isClosed = closedWals.contains(replFile);
// We only want to clean up WALs (which is everything but rfiles) and only when
// metadata doesn't have a reference to the given WAL
- if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !isReferenced) {
+ if (!status.getClosed() && !replFile.endsWith(RFILE_SUFFIX) && !isClosed) {
try {
closeWal(bw, entry.getKey());
recordsClosed++;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index a40d390..60d6026 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -34,6 +34,8 @@ import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.replication.ReplicationSchema;
+import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.AccumuloServerContext;
@@ -131,28 +133,35 @@ public class GarbageCollectWriteAheadLogsTest {
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
Connector conn = EasyMock.createMock(Connector.class);
- Scanner scanner = EasyMock.createMock(Scanner.class);
+ Scanner mscanner = EasyMock.createMock(Scanner.class);
+ Scanner rscanner = EasyMock.createMock(Scanner.class);
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
EasyMock.expect(context.getConnector()).andReturn(conn);
- EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
- scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+
+ EasyMock.expect(conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)).andReturn(rscanner);
+ rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
+
+ EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(mscanner);
+ mscanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
EasyMock.expectLastCall().once();
- scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+ mscanner.setRange(MetadataSchema.ReplicationSection.getRange());
EasyMock.expectLastCall().once();
- EasyMock.expect(scanner.iterator()).andReturn(emptyKV);
+ EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
marker.removeWalMarker(server2, id);
EasyMock.expectLastCall().once();
marker.forget(server2);
EasyMock.expectLastCall().once();
- EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+ EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List);
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
+ EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
}
@Test
@@ -162,23 +171,30 @@ public class GarbageCollectWriteAheadLogsTest {
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
Connector conn = EasyMock.createMock(Connector.class);
- Scanner scanner = EasyMock.createMock(Scanner.class);
+ Scanner mscanner = EasyMock.createMock(Scanner.class);
+ Scanner rscanner = EasyMock.createMock(Scanner.class);
GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
EasyMock.expect(context.getConnector()).andReturn(conn);
- EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
- scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+
+ EasyMock.expect(conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)).andReturn(rscanner);
+ rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
EasyMock.expectLastCall().once();
- scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+ EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
+
+ EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(mscanner);
+ mscanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
EasyMock.expectLastCall().once();
- EasyMock.expect(scanner.iterator()).andReturn(emptyKV);
- EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+ mscanner.setRange(MetadataSchema.ReplicationSection.getRange());
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(mscanner.iterator()).andReturn(emptyKV);
+ EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer2List);
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
+ EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
}
@Test
@@ -188,7 +204,8 @@ public class GarbageCollectWriteAheadLogsTest {
WalStateManager marker = EasyMock.createMock(WalStateManager.class);
LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
Connector conn = EasyMock.createMock(Connector.class);
- Scanner scanner = EasyMock.createMock(Scanner.class);
+ Scanner mscanner = EasyMock.createMock(Scanner.class);
+ Scanner rscanner = EasyMock.createMock(Scanner.class);
String row = MetadataSchema.ReplicationSection.getRowPrefix() + path.toString();
String colf = MetadataSchema.ReplicationSection.COLF.toString();
String colq = "1";
@@ -200,15 +217,21 @@ public class GarbageCollectWriteAheadLogsTest {
EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<WalState,Path>(WalState.UNREFERENCED, path));
EasyMock.expect(context.getConnector()).andReturn(conn);
- EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
- scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+
+ EasyMock.expect(conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)).andReturn(rscanner);
+ rscanner.fetchColumnFamily(ReplicationSchema.StatusSection.NAME);
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(rscanner.iterator()).andReturn(emptyKV);
+
+ EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(mscanner);
+ mscanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
EasyMock.expectLastCall().once();
- scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+ mscanner.setRange(MetadataSchema.ReplicationSection.getRange());
EasyMock.expectLastCall().once();
- EasyMock.expect(scanner.iterator()).andReturn(replicationWork.entrySet().iterator());
- EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+ EasyMock.expect(mscanner.iterator()).andReturn(replicationWork.entrySet().iterator());
+ EasyMock.replay(context, fs, marker, tserverSet, conn, rscanner, mscanner);
GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List);
gc.collect(status);
- EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
+ EasyMock.verify(context, fs, marker, tserverSet, conn, rscanner, mscanner);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index ab142d0..6a14de3 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -198,13 +198,11 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacB
log.info("Flushing mutations to the server");
bw.flush();
- log.info("Checking that metadata only has one WAL recorded for this table");
+ log.info("Checking that metadata only has two WALs recorded for this table (inUse, and opened)");
Set<String> wals = getWalsForTable(table);
Assert.assertEquals("Expected to only find two WALs for the table", 2, wals.size());
- log.info("Compacting the table which will remove all WALs from the tablets");
-
// Flush our test table to remove the WAL references in it
conn.tableOperations().flush(table, null, null, true);
// Flush the metadata table too because it will have a reference to the WAL
http://git-wip-us.apache.org/repos/asf/accumulo/blob/844166a0/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 77198df..55379a4 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -64,6 +64,7 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
@@ -405,6 +406,9 @@ public class ReplicationIT extends ConfigurableMacBase {
// Create two tables
conn.tableOperations().create(table1);
conn.tableOperations().create(table2);
+ conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
+ // wait for permission to propagate
+ Thread.sleep(5000);
// Enable replication on table1
conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
@@ -467,14 +471,10 @@ public class ReplicationIT extends ConfigurableMacBase {
// After writing data, we'll get a replication table online
Assert.assertTrue(ReplicationTable.isOnline(conn));
- conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
Set<String> tableIds = Sets.newHashSet(conn.tableOperations().tableIdMap().get(table1), conn.tableOperations().tableIdMap().get(table2));
Set<String> tableIdsForMetadata = Sets.newHashSet(tableIds);
- // Wait to make sure the table permission propagate
- Thread.sleep(5000);
-
s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
s.setRange(MetadataSchema.ReplicationSection.getRange());
@@ -482,6 +482,11 @@ public class ReplicationIT extends ConfigurableMacBase {
for (Entry<Key,Value> metadata : s) {
records.add(metadata);
}
+ s = ReplicationTable.getScanner(conn);
+ StatusSection.limit(s);
+ for (Entry<Key,Value> replication : s) {
+ records.add(replication);
+ }
Assert.assertEquals("Expected to find 2 records, but actually found " + records, 2, records.size());
@@ -580,12 +585,7 @@ public class ReplicationIT extends ConfigurableMacBase {
// Sleep a sufficient amount of time to ensure that we get the straggling WALs that might have been created at the end
Thread.sleep(5000);
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Set<String> replFiles = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- replFiles.add(entry.getKey().getRow().toString());
- }
+ Set<String> replFiles = getReferencesToFilesToBeReplicated(conn);
// We might have a WAL that was use solely for the replication table
// We want to remove that from our list as it should not appear in the replication table
@@ -608,8 +608,25 @@ public class ReplicationIT extends ConfigurableMacBase {
for (String replFile : replFiles) {
Path p = new Path(replFile);
FileSystem fs = p.getFileSystem(conf);
- Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
+ if (!fs.exists(p)) {
+ // double-check: the garbage collector can be fast
+ Set<String> currentSet = getReferencesToFilesToBeReplicated(conn);
+ log.info("Current references {}", currentSet);
+ log.info("Looking for reference to {}", replFile);
+ log.info("Contains? {}", currentSet.contains(replFile));
+ Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, !currentSet.contains(replFile));
+ }
+ }
+ }
+
+ private Set<String> getReferencesToFilesToBeReplicated(final Connector conn) throws ReplicationTableOfflineException {
+ Scanner s = ReplicationTable.getScanner(conn);
+ StatusSection.limit(s);
+ Set<String> replFiles = new HashSet<>();
+ for (Entry<Key,Value> entry : s) {
+ replFiles.add(entry.getKey().getRow().toString());
}
+ return replFiles;
}
@Test