Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/02/06 19:50:57 UTC

accumulo git commit: ACCUMULO-4574 Modified TableOperations online to check if table is already online before executing fate transaction.

Repository: accumulo
Updated Branches:
  refs/heads/1.7 33712bb78 -> 15ae69c32


ACCUMULO-4574 Modified TableOperations online to check if table is already online before executing fate transaction.

When the online command is issued, the fate operation will block if a fate
transaction has locked the table. If the table is already online, there is no
reason to block and then issue the online operation. This modification turns
the online command into a noop if the table is already online. This change
includes an IT test that uses a compaction with slow iterator to cause the fate
transaction to lock a table and then runs the online command - checking that
the operation did not block.
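
The early return described above can be sketched in isolation. This is only an illustration under stated assumptions, not the Accumulo code: the class OnlineNoopSketch, the in-memory states map and the fateSubmissions counter are hypothetical stand-ins for the table state kept in ZooKeeper and for the fate transaction that could otherwise block.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: none of these names exist in Accumulo.
class OnlineNoopSketch {

  enum TableState { ONLINE, OFFLINE, UNKNOWN }

  // Stand-in for the per-table state that the real client reads from ZooKeeper.
  static final Map<String,TableState> states = new ConcurrentHashMap<>();

  // Counts how often the potentially blocking transaction path was taken.
  static final AtomicInteger fateSubmissions = new AtomicInteger();

  static TableState currentState(String tableId) {
    TableState state = states.get(tableId);
    return state == null ? TableState.UNKNOWN : state;
  }

  static void online(String tableId) {
    // Guard: if the table is already online, return before doing any work
    // that might wait on a table lock.
    if (currentState(tableId) == TableState.ONLINE) {
      return;
    }
    // Stand-in for submitting the fate operation and waiting for it.
    fateSubmissions.incrementAndGet();
    states.put(tableId, TableState.ONLINE);
  }

  public static void main(String[] args) {
    states.put("t1", TableState.OFFLINE);
    online("t1"); // takes the transaction path
    online("t1"); // already online, returns immediately
    System.out.println(currentState("t1") + " after " + fateSubmissions.get() + " submission(s)");
  }
}
```

In this sketch the second call never reaches the transaction path, which is the behavior the IT in this commit checks for while a compaction holds the table lock.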

Squashed commit of the following:

Note from kturner: In addition to squashing the following commits, I also
organized the imports on TableOperationsImpl. This is a change that is not
present in the commits mentioned below.

commit 6d8a0502f3d028f6489512764a3ebc9e8a862d99
Author: Ed Coleman <de...@etcoleman.com>
Date:   Sat Feb 4 12:15:23 2017 -0500

    ACCUMULO-4574 Refactored Tables.getTableState() to optionally clear cache and TableOperations.online() to use added method.

    This updates the pull request with review comments to create a getTableState method that can optionally clear the
    zookeeper cache.
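
As a rough illustration of a getter that can optionally clear its cached value first, here is a self-contained sketch. The class CachedStateSketch and its two maps are hypothetical; the real code reads through ZooCache, which is also kept current by ZooKeeper watches, a mechanism this simplified sketch leaves out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain map plays the role of ZooKeeper and another the cache.
class CachedStateSketch {

  // Stand-in for the authoritative store.
  static final Map<String,String> store = new HashMap<>();

  // Stand-in for the client-side cache in front of the store.
  static final Map<String,String> cache = new HashMap<>();

  // Existing-style accessor: keeps whatever the cache already holds.
  static String getState(String path) {
    return getState(path, false);
  }

  // Overload that can drop the cached entry so the next read hits the store.
  static String getState(String path, boolean clearCachedState) {
    if (clearCachedState) {
      cache.remove(path);
    }
    String state = cache.get(path);
    if (state == null) {
      state = store.get(path);
      if (state != null) {
        cache.put(path, state);
      }
    }
    return state == null ? "UNKNOWN" : state;
  }

  public static void main(String[] args) {
    store.put("/tables/1/state", "OFFLINE");
    System.out.println(getState("/tables/1/state"));       // fills the cache
    store.put("/tables/1/state", "ONLINE");                 // changes behind the cache
    System.out.println(getState("/tables/1/state"));       // served from the cache
    System.out.println(getState("/tables/1/state", true)); // cache cleared, store re-read
  }
}
```

Keeping the two-argument form as an overload lets existing callers of the one-argument form behave as before.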

commit 03a8dbc82404c2ff34c166b522ddab3e922bc6ce
Merge: 0d0e103 33712bb
Author: Ed Coleman <de...@etcoleman.com>
Date:   Sat Feb 4 08:35:08 2017 -0500

    Merge remote-tracking branch 'upstream/1.7' into ACCUMULO-4574

commit 0d0e103d23b0f37921fc2ed5d03bea0e8de79f0b
Author: Ed Coleman <de...@etcoleman.com>
Date:   Sun Jan 29 03:09:41 2017 -0500

    ACCUMULO-4574 Modify table online operation to check for online state before executing fate operation

    This commit updates the pull request to incorporate review comments. It also contains the changes
    from Keith Turner in pull request 209 - ACCUMULO-4578 that provides refactored AdminUtil.FateStatus class.
    - modified test to use AdminUtil.FateStatus class.
    - added method to clear zoocache by path to Tables.
    - added cache clear for table state to online check.

commit 7588c90dc5ec755ce167f1e2877081ad3e98ecd8
Merge: 1a20212 db84650
Author: Ed Coleman <de...@etcoleman.com>
Date:   Sat Jan 28 15:25:01 2017 -0500

    Merge remote-tracking branch 'keith-turner/ACCUMULO-4578' into ACCUMULO-4574

commit 1a20212065a767b3edf428911fd9f184393deb5d
Merge: 32c13d5 0385bd7
Author: Ed Coleman <de...@etcoleman.com>
Date:   Sat Jan 28 14:37:04 2017 -0500

    Merge branch 'ACCUMULO-4574' of github.com:EdColeman/accumulo into ACCUMULO-4574

commit 32c13d59e6585fe3e8735f5d586593e801c23f27
Author: Ed Coleman <de...@etcoleman.com>
Date:   Tue Jan 24 00:14:32 2017 -0500

    ACCUMULO-4574 Partial update of online noop test incorporating some pull request comments for additional review.

    - Increased blocked online timeout to up to compaction time.
    - extended AccumuloClusterIT instead of ConfigurableMacIT
    - refactored online op thread to use callable with a future, simplifying timing and eliminating exception handling.

    Other pull request comments dealing with FATE internals, a possible FATE utility and the zoo cache require additional
    evaluation and will be included in a future commit.
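
The callable-with-a-future approach mentioned in the list above can be sketched as follows. This is a minimal example under assumptions: TimedCallSketch and timeOnPool are hypothetical names, and the timed operation is an arbitrary Runnable rather than the table online call used by the test.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: times an operation on a pool thread using Callable and Future.
class TimedCallSketch {

  // Runs the operation on the pool and returns its elapsed time in nanoseconds.
  static long timeOnPool(ExecutorService pool, Runnable operation) {
    Callable<Long> timed = () -> {
      long start = System.nanoTime();
      operation.run();
      return System.nanoTime() - start;
    };
    Future<Long> task = pool.submit(timed);
    try {
      // get() blocks until the callable finishes and surfaces its failure, if any.
      return task.get();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for timed operation", ex);
    } catch (ExecutionException ex) {
      throw new IllegalStateException("timed operation failed", ex.getCause());
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      long nanos = timeOnPool(pool, () -> { /* operation under test goes here */ });
      System.out.println("elapsed ms: " + TimeUnit.NANOSECONDS.toMillis(nanos));
    } finally {
      pool.shutdown();
    }
  }
}
```

Because the elapsed time is the callable's return value, the caller needs no shared fields or hand-rolled thread coordination to read it.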

commit 259eedaa18a3f3fff4df8eafb78c1a706b8d558b
Author: Ed Coleman <de...@etcoleman.com>
Date:   Sun Jan 22 20:25:58 2017 -0500

    ACCUMULO-4574 Modified TableOperations online to check if table is already online before executing fate transaction.

    When the online command is issued, the fate operation will block if a fate transaction has locked the table. If
    the table is already online, there is no reason to block and then issue the online operation. This modification
    turns the online command into a noop if the table is already online. This change includes an IT
    test that uses a compaction with slow iterator to cause the fate transaction to lock a table and
    then runs the online command - checking that the operation did not block.

commit 0385bd73ab2fa3d8c949c779623577810f877c35
Author: Ed Coleman <de...@etcoleman.com>
Date:   Tue Jan 24 00:14:32 2017 -0500

    ACCUMULO-4574 Partial update of online noop test incorporating some pull request comments for additional review.

    - Increased blocked online timeout to up to compaction time.
    - extended AccumuloClusterIT instead of ConfigurableMacIT
    - refactored online op thread to use callable with a future, simplifying timing and eliminating exception handling.

    Other pull request comments dealing with FATE internals, a possible FATE utility and the zoo cache require additional
    evaluation and will be included in a future commit.

commit 058173b59b8e97503a00ec095a9a4235370a9aaa
Author: Ed Coleman <de...@etcoleman.com>
Date:   Sun Jan 22 20:25:58 2017 -0500

    ACCUMULO-4574 Modified TableOperations online to check if table is already online before executing fate transaction.

    When the online command is issued, the fate operation will block if a fate transaction has locked the table. If
    the table is already online, there is no reason to block and then issue the online operation. This modification
    turns the online command into a noop if the table is already online. This change includes an IT
    test that uses a compaction with slow iterator to cause the fate transaction to lock a table and
    then runs the online command - checking that the operation did not block.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/15ae69c3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/15ae69c3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/15ae69c3

Branch: refs/heads/1.7
Commit: 15ae69c32d25142538d8f0ad22308e267495f87b
Parents: 33712bb
Author: Ed Coleman <de...@etcoleman.com>
Authored: Mon Feb 6 13:47:15 2017 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Feb 6 14:03:37 2017 -0500

----------------------------------------------------------------------
 .../core/client/impl/TableOperationsImpl.java   |  11 +
 .../accumulo/core/client/impl/Tables.java       |  45 ++
 .../test/functional/TableChangeStateIT.java     | 439 +++++++++++++++++++
 3 files changed, 495 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/15ae69c3/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
index 1e7b4ec..1a7a38c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
@@ -1208,7 +1208,18 @@ public class TableOperationsImpl extends TableOperationsHelper {
   @Override
   public void online(String tableName, boolean wait) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
     checkArgument(tableName != null, "tableName is null");
+
     String tableId = Tables.getTableId(context.getInstance(), tableName);
+
+    /**
+     * ACCUMULO-4574 if table is already online return without executing fate operation.
+     */
+
+    TableState expectedState = Tables.getTableState(context.getInstance(), tableId, true);
+    if (expectedState == TableState.ONLINE) {
+      return;
+    }
+
     List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes(UTF_8)));
     Map<String,String> opts = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/15ae69c3/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
index 58ebb72..18971ad 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
@@ -149,6 +149,28 @@ public class Tables {
     getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZNAMESPACES);
   }
 
+  /**
+   * Clears the zoo cache from instance/root/{PATH}
+   *
+   * @param instance
+   *          The Accumulo Instance
+   * @param zooPath
+   *          A zookeeper path
+   */
+  public static void clearCacheByPath(Instance instance, final String zooPath) {
+
+    String thePath;
+
+    if (zooPath.startsWith("/")) {
+      thePath = zooPath;
+    } else {
+      thePath = "/" + zooPath;
+    }
+
+    getZooCache(instance).clear(ZooUtil.getRoot(instance) + thePath);
+
+  }
+
   public static String getPrintableTableNameFromId(Map<String,String> tidToNameMap, String tableId) {
     String tableName = tidToNameMap.get(tableId);
     return tableName == null ? "(ID:" + tableId + ")" : tableName;
@@ -175,13 +197,36 @@ public class Tables {
   }
 
   public static TableState getTableState(Instance instance, String tableId) {
+    return getTableState(instance, tableId, false);
+  }
+
+  /**
+   * Get the current state of the table using the table id. If clearCachedState is true, the cached table state is cleared before the state is fetched.
+   * Added with ACCUMULO-4574.
+   *
+   * @param instance
+   *          the Accumulo instance.
+   * @param tableId
+   *          the table id
+   * @param clearCachedState
+   *          if true, clear the cached table state before checking status
+   * @return the table state.
+   */
+  public static TableState getTableState(Instance instance, String tableId, boolean clearCachedState) {
+
     String statePath = ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE;
+
+    if (clearCachedState) {
+      Tables.clearCacheByPath(instance, statePath);
+    }
+
     ZooCache zc = getZooCache(instance);
     byte[] state = zc.get(statePath);
     if (state == null)
       return TableState.UNKNOWN;
 
     return TableState.valueOf(new String(state, UTF_8));
+
   }
 
   public static long getCacheResetCount() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/15ae69c3/test/src/test/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/TableChangeStateIT.java b/test/src/test/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
new file mode 100644
index 0000000..7372dc7
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/TableChangeStateIT.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * ACCUMULO-4574. Test to verify that changing table state to online / offline {@link org.apache.accumulo.core.client.admin.TableOperations#online(String)} when
+ * the table is already in that state returns without blocking.
+ */
+public class TableChangeStateIT extends AccumuloClusterIT {
+
+  private static final Logger log = LoggerFactory.getLogger(TableChangeStateIT.class);
+
+  private static final int NUM_ROWS = 1000;
+  private static final long SLOW_SCAN_SLEEP_MS = 100L;
+
+  private Connector connector;
+
+  @Before
+  public void setup() {
+    connector = getConnector();
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  /**
+   * Validate that {@code TableOperations} online operation does not block when table is already online and fate transaction lock is held by other operations.
+   * The test creates, populates a table and then runs a compaction with a slow iterator so that operation takes long enough to simulate the condition. After
+   * the online operation while compaction is running completes, the test is complete and the compaction is canceled so that other tests can run.
+   *
+   * @throws Exception
+   *           any exception is a test failure.
+   */
+  @Test
+  public void changeTableStateTest() throws Exception {
+
+    ExecutorService pool = Executors.newCachedThreadPool();
+
+    String tableName = getUniqueNames(1)[0];
+
+    createData(tableName);
+
+    assertEquals("verify table online after created", TableState.ONLINE, getTableState(tableName));
+
+    OnLineCallable onlineOp = new OnLineCallable(tableName);
+
+    Future<OnlineOpTiming> task = pool.submit(onlineOp);
+
+    OnlineOpTiming timing1 = task.get();
+
+    log.trace("Online 1 in {} ms", TimeUnit.MILLISECONDS.convert(timing1.runningTime(), TimeUnit.NANOSECONDS));
+
+    assertEquals("verify table is still online", TableState.ONLINE, getTableState(tableName));
+
+    // verify that offline then online functions as expected.
+
+    connector.tableOperations().offline(tableName, true);
+    assertEquals("verify table is offline", TableState.OFFLINE, getTableState(tableName));
+
+    onlineOp = new OnLineCallable(tableName);
+
+    task = pool.submit(onlineOp);
+
+    OnlineOpTiming timing2 = task.get();
+
+    log.trace("Online 2 in {} ms", TimeUnit.MILLISECONDS.convert(timing2.runningTime(), TimeUnit.NANOSECONDS));
+
+    assertEquals("verify table is back online", TableState.ONLINE, getTableState(tableName));
+
+    // launch a full table compaction with the slow iterator to ensure table lock is acquired and held by the compaction
+
+    Future<?> compactTask = pool.submit(new SlowCompactionRunner(tableName));
+    assertTrue("verify that compaction running and fate transaction exists", blockUntilCompactionRunning(tableName));
+
+    // try to set online while fate transaction is in progress - before ACCUMULO-4574 this would block
+
+    onlineOp = new OnLineCallable(tableName);
+
+    task = pool.submit(onlineOp);
+
+    OnlineOpTiming timing3 = task.get();
+
+    assertTrue("online should take less time than expected compaction time",
+        timing3.runningTime() < TimeUnit.NANOSECONDS.convert(NUM_ROWS * SLOW_SCAN_SLEEP_MS, TimeUnit.MILLISECONDS));
+
+    assertEquals("verify table is still online", TableState.ONLINE, getTableState(tableName));
+
+    assertTrue("verify compaction still running and fate transaction still exists", blockUntilCompactionRunning(tableName));
+
+    // test complete, cancel compaction and move on.
+    connector.tableOperations().cancelCompaction(tableName);
+
+    log.debug("Success: Timing results for online commands.");
+    log.debug("Time for unblocked online {} ms", TimeUnit.MILLISECONDS.convert(timing1.runningTime(), TimeUnit.NANOSECONDS));
+    log.debug("Time for online when offline {} ms", TimeUnit.MILLISECONDS.convert(timing2.runningTime(), TimeUnit.NANOSECONDS));
+    log.debug("Time for blocked online {} ms", TimeUnit.MILLISECONDS.convert(timing3.runningTime(), TimeUnit.NANOSECONDS));
+
+    // block if compaction still running
+    compactTask.get();
+
+  }
+
+  /**
+   * Blocks current thread until compaction is running.
+   *
+   * @return true if a running compaction and its associated fate transaction are found.
+   */
+  private boolean blockUntilCompactionRunning(final String tableName) {
+
+    int runningCompactions = 0;
+
+    List<String> tservers = connector.instanceOperations().getTabletServers();
+
+    /*
+     * wait for compaction to start - The compaction will acquire a fate transaction lock that used to block a subsequent online command while the fate
+     * transaction lock was held.
+     */
+    while (runningCompactions == 0) {
+
+      try {
+
+        for (String tserver : tservers) {
+          runningCompactions += connector.instanceOperations().getActiveCompactions(tserver).size();
+          log.trace("tserver {}, running compactions {}", tserver, runningCompactions);
+        }
+
+      } catch (AccumuloSecurityException | AccumuloException ex) {
+        throw new IllegalStateException("failed to get active compactions, test fails.", ex);
+      }
+
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException ex) {
+        // reassert interrupt
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    // Validate that there is a compaction fate transaction - otherwise test is invalid.
+    return findFate(tableName);
+  }
+
+  /**
+   * Checks fates in zookeeper looking for transaction associated with a compaction as a double check that the test will be valid because the running compaction
+   * does have a fate transaction lock.
+   *
+   * @return true if corresponding fate transaction found, false otherwise
+   */
+  private boolean findFate(final String tableName) {
+
+    Instance instance = connector.getInstance();
+    AdminUtil<String> admin = new AdminUtil<>(false);
+
+    try {
+
+      String tableId = Tables.getTableId(instance, tableName);
+
+      log.trace("tid: {}", tableId);
+
+      String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
+      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
+      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
+      AdminUtil.FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
+
+      for (AdminUtil.TransactionStatus tx : fateStatus.getTransactions()) {
+
+        if (tx.getTop().contains("CompactionDriver") && tx.getDebug().contains("CompactRange")) {
+          return true;
+        }
+      }
+
+    } catch (KeeperException | TableNotFoundException | InterruptedException ex) {
+      throw new IllegalStateException(ex);
+    }
+
+    // did not find appropriate fate transaction for compaction.
+    return Boolean.FALSE;
+  }
+
+  /**
+   * Returns the current table state (ONLINE, OFFLINE,...) of named table.
+   *
+   * @param tableName
+   *          the table name
+   * @return the current table state
+   * @throws TableNotFoundException
+   *           if table does not exist
+   */
+  private TableState getTableState(String tableName) throws TableNotFoundException {
+
+    String tableId = Tables.getTableId(connector.getInstance(), tableName);
+
+    TableState tstate = Tables.getTableState(connector.getInstance(), tableId);
+
+    log.trace("tableName: '{}': tableId {}, current state: {}", tableName, tableId, tstate);
+
+    return tstate;
+  }
+
+  /**
+   * Create the provided table and populate with some data using a batch writer. The table is scanned to ensure it was populated as expected.
+   *
+   * @param tableName
+   *          the name of the table
+   */
+  private void createData(final String tableName) {
+
+    try {
+
+      // create table.
+      connector.tableOperations().create(tableName);
+      BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+
+      // populate
+      for (int i = 0; i < NUM_ROWS; i++) {
+        Mutation m = new Mutation(new Text(String.format("%05d", i)));
+        m.put(new Text("col" + Integer.toString((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes(UTF_8)));
+        bw.addMutation(m);
+      }
+      bw.close();
+
+      long startTimestamp = System.nanoTime();
+
+      Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+      int count = 0;
+      for (Map.Entry<Key,Value> elt : scanner) {
+        String expected = String.format("%05d", count);
+        assert (elt.getKey().getRow().toString().equals(expected));
+        count++;
+      }
+
+      log.trace("Scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+
+      scanner.close();
+
+      if (count != NUM_ROWS) {
+        throw new IllegalStateException(String.format("Number of rows %1$d does not match expected %2$d", count, NUM_ROWS));
+      }
+    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException ex) {
+      throw new IllegalStateException("Create data failed with exception", ex);
+    }
+  }
+
+  /**
+   * Provides timing information for online operation.
+   */
+  private static class OnlineOpTiming {
+
+    private long started = 0L;
+    private long completed = 0L;
+
+    OnlineOpTiming() {
+      started = System.nanoTime();
+    }
+
+    /**
+     * stop timing and set completion flag.
+     */
+    void setComplete() {
+      completed = System.nanoTime();
+    }
+
+    /**
+     * @return running time in nanoseconds.
+     */
+    long runningTime() {
+      return completed - started;
+    }
+  }
+
+  /**
+   * Run online operation in a separate thread and gather timing information.
+   */
+  private class OnLineCallable implements Callable<OnlineOpTiming> {
+
+    final String tableName;
+
+    /**
+     * Create an instance of this class to set the provided table online.
+     *
+     * @param tableName
+     *          The table name that will be set online.
+     */
+    OnLineCallable(final String tableName) {
+      this.tableName = tableName;
+    }
+
+    @Override
+    public OnlineOpTiming call() throws Exception {
+
+      OnlineOpTiming status = new OnlineOpTiming();
+
+      log.trace("Setting {} online", tableName);
+
+      connector.tableOperations().online(tableName, true);
+      // stop timing
+      status.setComplete();
+
+      log.trace("Online completed in {} ms", TimeUnit.MILLISECONDS.convert(status.runningTime(), TimeUnit.NANOSECONDS));
+
+      return status;
+    }
+  }
+
+  /**
+   * Instance to create / run a compaction using a slow iterator.
+   */
+  private class SlowCompactionRunner implements Runnable {
+
+    private final String tableName;
+
+    /**
+     * Create an instance of this class.
+     *
+     * @param tableName
+     *          the name of the table that will be compacted with the slow iterator.
+     */
+    SlowCompactionRunner(final String tableName) {
+      this.tableName = tableName;
+    }
+
+    @Override
+    public void run() {
+
+      long startTimestamp = System.nanoTime();
+
+      IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
+      SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
+
+      List<IteratorSetting> compactIterators = new ArrayList<>();
+      compactIterators.add(slow);
+
+      log.trace("Slow iterator {}", slow.toString());
+
+      try {
+
+        log.trace("Start compaction");
+
+        connector.tableOperations().compact(tableName, new Text("0"), new Text("z"), compactIterators, true, true);
+
+        log.trace("Compaction wait is complete");
+
+        log.trace("Slow compaction of {} rows took {} ms", NUM_ROWS, TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+
+        // validate that number of rows matches expected.
+
+        startTimestamp = System.nanoTime();
+
+        // validate expected data created and exists in table.
+
+        Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+
+        int count = 0;
+        for (Map.Entry<Key,Value> elt : scanner) {
+          String expected = String.format("%05d", count);
+          assert (elt.getKey().getRow().toString().equals(expected));
+          count++;
+        }
+
+        log.trace("After compaction, scan time for {} rows {} ms", NUM_ROWS,
+            TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+
+        if (count != NUM_ROWS) {
+          throw new IllegalStateException(String.format("After compaction, number of rows %1$d does not match expected %2$d", count, NUM_ROWS));
+        }
+
+      } catch (TableNotFoundException ex) {
+        throw new IllegalStateException("test failed, table " + tableName + " does not exist", ex);
+      } catch (AccumuloSecurityException ex) {
+        throw new IllegalStateException("test failed, could not add iterator due to security exception", ex);
+      } catch (AccumuloException ex) {
+        // test cancels compaction on complete, so ignore it as an exception.
+        if (!ex.getMessage().contains("Compaction canceled")) {
+          throw new IllegalStateException("test failed with an Accumulo exception", ex);
+        }
+      }
+    }
+  }
+}