You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/04/05 23:53:57 UTC

[accumulo] branch 1.9 updated: Improve FateConcurrencyIT test (#1061)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new b52c165  Improve FateConcurrencyIT test (#1061)
b52c165 is described below

commit b52c16576e45c59190f8334301ccb120e7c627d5
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Fri Apr 5 19:53:53 2019 -0400

    Improve FateConcurrencyIT test (#1061)
    
    - Removes NPE that sometimes caused the test to fail during release testing.
---
 .../test/functional/FateConcurrencyIT.java         | 175 ++++++++++++++-------
 1 file changed, 120 insertions(+), 55 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 56d51ad..dc1c225 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -18,12 +18,15 @@ package org.apache.accumulo.test.functional;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -194,69 +197,109 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
    * Validate the the AdminUtil.getStatus works correctly after refactor and validate that
    * getTransactionStatus can be called without lock map(s). The test starts a long running fate
    * transaction (slow compaction) and the calls AdminUtil functions to get the FATE.
-   *
-   * @throws Exception
-   *           any exception is a test failure
    */
   @Test
-  public void getFateStatus() throws Exception {
+  public void getFateStatus() {
 
-    assertEquals("verify table online after created", TableState.ONLINE, getTableState(tableName));
+    Instance instance = connector.getInstance();
+    String tableId;
+
+    try {
+
+      assertEquals("verify table online after created", TableState.ONLINE,
+          getTableState(tableName));
+
+      tableId = Tables.getTableId(instance, tableName);
+
+      log.trace("tid: {}", tableId);
+
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException(
+          String.format("Table %s does not exist, failing test", tableName));
+    }
 
     Future<?> compactTask = startCompactTask();
 
     assertTrue("compaction fate transaction exits", findFate(tableName));
 
-    Instance instance = connector.getInstance();
+    AdminUtil.FateStatus withLocks = null;
+    List<AdminUtil.TransactionStatus> noLocks = null;
+
+    int maxRetries = 3;
+
     AdminUtil<String> admin = new AdminUtil<>(false);
 
-    try {
+    while (maxRetries > 0) {
 
-      String tableId = Tables.getTableId(instance, tableName);
+      try {
 
-      log.trace("tid: {}", tableId);
+        IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
+            instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
 
-      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
-          instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
-      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
+        ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
 
-      AdminUtil.FateStatus withLocks = admin.getStatus(zs, zk,
-          ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
+        withLocks = admin.getStatus(zs, zk,
+            ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
 
-      // call method that does not use locks.
-      List<AdminUtil.TransactionStatus> noLocks = admin.getTransactionStatus(zs, null, null);
+        // call method that does not use locks.
+        noLocks = admin.getTransactionStatus(zs, null, null);
 
-      // fast check - count number of transactions
-      assertEquals(withLocks.getTransactions().size(), noLocks.size());
+        // no zk exception, no need to retry
+        break;
 
-      int matchCount = 0;
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        fail("Interrupt received - test failed");
+        return;
+      } catch (KeeperException ex) {
+        maxRetries--;
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException intr_ex) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
 
-      for (AdminUtil.TransactionStatus tx : withLocks.getTransactions()) {
+    assertNotNull(withLocks);
+    assertNotNull(noLocks);
 
-        log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
+    // fast check - count number of transactions
+    assertEquals(withLocks.getTransactions().size(), noLocks.size());
+
+    int matchCount = 0;
 
-        if (tx.getTop().contains("CompactionDriver") && tx.getDebug().contains("CompactRange")) {
+    for (AdminUtil.TransactionStatus tx : withLocks.getTransactions()) {
 
-          for (AdminUtil.TransactionStatus tx2 : noLocks) {
-            if (tx2.getTxid().equals(tx.getTxid())) {
-              matchCount++;
-            }
+      if (isCompaction(tx)) {
+
+        log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
+
+        for (AdminUtil.TransactionStatus tx2 : noLocks) {
+          if (tx2.getTxid().equals(tx.getTxid())) {
+            matchCount++;
           }
         }
       }
+    }
 
-      assertTrue("Number of fates matches should be > 0", matchCount > 0);
+    assertTrue("Number of fates matches should be > 0", matchCount > 0);
 
-    } catch (KeeperException | TableNotFoundException | InterruptedException ex) {
-      throw new IllegalStateException(ex);
-    }
+    try {
 
-    // test complete, cancel compaction and move on.
-    connector.tableOperations().cancelCompaction(tableName);
+      // test complete, cancel compaction and move on.
+      connector.tableOperations().cancelCompaction(tableName);
 
-    // block if compaction still running
-    compactTask.get();
+      // block if compaction still running
+      compactTask.get();
 
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException
+        | ExecutionException ex) {
+      log.debug("Could not cancel compaction", ex);
+    }
   }
 
   /**
@@ -340,11 +383,8 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
 
       for (AdminUtil.TransactionStatus tx : fateStatus.getTransactions()) {
 
-        log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
-
-        if (tx.getTop().contains("CompactionDriver") && tx.getDebug().contains("CompactRange")) {
+        if (isCompaction(tx))
           return true;
-        }
       }
 
     } catch (KeeperException | TableNotFoundException | InterruptedException ex) {
@@ -356,6 +396,31 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
   }
 
   /**
+   * Test that the transaction top contains "CompactionDriver" and the debug message contains
+   * "CompactRange"
+   *
+   * @param tx
+   *          transaction status
+   * @return true if tx top and debug have compaction messages.
+   */
+  private boolean isCompaction(AdminUtil.TransactionStatus tx) {
+
+    if (tx == null) {
+      log.trace("Fate tx is null");
+      return false;
+    }
+
+    log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
+
+    String top = tx.getTop();
+    String debug = tx.getDebug();
+
+    return top != null && debug != null && top.contains("CompactionDriver")
+        && tx.getDebug().contains("CompactRange");
+
+  }
+
+  /**
    * Returns the current table state (ONLINE, OFFLINE,...) of named table.
    *
    * @param tableName
@@ -400,19 +465,11 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
 
       long startTimestamp = System.nanoTime();
 
-      Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
-      int count = 0;
-      for (Map.Entry<Key,Value> elt : scanner) {
-        String expected = String.format("%05d", count);
-        assert (elt.getKey().getRow().toString().equals(expected));
-        count++;
-      }
+      int count = scanCount(tableName);
 
       log.trace("Scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
           .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
 
-      scanner.close();
-
       if (count != NUM_ROWS) {
         throw new IllegalStateException(
             String.format("Number of rows %1$d does not match expected %2$d", count, NUM_ROWS));
@@ -423,6 +480,21 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
     }
   }
 
+  private int scanCount(String tableName) throws TableNotFoundException {
+
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    int count = 0;
+    for (Map.Entry<Key,Value> elt : scanner) {
+      String expected = String.format("%05d", count);
+      assert (elt.getKey().getRow().toString().equals(expected));
+      count++;
+    }
+
+    scanner.close();
+
+    return count;
+  }
+
   /**
    * Provides timing information for online operation.
    */
@@ -533,14 +605,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
 
         // validate expected data created and exists in table.
 
-        Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
-
-        int count = 0;
-        for (Map.Entry<Key,Value> elt : scanner) {
-          String expected = String.format("%05d", count);
-          assert (elt.getKey().getRow().toString().equals(expected));
-          count++;
-        }
+        int count = scanCount(tableName);
 
         log.trace("After compaction, scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
             .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));