You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/21 06:04:08 UTC

[2/3] accumulo git commit: ACCUMULO-3838 Fix race condition in ReplicationIT

ACCUMULO-3838 Fix race condition in ReplicationIT

It was possible that the master saw a WAL that was closed,
copied it to the replication table, and then deleted it from
metadata before the test observed the log entry.

Ensure that the test observes WALs from either the metadata
table or the replication table so that it passes reliably.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/81fdad8d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/81fdad8d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/81fdad8d

Branch: refs/heads/master
Commit: 81fdad8d1b09a919feaa15885cd0b95bf15daa6e
Parents: 25d9b28
Author: Josh Elser <el...@apache.org>
Authored: Wed May 20 23:13:37 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 20 23:59:59 2015 -0400

----------------------------------------------------------------------
 .../test/replication/ReplicationIT.java         | 144 ++++++++++---------
 1 file changed, 73 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/81fdad8d/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 34c699e..e2ee215 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.IteratorSetting.Column;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.conf.Property;
@@ -135,6 +136,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     scanner.setRange(new Range());
     for (Entry<Key,Value> entry : scanner) {
       if (Thread.interrupted()) {
+        Thread.currentThread().interrupt();
         return logs;
       }
 
@@ -148,6 +150,31 @@ public class ReplicationIT extends ConfigurableMacIT {
     return logs;
   }
 
+  private Multimap<String,String> getAllLogs(Connector conn) throws TableNotFoundException {
+    Multimap<String,String> logs = getLogs(conn);
+    try {
+      Scanner scanner = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
+      StatusSection.limit(scanner);
+      Text buff = new Text();
+      for (Entry<Key,Value> entry : scanner) {
+        if (Thread.interrupted()) {
+          Thread.currentThread().interrupt();
+          return logs;
+        }
+
+        StatusSection.getFile(entry.getKey(), buff);
+        String file = buff.toString();
+        StatusSection.getTableId(entry.getKey(), buff);
+        String tableId = buff.toString();
+
+        logs.put(file, tableId);
+      }
+    } catch (TableOfflineException e) {
+      log.debug("Replication table isn't online yet");
+    }
+    return logs;
+  }
+
   private void waitForGCLock(Connector conn) throws InterruptedException {
     // Check if the GC process has the lock before wasting our retry attempts
     ZooKeeperInstance zki = (ZooKeeperInstance) conn.getInstance();
@@ -513,7 +540,7 @@ public class ReplicationIT extends ConfigurableMacIT {
         // when that happens
         while (keepRunning.get()) {
           try {
-            logs.putAll(getLogs(conn));
+            logs.putAll(getAllLogs(conn));
           } catch (TableNotFoundException e) {
             log.error("Metadata table doesn't exist");
           }
@@ -604,6 +631,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     while (observedLogs.hasNext()) {
       Entry<String,String> observedLog = observedLogs.next();
       if (replicationTableId.equals(observedLog.getValue())) {
+        log.info("Removing {} because its tableId is for the replication table", observedLog);
         observedLogs.remove();
       }
     }
@@ -612,9 +640,10 @@ public class ReplicationIT extends ConfigurableMacIT {
     // They might not yet all be closed though (might be newfile)
     Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
 
+    final Configuration conf = new Configuration();
     for (String replFile : replFiles) {
       Path p = new Path(replFile);
-      FileSystem fs = p.getFileSystem(new Configuration());
+      FileSystem fs = p.getFileSystem(conf);
       Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
     }
   }
@@ -665,86 +694,59 @@ public class ReplicationIT extends ConfigurableMacIT {
     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
     conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
 
-    final AtomicBoolean keepRunning = new AtomicBoolean(true);
-    final Set<String> metadataWals = new HashSet<>();
-
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
-        // when that happens
-        while (keepRunning.get()) {
-          try {
-            metadataWals.addAll(getLogs(conn).keySet());
-          } catch (Exception e) {
-            log.error("Metadata table doesn't exist");
-          }
-        }
-      }
-
-    });
-
-    t.start();
-
     String table1 = "table1", table2 = "table2", table3 = "table3";
-    try {
-      conn.tableOperations().create(table1);
-      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
-      conn.tableOperations().create(table2);
-      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
-      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
-      conn.tableOperations().create(table3);
-      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
-      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table1);
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table2);
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
+    conn.tableOperations().create(table3);
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 
-      // Write some data to table1
-      BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    // Write some data to table1
+    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
 
-      bw.close();
+    bw.close();
 
-      // Write some data to table2
-      bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    // Write some data to table2
+    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
 
-      bw.close();
+    bw.close();
 
-      // Write some data to table3
-      bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-      for (int rows = 0; rows < 200; rows++) {
-        Mutation m = new Mutation(Integer.toString(rows));
-        for (int cols = 0; cols < 500; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    // Write some data to table3
+    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
+    for (int rows = 0; rows < 200; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 500; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
+    }
 
-      bw.close();
-
-      // Flush everything to try to make the replication records
-      for (String table : Arrays.asList(table1, table2, table3)) {
-        conn.tableOperations().flush(table, null, null, true);
-      }
+    bw.close();
 
-    } finally {
-      keepRunning.set(false);
-      t.join(5000);
+    // Flush everything to try to make the replication records
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      conn.tableOperations().flush(table, null, null, true);
     }
 
     for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {