You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/11/07 22:30:40 UTC
[accumulo] branch 1.9 updated: Fix #1308 - Refactor fate
concurrency IT (#1414)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.9 by this push:
new 2b8243b Fix #1308 - Refactor fate concurrency IT (#1414)
2b8243b is described below
commit 2b8243b72eace4e1fcba0c77077981a84401b8ea
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Thu Nov 7 17:30:32 2019 -0500
Fix #1308 - Refactor fate concurrency IT (#1414)
fix error that caused test failure.
---
.../test/functional/FateConcurrencyIT.java | 316 +++++--------------
.../org/apache/accumulo/test/util/SlowOps.java | 347 +++++++++++++++++++++
2 files changed, 419 insertions(+), 244 deletions(-)
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 863e3c4..c3a4d79 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.test.functional;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -24,9 +23,7 @@ import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -35,28 +32,19 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.AdminUtil;
import org.apache.accumulo.fate.ZooStore;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
-import org.apache.hadoop.io.Text;
+import org.apache.accumulo.test.util.SlowOps;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.Before;
@@ -90,10 +78,9 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
private String secret;
- // Test development only. When true, multiple tables, multiple compactions will be
- // used during the test run which simulates transient condition that was causing
- // the test to fail..
- private boolean runMultipleCompactions = false;
+ private long maxWait;
+
+ private SlowOps slowOps;
@Before
public void setup() {
@@ -104,7 +91,9 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
- createData(tableName);
+ maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2);
+
+ slowOps = new SlowOps(connector, tableName, maxWait, 1);
}
@AfterClass
@@ -161,8 +150,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
// launch a full table compaction with the slow iterator to ensure table lock is acquired and
// held by the compaction
-
- Future<?> compactTask = startCompactTask();
+ slowOps.startCompactTask();
// try to set online while fate transaction is in progress - before ACCUMULO-4574 this would
// block
@@ -178,8 +166,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
assertEquals("verify table is still online", TableState.ONLINE, getTableState(tableName));
- assertTrue("verify compaction still running and fate transaction still exists",
- blockUntilCompactionRunning(tableName));
+ assertTrue("Find FATE operation for table", findFate(tableName));
// test complete, cancel compaction and move on.
connector.tableOperations().cancelCompaction(tableName);
@@ -193,8 +180,31 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
TimeUnit.MILLISECONDS.convert(timing3.runningTime(), TimeUnit.NANOSECONDS));
// block if compaction still running
- compactTask.get();
+ slowOps.blockWhileCompactionRunning();
+
+ }
+
+ private boolean findFate(String aTableName) {
+ for (int retry = 0; retry < 5; retry++) {
+
+ try {
+ boolean found = lookupFateInZookeeper(aTableName);
+ log.trace("Try {}: Fate in zk for table {} : {}", retry, aTableName, found);
+ if (found) {
+ log.trace("found for {}", aTableName);
+ return true;
+ } else {
+ Thread.sleep(150);
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (Exception ex) {
+ log.debug("Find fate failed for table name {} with exception, will retry", aTableName, ex);
+ }
+ }
+ return false;
}
/**
@@ -208,12 +218,6 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
Instance instance = connector.getInstance();
String tableId;
- // for development testing - force transient condition that was failing this test so that
- // we know if multiple compactions are running, they are properly handled by the test code.
- if (runMultipleCompactions) {
- runMultipleCompactions();
- }
-
try {
assertEquals("verify table online after created", TableState.ONLINE,
@@ -228,7 +232,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
String.format("Table %s does not exist, failing test", tableName));
}
- Future<?> compactTask = startCompactTask();
+ slowOps.startCompactTask();
AdminUtil.FateStatus withLocks = null;
List<AdminUtil.TransactionStatus> noLocks = null;
@@ -300,114 +304,19 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
connector.tableOperations().cancelCompaction(tableName);
// block if compaction still running
- compactTask.get();
-
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException
- | ExecutionException ex) {
- log.debug("Could not cancel compaction", ex);
- }
- }
-
- /**
- * This method was helpful for debugging a condition that was causing transient test failures.
- * This forces a condition that the test should be able to handle. This method is not needed
- * during normal testing, it was kept to aid future test development / troubleshooting if other
- * transient failures occur.
- */
- private void runMultipleCompactions() {
-
- for (int i = 0; i < 4; i++) {
-
- String aTableName = getUniqueNames(1)[0] + "_" + i;
-
- createData(aTableName);
-
- log.debug("Table: {}", aTableName);
-
- pool.submit(new SlowCompactionRunner(aTableName));
-
- assertTrue("verify that compaction running and fate transaction exists",
- blockUntilCompactionRunning(aTableName));
+ boolean cancelled = slowOps.blockWhileCompactionRunning();
+ log.debug("Cancel completed successfully: {}", cancelled);
+ } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException ex) {
+ log.debug("Could not cancel compaction due to exception", ex);
}
}
/**
- * Create and run a slow running compaction task. The method will block until the compaction has
- * been started.
- *
- * @return a reference to the running compaction task.
- */
- private Future<?> startCompactTask() {
- Future<?> compactTask = pool.submit(new SlowCompactionRunner(tableName));
- assertTrue("verify that compaction running and fate transaction exists",
- blockUntilCompactionRunning(tableName));
- return compactTask;
- }
-
- /**
- * Blocks current thread until compaction is running.
- *
- * @return true if compaction and associate fate found.
- */
- private boolean blockUntilCompactionRunning(final String tableName) {
-
- long maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2);
-
- long startWait = System.currentTimeMillis();
-
- List<String> tservers = connector.instanceOperations().getTabletServers();
-
- /*
- * wait for compaction to start on table - The compaction will acquire a fate transaction lock
- * that used to block a subsequent online command while the fate transaction lock was held.
- */
- while (System.currentTimeMillis() < (startWait + maxWait)) {
-
- try {
-
- int runningCompactions = 0;
-
- for (String tserver : tservers) {
- runningCompactions += connector.instanceOperations().getActiveCompactions(tserver).size();
- log.trace("tserver {}, running compactions {}", tservers, runningCompactions);
- }
-
- if (runningCompactions > 0) {
- // Validate that there is a compaction fate transaction - otherwise test is invalid.
- if (findFate(tableName)) {
- return true;
- }
- }
-
- } catch (AccumuloSecurityException | AccumuloException ex) {
- throw new IllegalStateException("failed to get active compactions, test fails.", ex);
- } catch (KeeperException ex) {
- log.trace("Saw possible transient zookeeper error");
- }
-
- try {
- Thread.sleep(250);
- } catch (InterruptedException ex) {
- // reassert interrupt
- Thread.currentThread().interrupt();
- }
- }
-
- log.debug("Could not find compaction for {} after {} seconds", tableName,
- TimeUnit.MILLISECONDS.toSeconds(maxWait));
-
- return false;
-
- }
-
- /**
* Checks fates in zookeeper looking for transaction associated with a compaction as a double
* check that the test will be valid because the running compaction does have a fate transaction
* lock.
- *
+ * <p>
* This method throws can throw either IllegalStateException (failed) or a Zookeeper exception.
* Throwing the Zookeeper exception allows for retries if desired to handle transient zookeeper
* issues.
@@ -418,7 +327,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
* @throws KeeperException
* if a zookeeper error occurred - allows for retries.
*/
- private boolean findFate(final String tableName) throws KeeperException {
+ private boolean lookupFateInZookeeper(final String tableName) throws KeeperException {
Instance instance = connector.getInstance();
AdminUtil<String> admin = new AdminUtil<>(false);
@@ -497,61 +406,6 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
}
/**
- * Create the provided table and populate with some data using a batch writer. The table is
- * scanned to ensure it was populated as expected.
- *
- * @param tableName
- * the name of the table
- */
- private void createData(final String tableName) {
-
- try {
-
- // create table.
- connector.tableOperations().create(tableName);
- BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
-
- // populate
- for (int i = 0; i < NUM_ROWS; i++) {
- Mutation m = new Mutation(new Text(String.format("%05d", i)));
- m.put(new Text("col" + ((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes(UTF_8)));
- bw.addMutation(m);
- }
- bw.close();
-
- long startTimestamp = System.nanoTime();
-
- int count = scanCount(tableName);
-
- log.trace("Scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
- .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
-
- if (count != NUM_ROWS) {
- throw new IllegalStateException(
- String.format("Number of rows %1$d does not match expected %2$d", count, NUM_ROWS));
- }
- } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException
- | TableExistsException ex) {
- throw new IllegalStateException("Create data failed with exception", ex);
- }
- }
-
- private int scanCount(String tableName) throws TableNotFoundException {
-
- Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
- int count = 0;
- for (Map.Entry<Key,Value> elt : scanner) {
- String expected = String.format("%05d", count);
- assert (elt.getKey().getRow().toString().equals(expected));
- count++;
- }
-
- scanner.close();
-
- return count;
- }
-
- /**
* Provides timing information for online operation.
*/
private static class OnlineOpTiming {
@@ -614,74 +468,48 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
}
/**
- * Instance to create / run a compaction using a slow iterator.
+ * Concurrency testing - ensure that tests are valid if multiple compactions are running. For
+ * development testing - force transient condition that was failing this test so that we know if
+ * multiple compactions are running, they are properly handled by the test code and the tests are
+ * valid.
*/
- private class SlowCompactionRunner implements Runnable {
-
- private final String tableName;
+ @Test
+ public void multipleCompactions() {
- /**
- * Create an instance of this class.
- *
- * @param tableName
- * the name of the table that will be compacted with the slow iterator.
- */
- SlowCompactionRunner(final String tableName) {
- this.tableName = tableName;
- }
+ int tableCount = 4;
- @Override
- public void run() {
+ List<SlowOps> tables = new ArrayList<>();
- long startTimestamp = System.nanoTime();
+ for (int i = 0; i < tableCount; i++) {
+ String uniqueName = getUniqueNames(1)[0] + "_" + i;
+ SlowOps gen = new SlowOps(connector, uniqueName, maxWait, tableCount);
+ tables.add(gen);
+ gen.startCompactTask();
+ }
- IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
- SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
+ int foundCount = 0;
- List<IteratorSetting> compactIterators = new ArrayList<>();
- compactIterators.add(slow);
+ for (SlowOps t : tables) {
+ log.debug("Look for fate {}", t.getTableName());
+ if (findFate(t.getTableName())) {
+ log.debug("Found fate {}", t.getTableName());
+ foundCount++;
+ }
+ }
- log.trace("Slow iterator {}", slow.toString());
+ assertEquals(tableCount, foundCount);
+ for (SlowOps t : tables) {
try {
-
- log.trace("Start compaction");
-
- connector.tableOperations().compact(tableName, new Text("0"), new Text("z"),
- compactIterators, true, true);
-
- log.trace("Compaction wait is complete");
-
- log.trace("Slow compaction of {} rows took {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
- .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
-
- // validate that number of rows matches expected.
-
- startTimestamp = System.nanoTime();
-
- // validate expected data created and exists in table.
-
- int count = scanCount(tableName);
-
- log.trace("After compaction, scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
- .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
-
- if (count != NUM_ROWS) {
- throw new IllegalStateException(
- String.format("After compaction, number of rows %1$d does not match expected %2$d",
- count, NUM_ROWS));
- }
-
- } catch (TableNotFoundException ex) {
- throw new IllegalStateException("test failed, table " + tableName + " does not exist", ex);
- } catch (AccumuloSecurityException ex) {
- throw new IllegalStateException(
- "test failed, could not add iterator due to security exception", ex);
- } catch (AccumuloException ex) {
- // test cancels compaction on complete, so ignore it as an exception.
- if (!ex.getMessage().contains("Compaction canceled")) {
- throw new IllegalStateException("test failed with an Accumulo exception", ex);
+ connector.tableOperations().cancelCompaction(t.getTableName());
+ // block if compaction still running
+ boolean cancelled = t.blockWhileCompactionRunning();
+ if (!cancelled) {
+ log.info("Failed to cancel compaction during multiple compaction test clean-up for {}",
+ t.getTableName());
}
+ } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException ex) {
+ log.debug("Exception throw during multiple table test clean-up", ex);
}
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
new file mode 100644
index 0000000..bd51990
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.ActiveCompaction;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common methods for performing operations that deliberately take some period of time so that
+ * tests can interact while the operations are in progress.
+ */
+public class SlowOps {
+
+ private static final Logger log = LoggerFactory.getLogger(SlowOps.class);
+
+ private static final String TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX =
+ "tserver.compaction.major.concurrent.max";
+
+ private static final long SLOW_SCAN_SLEEP_MS = 250L;
+ private static final int NUM_DATA_ROWS = 1000;
+
+ private final Connector connector;
+ private final String tableName;
+ private final long maxWait;
+
+ // private final int numRows = DEFAULT_NUM_DATA_ROWS;
+
+ private static final ExecutorService pool = Executors.newCachedThreadPool();
+
+ private Future<?> compactTask = null;
+
+ private SlowOps(final Connector connector, final String tableName, final long maxWait) {
+
+ this.connector = connector;
+ this.tableName = tableName;
+ this.maxWait = maxWait;
+
+ createData();
+ }
+
+ public SlowOps(final Connector connector, final String tableName, final long maxWait,
+ final int numParallelExpected) {
+
+ this(connector, tableName, maxWait);
+
+ setExpectedCompactions(numParallelExpected);
+
+ }
+
+ public void setExpectedCompactions(final int numParallelExpected) {
+
+ final int target = numParallelExpected + 1;
+
+ Map<String,String> sysConfig;
+
+ try {
+
+ sysConfig = connector.instanceOperations().getSystemConfiguration();
+
+ int current = Integer.parseInt(sysConfig.get("tserver.compaction.major.concurrent.max"));
+
+ if (current < target) {
+ connector.instanceOperations().setProperty(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX,
+ Integer.toString(target));
+
+ sysConfig = connector.instanceOperations().getSystemConfiguration();
+
+ }
+
+ Integer.parseInt(sysConfig.get(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX));
+
+ } catch (AccumuloException | AccumuloSecurityException | NumberFormatException ex) {
+ throw new IllegalStateException("Could not set parallel compaction limit to " + target, ex);
+ }
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ private void createData() {
+
+ try {
+
+ // create table.
+ connector.tableOperations().create(tableName);
+
+ log.info("Created table id: {}, name \'{}\'",
+ connector.tableOperations().tableIdMap().get(tableName), tableName);
+
+ try (BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig())) {
+ // populate
+ for (int i = 0; i < NUM_DATA_ROWS; i++) {
+ Mutation m = new Mutation(new Text(String.format("%05d", i)));
+ m.put(new Text("col" + ((i % 3) + 1)), new Text("qual"),
+ new Value("junk".getBytes(UTF_8)));
+ bw.addMutation(m);
+ }
+ }
+
+ verifyRows();
+
+ } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException
+ | TableExistsException ex) {
+ throw new IllegalStateException("Create data failed with exception", ex);
+ }
+ }
+
+ private void verifyRows() {
+
+ long startTimestamp = System.nanoTime();
+
+ int count = scanCount();
+
+ log.trace("Scan time for {} rows {} ms", NUM_DATA_ROWS,
+ TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+
+ if (count != NUM_DATA_ROWS) {
+ throw new IllegalStateException(
+ String.format("Number of rows %1$d does not match expected %2$d", count, NUM_DATA_ROWS));
+ }
+ }
+
+ private int scanCount() {
+ try (Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY)) {
+
+ int count = 0;
+
+ for (Map.Entry<Key,Value> elt : scanner) {
+ String expected = String.format("%05d", count);
+ assert (elt.getKey().getRow().toString().equals(expected));
+ count++;
+ }
+ return count;
+ } catch (TableNotFoundException ex) {
+ log.debug("cannot verify row count, table \'{}\' does not exist", tableName);
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ /**
+ * Create and run a slow running compaction task. The method will block until the compaction has
+ * been started. The compaction should be cancelled using Accumulo tableOps, and then the caller
+ * can use blockWhileCompactionRunning() on the instance of this class.
+ */
+ public void startCompactTask() {
+
+ compactTask = pool.submit(new SlowCompactionRunner());
+
+ if (!blockUntilCompactionRunning()) {
+ throw new IllegalStateException("Compaction could not be started for " + tableName);
+ }
+ }
+
+ /**
+ * Instance to create / run a compaction using a slow iterator.
+ */
+ private class SlowCompactionRunner implements Runnable {
+
+ SlowCompactionRunner() {}
+
+ @Override
+ public void run() {
+
+ long startTimestamp = System.nanoTime();
+
+ IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
+ SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
+
+ List<IteratorSetting> compactIterators = new ArrayList<>();
+ compactIterators.add(slow);
+
+ log.trace("Starting slow operation using iterator: {}", slow);
+
+ int retry = 0;
+ boolean completed = false;
+
+ while (!completed && retry++ < 5) {
+
+ try {
+ log.info("Starting compaction. Attempt {}", retry);
+ connector.tableOperations().compact(tableName, null, null, compactIterators, true, true);
+ completed = true;
+ } catch (Throwable ex) {
+ // test cancels compaction on complete, so ignore it as an exception.
+ if (ex.getMessage().contains("Compaction canceled")) {
+ return;
+ }
+ log.info("Exception thrown while waiting for compaction - will retry", ex);
+ try {
+ Thread.sleep(10_000 * retry);
+ } catch (InterruptedException iex) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ log.debug("Compaction wait is complete");
+
+ log.trace("Slow compaction of {} rows took {} ms", NUM_DATA_ROWS, TimeUnit.MILLISECONDS
+ .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+
+ // validate that number of rows matches expected.
+
+ startTimestamp = System.nanoTime();
+
+ // validate expected data created and exists in table.
+
+ int count = scanCount();
+
+ log.trace("After compaction, scan time for {} rows {} ms", NUM_DATA_ROWS,
+ TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp),
+ TimeUnit.NANOSECONDS));
+
+ if (count != NUM_DATA_ROWS) {
+ throw new IllegalStateException(
+ String.format("After compaction, number of rows %1$d does not match expected %2$d",
+ count, NUM_DATA_ROWS));
+ }
+ }
+ }
+
+ /**
+ * Blocks current thread until compaction is running.
+ *
+ * @return true if compaction and associate fate found.
+ */
+ private boolean blockUntilCompactionRunning() {
+
+ long startWait = System.currentTimeMillis();
+
+ List<String> tservers = connector.instanceOperations().getTabletServers();
+
+ /*
+ * wait for compaction to start on table - The compaction will acquire a fate transaction lock
+ * that used to block a subsequent online command while the fate transaction lock was held.
+ */
+ while (System.currentTimeMillis() < (startWait + maxWait)) {
+
+ try {
+
+ List<ActiveCompaction> activeCompactions = new ArrayList<>();
+
+ for (String tserver : tservers) {
+ List<ActiveCompaction> ac = connector.instanceOperations().getActiveCompactions(tserver);
+ activeCompactions.addAll(ac);
+ // runningCompactions += ac.size();
+ log.trace("tserver {}, running compactions {}", tservers, ac.size());
+ }
+
+ if (!activeCompactions.isEmpty()) {
+ try {
+ for (ActiveCompaction compaction : activeCompactions) {
+ log.debug("Compaction running for {}", compaction.getTable());
+ if (compaction.getTable().compareTo(tableName) == 0) {
+ return true;
+ }
+ }
+ } catch (TableNotFoundException ex) {
+ log.trace("Compaction found for unknown table {}", activeCompactions);
+ }
+ }
+ } catch (AccumuloSecurityException | AccumuloException ex) {
+ throw new IllegalStateException("failed to get active compactions, test fails.", ex);
+ }
+
+ try {
+ Thread.sleep(3_000);
+ } catch (InterruptedException ex) {
+ // reassert interrupt
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ log.debug("Could not find compaction for {} after {} seconds", tableName,
+ TimeUnit.MILLISECONDS.toSeconds(maxWait));
+
+ return false;
+
+ }
+
+ /**
+ * Will block as long as the underlying compaction task is running. This method is intended to be
+ * used when the compaction is cancelled via table operation cancel method - when the cancel
+ * command completed, the running task will terminate and then this method will return.
+ *
+ * @return true if the task returned.
+ */
+ public boolean blockWhileCompactionRunning() {
+
+ try {
+ if (compactTask == null) {
+ throw new IllegalStateException(
+ "Compaction task has not been started - call startCompactionTask() before blocking");
+ }
+ compactTask.get();
+ return true;
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (ExecutionException ex) {
+ return false;
+ }
+ }
+
+}