You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/04/05 23:53:57 UTC
[accumulo] branch 1.9 updated: Improve FateConcurrencyIT test (#1061)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.9 by this push:
new b52c165 Improve FateConcurrencyIT test (#1061)
b52c165 is described below
commit b52c16576e45c59190f8334301ccb120e7c627d5
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Fri Apr 5 19:53:53 2019 -0400
Improve FateConcurrencyIT test (#1061)
- Removes NPE that sometimes caused the test to fail during release testing.
---
.../test/functional/FateConcurrencyIT.java | 175 ++++++++++++++-------
1 file changed, 120 insertions(+), 55 deletions(-)
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 56d51ad..dc1c225 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -18,12 +18,15 @@ package org.apache.accumulo.test.functional;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -194,69 +197,109 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
* Validate that the AdminUtil.getStatus works correctly after refactor and validate that
* getTransactionStatus can be called without lock map(s). The test starts a long running fate
* transaction (slow compaction) and then calls AdminUtil functions to get the FATE.
- *
- * @throws Exception
- * any exception is a test failure
*/
@Test
- public void getFateStatus() throws Exception {
+ public void getFateStatus() {
- assertEquals("verify table online after created", TableState.ONLINE, getTableState(tableName));
+ Instance instance = connector.getInstance();
+ String tableId;
+
+ try {
+
+ assertEquals("verify table online after created", TableState.ONLINE,
+ getTableState(tableName));
+
+ tableId = Tables.getTableId(instance, tableName);
+
+ log.trace("tid: {}", tableId);
+
+ } catch (TableNotFoundException ex) {
+ throw new IllegalStateException(
+ String.format("Table %s does not exist, failing test", tableName));
+ }
Future<?> compactTask = startCompactTask();
assertTrue("compaction fate transaction exits", findFate(tableName));
- Instance instance = connector.getInstance();
+ AdminUtil.FateStatus withLocks = null;
+ List<AdminUtil.TransactionStatus> noLocks = null;
+
+ int maxRetries = 3;
+
AdminUtil<String> admin = new AdminUtil<>(false);
- try {
+ while (maxRetries > 0) {
- String tableId = Tables.getTableId(instance, tableName);
+ try {
- log.trace("tid: {}", tableId);
+ IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
+ instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
- IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
- instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
- ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
+ ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
- AdminUtil.FateStatus withLocks = admin.getStatus(zs, zk,
- ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
+ withLocks = admin.getStatus(zs, zk,
+ ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
- // call method that does not use locks.
- List<AdminUtil.TransactionStatus> noLocks = admin.getTransactionStatus(zs, null, null);
+ // call method that does not use locks.
+ noLocks = admin.getTransactionStatus(zs, null, null);
- // fast check - count number of transactions
- assertEquals(withLocks.getTransactions().size(), noLocks.size());
+ // no zk exception, no need to retry
+ break;
- int matchCount = 0;
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ fail("Interrupt received - test failed");
+ return;
+ } catch (KeeperException ex) {
+ maxRetries--;
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException intr_ex) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
- for (AdminUtil.TransactionStatus tx : withLocks.getTransactions()) {
+ assertNotNull(withLocks);
+ assertNotNull(noLocks);
- log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
+ // fast check - count number of transactions
+ assertEquals(withLocks.getTransactions().size(), noLocks.size());
+
+ int matchCount = 0;
- if (tx.getTop().contains("CompactionDriver") && tx.getDebug().contains("CompactRange")) {
+ for (AdminUtil.TransactionStatus tx : withLocks.getTransactions()) {
- for (AdminUtil.TransactionStatus tx2 : noLocks) {
- if (tx2.getTxid().equals(tx.getTxid())) {
- matchCount++;
- }
+ if (isCompaction(tx)) {
+
+ log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
+
+ for (AdminUtil.TransactionStatus tx2 : noLocks) {
+ if (tx2.getTxid().equals(tx.getTxid())) {
+ matchCount++;
}
}
}
+ }
- assertTrue("Number of fates matches should be > 0", matchCount > 0);
+ assertTrue("Number of fates matches should be > 0", matchCount > 0);
- } catch (KeeperException | TableNotFoundException | InterruptedException ex) {
- throw new IllegalStateException(ex);
- }
+ try {
- // test complete, cancel compaction and move on.
- connector.tableOperations().cancelCompaction(tableName);
+ // test complete, cancel compaction and move on.
+ connector.tableOperations().cancelCompaction(tableName);
- // block if compaction still running
- compactTask.get();
+ // block if compaction still running
+ compactTask.get();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException
+ | ExecutionException ex) {
+ log.debug("Could not cancel compaction", ex);
+ }
}
/**
@@ -340,11 +383,8 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
for (AdminUtil.TransactionStatus tx : fateStatus.getTransactions()) {
- log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
-
- if (tx.getTop().contains("CompactionDriver") && tx.getDebug().contains("CompactRange")) {
+ if (isCompaction(tx))
return true;
- }
}
} catch (KeeperException | TableNotFoundException | InterruptedException ex) {
@@ -356,6 +396,31 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
}
/**
+ * Test that the transaction top contains "CompactionDriver" and the debug message contains
+ * "CompactRange"
+ *
+ * @param tx
+ * transaction status
+ * @return true if tx top and debug have compaction messages.
+ */
+ private boolean isCompaction(AdminUtil.TransactionStatus tx) {
+
+ if (tx == null) {
+ log.trace("Fate tx is null");
+ return false;
+ }
+
+ log.trace("Fate id: {}, status: {}", tx.getTxid(), tx.getStatus());
+
+ String top = tx.getTop();
+ String debug = tx.getDebug();
+
+ return top != null && debug != null && top.contains("CompactionDriver")
+ && tx.getDebug().contains("CompactRange");
+
+ }
+
+ /**
* Returns the current table state (ONLINE, OFFLINE,...) of named table.
*
* @param tableName
@@ -400,19 +465,11 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
long startTimestamp = System.nanoTime();
- Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
- int count = 0;
- for (Map.Entry<Key,Value> elt : scanner) {
- String expected = String.format("%05d", count);
- assert (elt.getKey().getRow().toString().equals(expected));
- count++;
- }
+ int count = scanCount(tableName);
log.trace("Scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
- scanner.close();
-
if (count != NUM_ROWS) {
throw new IllegalStateException(
String.format("Number of rows %1$d does not match expected %2$d", count, NUM_ROWS));
@@ -423,6 +480,21 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
}
}
+ private int scanCount(String tableName) throws TableNotFoundException {
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ int count = 0;
+ for (Map.Entry<Key,Value> elt : scanner) {
+ String expected = String.format("%05d", count);
+ assert (elt.getKey().getRow().toString().equals(expected));
+ count++;
+ }
+
+ scanner.close();
+
+ return count;
+ }
+
/**
* Provides timing information for online operation.
*/
@@ -533,14 +605,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
// validate expected data created and exists in table.
- Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
-
- int count = 0;
- for (Map.Entry<Key,Value> elt : scanner) {
- String expected = String.format("%05d", count);
- assert (elt.getKey().getRow().toString().equals(expected));
- count++;
- }
+ int count = scanCount(tableName);
log.trace("After compaction, scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));