You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/21 06:04:07 UTC

[1/3] accumulo git commit: ACCUMULO-3838 Fix race condition in ReplicationIT

Repository: accumulo
Updated Branches:
  refs/heads/1.7 25d9b2854 -> 81fdad8d1
  refs/heads/master 9cd82738e -> cf9b9a4ea


ACCUMULO-3838 Fix race condition in ReplicationIT

It was possible for the master to see a closed WAL, copy it
to the replication table, and then delete it from the metadata
table before the test observed the log entry.

Have the test observe WALs from both the metadata table and the
replication table so that it no longer misses such an entry.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/81fdad8d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/81fdad8d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/81fdad8d

Branch: refs/heads/1.7
Commit: 81fdad8d1b09a919feaa15885cd0b95bf15daa6e
Parents: 25d9b28
Author: Josh Elser <el...@apache.org>
Authored: Wed May 20 23:13:37 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 20 23:59:59 2015 -0400

----------------------------------------------------------------------
 .../test/replication/ReplicationIT.java         | 144 ++++++++++---------
 1 file changed, 73 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/81fdad8d/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 34c699e..e2ee215 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.IteratorSetting.Column;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.conf.Property;
@@ -135,6 +136,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     scanner.setRange(new Range());
     for (Entry<Key,Value> entry : scanner) {
       if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
         return logs;
       }
 
@@ -148,6 +150,31 @@ public class ReplicationIT extends ConfigurableMacIT {
     return logs;
   }
 
+  private Multimap<String,String> getAllLogs(Connector conn) throws TableNotFoundException {
+    Multimap<String,String> logs = getLogs(conn);
+    try {
+      Scanner scanner = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
+      StatusSection.limit(scanner);
+      Text buff = new Text();
+      for (Entry<Key,Value> entry : scanner) {
+        if (Thread.interrupted()) {
+          Thread.currentThread().interrupt();
+          return logs;
+        }
+
+        StatusSection.getFile(entry.getKey(), buff);
+        String file = buff.toString();
+        StatusSection.getTableId(entry.getKey(), buff);
+        String tableId = buff.toString();
+
+        logs.put(file, tableId);
+      }
+    } catch (TableOfflineException e) {
+      log.debug("Replication table isn't online yet");
+    }
+    return logs;
+  }
+
   private void waitForGCLock(Connector conn) throws InterruptedException {
     // Check if the GC process has the lock before wasting our retry attempts
     ZooKeeperInstance zki = (ZooKeeperInstance) conn.getInstance();
@@ -513,7 +540,7 @@ public class ReplicationIT extends ConfigurableMacIT {
         // when that happens
         while (keepRunning.get()) {
           try {
-            logs.putAll(getLogs(conn));
+            logs.putAll(getAllLogs(conn));
           } catch (TableNotFoundException e) {
             log.error("Metadata table doesn't exist");
           }
@@ -604,6 +631,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     while (observedLogs.hasNext()) {
       Entry<String,String> observedLog = observedLogs.next();
       if (replicationTableId.equals(observedLog.getValue())) {
+        log.info("Removing {} because its tableId is for the replication table", observedLog);
         observedLogs.remove();
       }
     }
@@ -612,9 +640,10 @@ public class ReplicationIT extends ConfigurableMacIT {
     // They might not yet all be closed though (might be newfile)
     Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
 
+    final Configuration conf = new Configuration();
     for (String replFile : replFiles) {
       Path p = new Path(replFile);
-      FileSystem fs = p.getFileSystem(new Configuration());
+      FileSystem fs = p.getFileSystem(conf);
       Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
     }
   }
@@ -665,86 +694,59 @@ public class ReplicationIT extends ConfigurableMacIT {
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
     conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
 
-    final AtomicBoolean keepRunning = new AtomicBoolean(true);
-    final Set<String> metadataWals = new HashSet<>();
-
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
-        // when that happens
-        while (keepRunning.get()) {
-          try {
-            metadataWals.addAll(getLogs(conn).keySet());
-          } catch (Exception e) {
-            log.error("Metadata table doesn't exist");
-          }
-        }
-      }
-
-    });
-
-    t.start();
-
     String table1 = "table1", table2 = "table2", table3 = "table3";
-    try {
-      conn.tableOperations().create(table1);
-      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
-      conn.tableOperations().create(table2);
-      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
-      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
-      conn.tableOperations().create(table3);
-      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
-      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table1);
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table2);
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table3);
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      // Write some data to table1
-      BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
 
-      bw.close();
+    bw.close();
 
-      // Write some data to table2
-      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    // Write some data to table2
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
 
-      bw.close();
+    bw.close();
 
-      // Write some data to table3
-      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    // Write some data to table3
+    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
 
-      bw.close();
-
-      // Flush everything to try to make the replication records
-      for (String table : Arrays.asList(table1, table2, table3)) {
-        conn.tableOperations().flush(table, null, null, true);
-      }
+    bw.close();
 
-    } finally {
-      keepRunning.set(false);
-      t.join(5000);
+    // Flush everything to try to make the replication records
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      conn.tableOperations().flush(table, null, null, true);
     }
 
     for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {


[2/3] accumulo git commit: ACCUMULO-3838 Fix race condition in ReplicationIT

Posted by el...@apache.org.
ACCUMULO-3838 Fix race condition in ReplicationIT

It was possible for the master to see a closed WAL, copy it
to the replication table, and then delete it from the metadata
table before the test observed the log entry.

Have the test observe WALs from both the metadata table and the
replication table so that it no longer misses such an entry.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/81fdad8d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/81fdad8d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/81fdad8d

Branch: refs/heads/master
Commit: 81fdad8d1b09a919feaa15885cd0b95bf15daa6e
Parents: 25d9b28
Author: Josh Elser <el...@apache.org>
Authored: Wed May 20 23:13:37 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 20 23:59:59 2015 -0400

----------------------------------------------------------------------
 .../test/replication/ReplicationIT.java         | 144 ++++++++++---------
 1 file changed, 73 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/81fdad8d/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 34c699e..e2ee215 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.IteratorSetting.Column;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.conf.Property;
@@ -135,6 +136,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     scanner.setRange(new Range());
     for (Entry<Key,Value> entry : scanner) {
       if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
         return logs;
       }
 
@@ -148,6 +150,31 @@ public class ReplicationIT extends ConfigurableMacIT {
     return logs;
   }
 
+  private Multimap<String,String> getAllLogs(Connector conn) throws TableNotFoundException {
+    Multimap<String,String> logs = getLogs(conn);
+    try {
+      Scanner scanner = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
+      StatusSection.limit(scanner);
+      Text buff = new Text();
+      for (Entry<Key,Value> entry : scanner) {
+        if (Thread.interrupted()) {
+          Thread.currentThread().interrupt();
+          return logs;
+        }
+
+        StatusSection.getFile(entry.getKey(), buff);
+        String file = buff.toString();
+        StatusSection.getTableId(entry.getKey(), buff);
+        String tableId = buff.toString();
+
+        logs.put(file, tableId);
+      }
+    } catch (TableOfflineException e) {
+      log.debug("Replication table isn't online yet");
+    }
+    return logs;
+  }
+
   private void waitForGCLock(Connector conn) throws InterruptedException {
     // Check if the GC process has the lock before wasting our retry attempts
     ZooKeeperInstance zki = (ZooKeeperInstance) conn.getInstance();
@@ -513,7 +540,7 @@ public class ReplicationIT extends ConfigurableMacIT {
         // when that happens
         while (keepRunning.get()) {
           try {
-            logs.putAll(getLogs(conn));
+            logs.putAll(getAllLogs(conn));
           } catch (TableNotFoundException e) {
             log.error("Metadata table doesn't exist");
           }
@@ -604,6 +631,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     while (observedLogs.hasNext()) {
       Entry<String,String> observedLog = observedLogs.next();
       if (replicationTableId.equals(observedLog.getValue())) {
+        log.info("Removing {} because its tableId is for the replication table", observedLog);
         observedLogs.remove();
       }
     }
@@ -612,9 +640,10 @@ public class ReplicationIT extends ConfigurableMacIT {
     // They might not yet all be closed though (might be newfile)
     Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
 
+    final Configuration conf = new Configuration();
     for (String replFile : replFiles) {
       Path p = new Path(replFile);
-      FileSystem fs = p.getFileSystem(new Configuration());
+      FileSystem fs = p.getFileSystem(conf);
       Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
     }
   }
@@ -665,86 +694,59 @@ public class ReplicationIT extends ConfigurableMacIT {
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
     conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
 
-    final AtomicBoolean keepRunning = new AtomicBoolean(true);
-    final Set<String> metadataWals = new HashSet<>();
-
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
-        // when that happens
-        while (keepRunning.get()) {
-          try {
-            metadataWals.addAll(getLogs(conn).keySet());
-          } catch (Exception e) {
-            log.error("Metadata table doesn't exist");
-          }
-        }
-      }
-
-    });
-
-    t.start();
-
     String table1 = "table1", table2 = "table2", table3 = "table3";
-    try {
-      conn.tableOperations().create(table1);
-      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
-      conn.tableOperations().create(table2);
-      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
-      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
-      conn.tableOperations().create(table3);
-      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
-      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table1);
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table2);
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table3);
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      // Write some data to table1
-      BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
 
-      bw.close();
+    bw.close();
 
-      // Write some data to table2
-      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    // Write some data to table2
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
 
-      bw.close();
+    bw.close();
 
-      // Write some data to table3
-      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    // Write some data to table3
+    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
 
-      bw.close();
-
-      // Flush everything to try to make the replication records
-      for (String table : Arrays.asList(table1, table2, table3)) {
-        conn.tableOperations().flush(table, null, null, true);
-      }
+    bw.close();
 
-    } finally {
-      keepRunning.set(false);
-      t.join(5000);
+    // Flush everything to try to make the replication records
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      conn.tableOperations().flush(table, null, null, true);
     }
 
     for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {


[3/3] accumulo git commit: Merge branch '1.7'

Posted by el...@apache.org.
Merge branch '1.7'

Conflicts:
	test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cf9b9a4e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cf9b9a4e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cf9b9a4e

Branch: refs/heads/master
Commit: cf9b9a4ea6cfd03a053e64c7feca964ca9c026aa
Parents: 9cd8273 81fdad8
Author: Josh Elser <el...@apache.org>
Authored: Thu May 21 00:03:56 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 21 00:03:56 2015 -0400

----------------------------------------------------------------------
 .../test/replication/ReplicationIT.java         | 93 +++++++++++---------
 1 file changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/cf9b9a4e/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 54b42f4,e2ee215..ef81f2c
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@@ -132,32 -130,21 +133,33 @@@ public class ReplicationIT extends Conf
    }
  
    private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
 -    Multimap<String,String> logs = HashMultimap.create();
 +    // Map of server to tableId
 +    Multimap<TServerInstance,String> serverToTableID = HashMultimap.create();
      Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 -    scanner.fetchColumnFamily(LogColumnFamily.NAME);
 -    scanner.setRange(new Range());
 +    scanner.setRange(MetadataSchema.TabletsSection.getRange());
 +    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
 +    for (Entry<Key,Value> entry : scanner) {
 +      TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
 +      byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow());
 +      serverToTableID.put(key, new String(tableId, UTF_8));
 +    }
 +    // Map of logs to tableId
 +    Multimap<String,String> logs = HashMultimap.create();
 +    scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
      for (Entry<Key,Value> entry : scanner) {
        if (Thread.interrupted()) {
+         Thread.currentThread().interrupt();
          return logs;
        }
 -
 -      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
 -
 -      for (String log : logEntry.logSet) {
 -        // Need to normalize the log file from LogEntry
 -        logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
 +      Text path = new Text();
 +      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
 +      Text session = new Text();
 +      Text hostPort = new Text();
 +      MetadataSchema.CurrentLogsSection.getTabletServer(entry.getKey(), hostPort, session);
 +      TServerInstance server = new TServerInstance(AddressUtil.parseAddress(hostPort.toString(), false), session.toString());
 +      for (String tableId : serverToTableID.get(server)) {
 +        logs.put(new Path(path.toString()).toString(), tableId);
        }
      }
      return logs;
@@@ -575,12 -638,12 +603,13 @@@
  
      // We should have *some* reference to each log that was seen in the metadata table
      // They might not yet all be closed though (might be newfile)
 -    Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
 +    Assert.assertTrue("Metadata log distribution: " + logs + "replFiles " + replFiles, logs.keySet().containsAll(replFiles));
 +    Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1);
  
+     final Configuration conf = new Configuration();
      for (String replFile : replFiles) {
        Path p = new Path(replFile);
-       FileSystem fs = p.getFileSystem(new Configuration());
+       FileSystem fs = p.getFileSystem(conf);
        Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
      }
    }
@@@ -631,53 -694,59 +660,31 @@@
      conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
      conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
  
-     final AtomicBoolean keepRunning = new AtomicBoolean(true);
-     final Set<String> metadataWals = new HashSet<>();
- 
-     Thread t = new Thread(new Runnable() {
-       @Override
-       public void run() {
-         // Should really be able to interrupt here, but the Scanner throws a fit to the logger
-         // when that happens
-         while (keepRunning.get()) {
-           try {
-             metadataWals.addAll(getLogs(conn).keySet());
-           } catch (Exception e) {
-             log.error("Metadata table doesn't exist");
-           }
-         }
-       }
- 
-     });
- 
-     t.start();
- 
      String table1 = "table1", table2 = "table2", table3 = "table3";
-     try {
-       conn.tableOperations().create(table1);
-       conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-       conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
-       conn.tableOperations().create(table2);
-       conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
-       conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
-       conn.tableOperations().create(table3);
-       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
-       conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+     conn.tableOperations().create(table1);
+     conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+     conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+     conn.tableOperations().create(table2);
+     conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+     conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+     conn.tableOperations().create(table3);
+     conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+     conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
  
-       writeSomeData(conn, table1, 200, 500);
 -    // Write some data to table1
 -    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
 -    for (int rows = 0; rows < 200; rows++) {
 -      Mutation m = new Mutation(Integer.toString(rows));
 -      for (int cols = 0; cols < 500; cols++) {
 -        String value = Integer.toString(cols);
 -        m.put(value, "", value);
 -      }
 -      bw.addMutation(m);
 -    }
++    writeSomeData(conn, table1, 200, 500);
  
-       writeSomeData(conn, table2, 200, 500);
 -    bw.close();
 -
 -    // Write some data to table2
 -    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
 -    for (int rows = 0; rows < 200; rows++) {
 -      Mutation m = new Mutation(Integer.toString(rows));
 -      for (int cols = 0; cols < 500; cols++) {
 -        String value = Integer.toString(cols);
 -        m.put(value, "", value);
 -      }
 -      bw.addMutation(m);
 -    }
++    writeSomeData(conn, table2, 200, 500);
  
-       writeSomeData(conn, table3, 200, 500);
 -    bw.close();
++    writeSomeData(conn, table3, 200, 500);
  
-       // Flush everything to try to make the replication records
-       for (String table : Arrays.asList(table1, table2, table3)) {
-         conn.tableOperations().flush(table, null, null, true);
 -    // Write some data to table3
 -    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
 -    for (int rows = 0; rows < 200; rows++) {
 -      Mutation m = new Mutation(Integer.toString(rows));
 -      for (int cols = 0; cols < 500; cols++) {
 -        String value = Integer.toString(cols);
 -        m.put(value, "", value);
--      }
 -      bw.addMutation(m);
++    // Flush everything to try to make the replication records
++    for (String table : Arrays.asList(table1, table2, table3)) {
++      conn.tableOperations().flush(table, null, null, true);
+     }
  
-     } finally {
-       keepRunning.set(false);
-       t.join(5000);
 -    bw.close();
 -
+     // Flush everything to try to make the replication records
+     for (String table : Arrays.asList(table1, table2, table3)) {
+       conn.tableOperations().flush(table, null, null, true);
      }
  
      for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {