You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/11/07 23:16:01 UTC

[accumulo] branch master updated (c7bc980 -> 8f96071)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git.


    from c7bc980  Move system iterators to iteratorsImpl package. Closes #1390 (#1411)
     add 2b8243b  Fix #1308 - Refactor fate concurrency IT (#1414)
     new 8f96071  Merge branch '1.9'

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../clientImpl/AccumuloBulkMergeException.java     |   1 +
 .../test/functional/FateConcurrencyIT.java         | 354 ++++++---------------
 .../org/apache/accumulo/test/util/SlowOps.java     | 347 ++++++++++++++++++++
 3 files changed, 438 insertions(+), 264 deletions(-)
 create mode 100644 test/src/main/java/org/apache/accumulo/test/util/SlowOps.java


[accumulo] 01/01: Merge branch '1.9'

Posted by ct...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 8f96071472f7d60a4f59d93b0fc03eaf5f4515b8
Merge: c7bc980 2b8243b
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Thu Nov 7 18:15:50 2019 -0500

    Merge branch '1.9'

 .../clientImpl/AccumuloBulkMergeException.java     |   1 +
 .../test/functional/FateConcurrencyIT.java         | 354 ++++++---------------
 .../org/apache/accumulo/test/util/SlowOps.java     | 347 ++++++++++++++++++++
 3 files changed, 438 insertions(+), 264 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
index 2a7527e,0000000..98d7015
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
@@@ -1,32 -1,0 +1,33 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +
 +/**
 + * Internal class indicating a concurrent merge occurred during the new bulk import.
 + */
 +public class AccumuloBulkMergeException extends AccumuloException {
 +
++  private static final long serialVersionUID = 1L;
 +  private static final String MSG = "Concurrent merge happened";
 +
 +  public AccumuloBulkMergeException(final Throwable cause) {
 +    super(MSG, cause);
 +  }
 +
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 8367891,c3a4d79..47a315a
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@@ -33,34 -30,22 +30,25 @@@ import java.util.concurrent.Future
  import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
  import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.AccumuloSecurityException;
- import org.apache.accumulo.core.client.BatchWriter;
- import org.apache.accumulo.core.client.IteratorSetting;
- import org.apache.accumulo.core.client.Scanner;
- import org.apache.accumulo.core.client.TableExistsException;
 -import org.apache.accumulo.core.client.Connector;
 -import org.apache.accumulo.core.client.Instance;
  import org.apache.accumulo.core.client.TableNotFoundException;
 -import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
- import org.apache.accumulo.core.clientImpl.ClientInfo;
 +import org.apache.accumulo.core.clientImpl.Tables;
  import org.apache.accumulo.core.conf.Property;
- import org.apache.accumulo.core.data.Key;
- import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.TableId;
- import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.master.state.tables.TableState;
- import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.zookeeper.ZooUtil;
  import org.apache.accumulo.fate.AdminUtil;
  import org.apache.accumulo.fate.ZooStore;
  import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
  import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
- import org.apache.hadoop.io.Text;
+ import org.apache.accumulo.test.util.SlowOps;
  import org.apache.zookeeper.KeeperException;
 +import org.junit.After;
  import org.junit.AfterClass;
  import org.junit.Before;
  import org.junit.Test;
@@@ -85,8 -70,7 +73,8 @@@ public class FateConcurrencyIT extends 
    private static final int NUM_ROWS = 1000;
    private static final long SLOW_SCAN_SLEEP_MS = 250L;
  
-   private AccumuloClient accumuloClient;
 -  private Connector connector;
++  private AccumuloClient client;
 +  private ClientContext context;
  
    private static final ExecutorService pool = Executors.newCachedThreadPool();
  
@@@ -101,19 -84,16 +88,22 @@@
  
    @Before
    public void setup() {
-     accumuloClient = Accumulo.newClient().from(getClientProps()).build();
-     context = (ClientContext) accumuloClient;
+ 
 -    connector = getConnector();
++    client = Accumulo.newClient().from(getClientProps()).build();
++    context = (ClientContext) client;
  
      tableName = getUniqueNames(1)[0];
  
      secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
  
-     createData(tableName);
+     maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2);
+ 
 -    slowOps = new SlowOps(connector, tableName, maxWait, 1);
++    slowOps = new SlowOps(client, tableName, maxWait, 1);
 +  }
 +
 +  @After
 +  public void closeClient() {
-     accumuloClient.close();
++    client.close();
    }
  
    @AfterClass
@@@ -154,7 -134,7 +144,7 @@@
  
      // verify that offline then online functions as expected.
  
-     accumuloClient.tableOperations().offline(tableName, true);
 -    connector.tableOperations().offline(tableName, true);
++    client.tableOperations().offline(tableName, true);
      assertEquals("verify table is offline", TableState.OFFLINE, getTableState(tableName));
  
      onlineOp = new OnLineCallable(tableName);
@@@ -187,11 -166,10 +176,10 @@@
  
      assertEquals("verify table is still online", TableState.ONLINE, getTableState(tableName));
  
-     assertTrue("verify compaction still running and fate transaction still exists",
-         blockUntilCompactionRunning(tableName));
+     assertTrue("Find FATE operation for table", findFate(tableName));
  
      // test complete, cancel compaction and move on.
-     accumuloClient.tableOperations().cancelCompaction(tableName);
 -    connector.tableOperations().cancelCompaction(tableName);
++    client.tableOperations().cancelCompaction(tableName);
  
      log.debug("Success: Timing results for online commands.");
      log.debug("Time for unblocked online {} ms",
@@@ -214,14 -215,9 +225,8 @@@
    @Test
    public void getFateStatus() {
  
 -    Instance instance = connector.getInstance();
 -    String tableId;
 +    TableId tableId;
  
-     // for development testing - force transient condition that was failing this test so that
-     // we know if multiple compactions are running, they are properly handled by the test code.
-     if (runMultipleCompactions) {
-       runMultipleCompactions();
-     }
- 
      try {
  
        assertEquals("verify table online after created", TableState.ONLINE,
@@@ -249,14 -245,13 +254,13 @@@
  
        try {
  
-         String instanceId = accumuloClient.instanceOperations().getInstanceID();
-         ClientInfo info = ClientInfo.from(accumuloClient.properties());
-         IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(info.getZooKeepers(),
-             info.getZooKeepersSessionTimeOut(), secret);
++        String instanceId = context.getInstanceID();
+         IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
 -            instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
 -
 -        ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
++            context.getZooKeepers(), context.getZooKeepersSessionTimeOut(), secret);
 +        ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk);
  
          withLocks = admin.getStatus(zs, zk,
 -            ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
 +            ZooUtil.getRoot(instanceId) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
  
          // call method that does not use locks.
          noLocks = admin.getTransactionStatus(zs, null, null);
@@@ -306,111 -301,15 +310,15 @@@
      try {
  
        // test complete, cancel compaction and move on.
-       accumuloClient.tableOperations().cancelCompaction(tableName);
 -      connector.tableOperations().cancelCompaction(tableName);
++      client.tableOperations().cancelCompaction(tableName);
  
        // block if compaction still running
-       compactTask.get();
- 
-     } catch (InterruptedException ex) {
-       Thread.currentThread().interrupt();
-     } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException
-         | ExecutionException ex) {
-       log.debug("Could not cancel compaction", ex);
-     }
-   }
- 
-   /**
-    * This method was helpful for debugging a condition that was causing transient test failures.
-    * This forces a condition that the test should be able to handle. This method is not needed
-    * during normal testing, it was kept to aid future test development / troubleshooting if other
-    * transient failures occur.
-    */
-   private void runMultipleCompactions() {
- 
-     for (int i = 0; i < 4; i++) {
- 
-       String aTableName = getUniqueNames(1)[0] + "_" + i;
- 
-       createData(aTableName);
- 
-       log.debug("Table: {}", aTableName);
- 
-       pool.submit(new SlowCompactionRunner(aTableName));
- 
-       assertTrue("verify that compaction running and fate transaction exists",
-           blockUntilCompactionRunning(aTableName));
- 
-     }
-   }
- 
-   /**
-    * Create and run a slow running compaction task. The method will block until the compaction has
-    * been started.
-    *
-    * @return a reference to the running compaction task.
-    */
-   private Future<?> startCompactTask() {
-     Future<?> compactTask = pool.submit(new SlowCompactionRunner(tableName));
-     assertTrue("verify that compaction running and fate transaction exists",
-         blockUntilCompactionRunning(tableName));
-     return compactTask;
-   }
- 
-   /**
-    * Blocks current thread until compaction is running.
-    *
-    * @return true if compaction and associate fate found.
-    */
-   private boolean blockUntilCompactionRunning(final String tableName) {
- 
-     long maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2);
- 
-     long startWait = System.currentTimeMillis();
- 
-     List<String> tservers = accumuloClient.instanceOperations().getTabletServers();
- 
-     /*
-      * wait for compaction to start on table - The compaction will acquire a fate transaction lock
-      * that used to block a subsequent online command while the fate transaction lock was held.
-      */
-     while (System.currentTimeMillis() < (startWait + maxWait)) {
- 
-       try {
- 
-         int runningCompactions = 0;
- 
-         for (String tserver : tservers) {
-           runningCompactions +=
-               accumuloClient.instanceOperations().getActiveCompactions(tserver).size();
-           log.trace("tserver {}, running compactions {}", tservers, runningCompactions);
-         }
- 
-         if (runningCompactions > 0) {
-           // Validate that there is a compaction fate transaction - otherwise test is invalid.
-           if (findFate(tableName)) {
-             return true;
-           }
-         }
- 
-       } catch (AccumuloSecurityException | AccumuloException ex) {
-         throw new IllegalStateException("failed to get active compactions, test fails.", ex);
-       } catch (KeeperException ex) {
-         log.trace("Saw possible transient zookeeper error");
-       }
+       boolean cancelled = slowOps.blockWhileCompactionRunning();
+       log.debug("Cancel completed successfully: {}", cancelled);
  
-       try {
-         Thread.sleep(250);
-       } catch (InterruptedException ex) {
-         // reassert interrupt
-         Thread.currentThread().interrupt();
-       }
+     } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException ex) {
+       log.debug("Could not cancel compaction due to exception", ex);
      }
- 
-     log.debug("Could not find compaction for {} after {} seconds", tableName,
-         TimeUnit.MILLISECONDS.toSeconds(maxWait));
- 
-     return false;
- 
    }
  
    /**
@@@ -428,8 -327,9 +336,8 @@@
     * @throws KeeperException
     *           if a zookeeper error occurred - allows for retries.
     */
-   private boolean findFate(final String tableName) throws KeeperException {
+   private boolean lookupFateInZookeeper(final String tableName) throws KeeperException {
  
 -    Instance instance = connector.getInstance();
      AdminUtil<String> admin = new AdminUtil<>(false);
  
      try {
@@@ -438,16 -338,11 +346,12 @@@
  
        log.trace("tid: {}", tableId);
  
-       ClientInfo info = ClientInfo.from(accumuloClient.properties());
-       IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(info.getZooKeepers(),
-           info.getZooKeepersSessionTimeOut(), secret);
-       ZooStore<String> zs = new ZooStore<>(
-           ZooUtil.getRoot(accumuloClient.instanceOperations().getInstanceID()) + Constants.ZFATE,
-           zk);
 -      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
 -          instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
 -      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
++      String instanceId = context.getInstanceID();
++      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(context.getZooKeepers(),
++          context.getZooKeepersSessionTimeOut(), secret);
++      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk);
        AdminUtil.FateStatus fateStatus = admin.getStatus(zs, zk,
-           ZooUtil.getRoot(accumuloClient.instanceOperations().getInstanceID())
-               + Constants.ZTABLE_LOCKS + "/" + tableId,
-           null, null);
 -          ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
++          ZooUtil.getRoot(instanceId) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
  
        log.trace("current fates: {}", fateStatus.getTransactions().size());
  
@@@ -614,7 -456,7 +465,7 @@@
  
        log.trace("Setting {} online", tableName);
  
-       accumuloClient.tableOperations().online(tableName, true);
 -      connector.tableOperations().online(tableName, true);
++      client.tableOperations().online(tableName, true);
        // stop timing
        status.setComplete();
  
@@@ -626,73 -468,48 +477,48 @@@
    }
  
    /**
-    * Instance to create / run a compaction using a slow iterator.
+    * Concurrency testing - ensure that tests are valid id multiple compactions are running. for
+    * development testing - force transient condition that was failing this test so that we know if
+    * multiple compactions are running, they are properly handled by the test code and the tests are
+    * valid.
     */
-   private class SlowCompactionRunner implements Runnable {
- 
-     private final String tableName;
+   @Test
+   public void multipleCompactions() {
  
-     /**
-      * Create an instance of this class.
-      *
-      * @param tableName
-      *          the name of the table that will be compacted with the slow iterator.
-      */
-     SlowCompactionRunner(final String tableName) {
-       this.tableName = tableName;
-     }
+     int tableCount = 4;
  
-     @Override
-     public void run() {
+     List<SlowOps> tables = new ArrayList<>();
  
-       long startTimestamp = System.nanoTime();
+     for (int i = 0; i < tableCount; i++) {
+       String uniqueName = getUniqueNames(1)[0] + "_" + i;
 -      SlowOps gen = new SlowOps(connector, uniqueName, maxWait, tableCount);
++      SlowOps gen = new SlowOps(client, uniqueName, maxWait, tableCount);
+       tables.add(gen);
+       gen.startCompactTask();
+     }
  
-       IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
-       SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
+     int foundCount = 0;
  
-       List<IteratorSetting> compactIterators = new ArrayList<>();
-       compactIterators.add(slow);
+     for (SlowOps t : tables) {
+       log.debug("Look for fate {}", t.getTableName());
+       if (findFate(t.getTableName())) {
+         log.debug("Found fate {}", t.getTableName());
+         foundCount++;
+       }
+     }
  
-       log.trace("Slow iterator {}", slow);
+     assertEquals(tableCount, foundCount);
  
+     for (SlowOps t : tables) {
        try {
- 
-         log.trace("Start compaction");
- 
-         accumuloClient.tableOperations().compact(tableName, new Text("0"), new Text("z"),
-             compactIterators, true, true);
- 
-         log.trace("Compaction wait is complete");
- 
-         log.trace("Slow compaction of {} rows took {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
-             .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
- 
-         // validate that number of rows matches expected.
- 
-         startTimestamp = System.nanoTime();
- 
-         // validate expected data created and exists in table.
- 
-         int count = scanCount(tableName);
- 
-         log.trace("After compaction, scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
-             .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
- 
-         if (count != NUM_ROWS) {
-           throw new IllegalStateException(
-               String.format("After compaction, number of rows %1$d does not match expected %2$d",
-                   count, NUM_ROWS));
-         }
-       } catch (TableNotFoundException ex) {
-         throw new IllegalStateException("test failed, table " + tableName + " does not exist", ex);
-       } catch (AccumuloSecurityException ex) {
-         throw new IllegalStateException(
-             "test failed, could not add iterator due to security exception", ex);
-       } catch (AccumuloException ex) {
-         // test cancels compaction on complete, so ignore it as an exception.
-         if (!ex.getMessage().contains("Compaction canceled")) {
-           throw new IllegalStateException("test failed with an Accumulo exception", ex);
 -        connector.tableOperations().cancelCompaction(t.getTableName());
++        client.tableOperations().cancelCompaction(t.getTableName());
+         // block if compaction still running
+         boolean cancelled = t.blockWhileCompactionRunning();
+         if (!cancelled) {
+           log.info("Failed to cancel compaction during multiple compaction test clean-up for {}",
+               t.getTableName());
          }
+       } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException ex) {
+         log.debug("Exception throw during multiple table test clean-up", ex);
        }
      }
    }
diff --cc test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
index 0000000,bd51990..5e28a06
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
@@@ -1,0 -1,347 +1,347 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.util;
+ 
+ import static java.nio.charset.StandardCharsets.UTF_8;
+ 
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+ 
++import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.AccumuloException;
+ import org.apache.accumulo.core.client.AccumuloSecurityException;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.BatchWriterConfig;
 -import org.apache.accumulo.core.client.Connector;
+ import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.TableExistsException;
+ import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.client.admin.ActiveCompaction;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.test.functional.SlowIterator;
+ import org.apache.hadoop.io.Text;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Common methods for performing operations that are deliberately take some period of time so that
+  * tests can interact while the operations are in progress.
+  */
+ public class SlowOps {
+ 
+   private static final Logger log = LoggerFactory.getLogger(SlowOps.class);
+ 
+   private static final String TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX =
+       "tserver.compaction.major.concurrent.max";
+ 
+   private static final long SLOW_SCAN_SLEEP_MS = 250L;
+   private static final int NUM_DATA_ROWS = 1000;
+ 
 -  private final Connector connector;
++  private final AccumuloClient client;
+   private final String tableName;
+   private final long maxWait;
+ 
+   // private final int numRows = DEFAULT_NUM_DATA_ROWS;
+ 
+   private static final ExecutorService pool = Executors.newCachedThreadPool();
+ 
+   private Future<?> compactTask = null;
+ 
 -  private SlowOps(final Connector connector, final String tableName, final long maxWait) {
++  private SlowOps(final AccumuloClient client, final String tableName, final long maxWait) {
+ 
 -    this.connector = connector;
++    this.client = client;
+     this.tableName = tableName;
+     this.maxWait = maxWait;
+ 
+     createData();
+   }
+ 
 -  public SlowOps(final Connector connector, final String tableName, final long maxWait,
++  public SlowOps(final AccumuloClient client, final String tableName, final long maxWait,
+       final int numParallelExpected) {
+ 
 -    this(connector, tableName, maxWait);
++    this(client, tableName, maxWait);
+ 
+     setExpectedCompactions(numParallelExpected);
+ 
+   }
+ 
+   public void setExpectedCompactions(final int numParallelExpected) {
+ 
+     final int target = numParallelExpected + 1;
+ 
+     Map<String,String> sysConfig;
+ 
+     try {
+ 
 -      sysConfig = connector.instanceOperations().getSystemConfiguration();
++      sysConfig = client.instanceOperations().getSystemConfiguration();
+ 
+       int current = Integer.parseInt(sysConfig.get("tserver.compaction.major.concurrent.max"));
+ 
+       if (current < target) {
 -        connector.instanceOperations().setProperty(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX,
++        client.instanceOperations().setProperty(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX,
+             Integer.toString(target));
+ 
 -        sysConfig = connector.instanceOperations().getSystemConfiguration();
++        sysConfig = client.instanceOperations().getSystemConfiguration();
+ 
+       }
+ 
+       Integer.parseInt(sysConfig.get(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX));
+ 
+     } catch (AccumuloException | AccumuloSecurityException | NumberFormatException ex) {
+       throw new IllegalStateException("Could not set parallel compaction limit to " + target, ex);
+     }
+   }
+ 
+   public String getTableName() {
+     return tableName;
+   }
+ 
+   private void createData() {
+ 
+     try {
+ 
+       // create table.
 -      connector.tableOperations().create(tableName);
++      client.tableOperations().create(tableName);
+ 
+       log.info("Created table id: {}, name \'{}\'",
 -          connector.tableOperations().tableIdMap().get(tableName), tableName);
++          client.tableOperations().tableIdMap().get(tableName), tableName);
+ 
 -      try (BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig())) {
++      try (BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig())) {
+         // populate
+         for (int i = 0; i < NUM_DATA_ROWS; i++) {
+           Mutation m = new Mutation(new Text(String.format("%05d", i)));
+           m.put(new Text("col" + ((i % 3) + 1)), new Text("qual"),
+               new Value("junk".getBytes(UTF_8)));
+           bw.addMutation(m);
+         }
+       }
+ 
+       verifyRows();
+ 
+     } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException
+         | TableExistsException ex) {
+       throw new IllegalStateException("Create data failed with exception", ex);
+     }
+   }
+ 
+   private void verifyRows() {
+ 
+     long startTimestamp = System.nanoTime();
+ 
+     int count = scanCount();
+ 
+     log.trace("Scan time for {} rows {} ms", NUM_DATA_ROWS,
+         TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+ 
+     if (count != NUM_DATA_ROWS) {
+       throw new IllegalStateException(
+           String.format("Number of rows %1$d does not match expected %2$d", count, NUM_DATA_ROWS));
+     }
+   }
+ 
+   private int scanCount() {
 -    try (Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY)) {
++    try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+ 
+       int count = 0;
+ 
+       for (Map.Entry<Key,Value> elt : scanner) {
+         String expected = String.format("%05d", count);
+         assert (elt.getKey().getRow().toString().equals(expected));
+         count++;
+       }
+       return count;
+     } catch (TableNotFoundException ex) {
+       log.debug("cannot verify row count, table \'{}\' does not exist", tableName);
+       throw new IllegalStateException(ex);
+     }
+   }
+ 
+   /**
+    * Create and run a slow running compaction task. The method will block until the compaction has
+    * been started. The compaction should be cancelled using Accumulo tableOps, and then the caller
+    * can use blockWhileCompactionRunning() on the instance of this class.
+    */
+   public void startCompactTask() {
+ 
+     compactTask = pool.submit(new SlowCompactionRunner());
+ 
+     if (!blockUntilCompactionRunning()) {
+       throw new IllegalStateException("Compaction could not be started for " + tableName);
+     }
+   }
+ 
+   /**
+    * Instance to create / run a compaction using a slow iterator.
+    */
+   private class SlowCompactionRunner implements Runnable {
+ 
+     SlowCompactionRunner() {}
+ 
+     @Override
+     public void run() {
+ 
+       long startTimestamp = System.nanoTime();
+ 
+       IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
+       SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
+ 
+       List<IteratorSetting> compactIterators = new ArrayList<>();
+       compactIterators.add(slow);
+ 
+       log.trace("Starting slow operation using iterator: {}", slow);
+ 
+       int retry = 0;
+       boolean completed = false;
+ 
+       while (!completed && retry++ < 5) {
+ 
+         try {
+           log.info("Starting compaction.  Attempt {}", retry);
 -          connector.tableOperations().compact(tableName, null, null, compactIterators, true, true);
++          client.tableOperations().compact(tableName, null, null, compactIterators, true, true);
+           completed = true;
+         } catch (Throwable ex) {
+           // test cancels compaction on complete, so ignore it as an exception.
+           if (ex.getMessage().contains("Compaction canceled")) {
+             return;
+           }
+           log.info("Exception thrown while waiting for compaction - will retry", ex);
+           try {
+             Thread.sleep(10_000 * retry);
+           } catch (InterruptedException iex) {
+             Thread.currentThread().interrupt();
+             return;
+           }
+         }
+       }
+       log.debug("Compaction wait is complete");
+ 
+       log.trace("Slow compaction of {} rows took {} ms", NUM_DATA_ROWS, TimeUnit.MILLISECONDS
+           .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+ 
+       // validate that number of rows matches expected.
+ 
+       startTimestamp = System.nanoTime();
+ 
+       // validate expected data created and exists in table.
+ 
+       int count = scanCount();
+ 
+       log.trace("After compaction, scan time for {} rows {} ms", NUM_DATA_ROWS,
+           TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp),
+               TimeUnit.NANOSECONDS));
+ 
+       if (count != NUM_DATA_ROWS) {
+         throw new IllegalStateException(
+             String.format("After compaction, number of rows %1$d does not match expected %2$d",
+                 count, NUM_DATA_ROWS));
+       }
+     }
+   }
+ 
+   /**
+    * Blocks current thread until compaction is running.
+    *
+    * @return true if compaction and associate fate found.
+    */
+   private boolean blockUntilCompactionRunning() {
+ 
+     long startWait = System.currentTimeMillis();
+ 
 -    List<String> tservers = connector.instanceOperations().getTabletServers();
++    List<String> tservers = client.instanceOperations().getTabletServers();
+ 
+     /*
+      * wait for compaction to start on table - The compaction will acquire a fate transaction lock
+      * that used to block a subsequent online command while the fate transaction lock was held.
+      */
+     while (System.currentTimeMillis() < (startWait + maxWait)) {
+ 
+       try {
+ 
+         List<ActiveCompaction> activeCompactions = new ArrayList<>();
+ 
+         for (String tserver : tservers) {
 -          List<ActiveCompaction> ac = connector.instanceOperations().getActiveCompactions(tserver);
++          List<ActiveCompaction> ac = client.instanceOperations().getActiveCompactions(tserver);
+           activeCompactions.addAll(ac);
+           // runningCompactions += ac.size();
+           log.trace("tserver {}, running compactions {}", tservers, ac.size());
+         }
+ 
+         if (!activeCompactions.isEmpty()) {
+           try {
+             for (ActiveCompaction compaction : activeCompactions) {
+               log.debug("Compaction running for {}", compaction.getTable());
+               if (compaction.getTable().compareTo(tableName) == 0) {
+                 return true;
+               }
+             }
+           } catch (TableNotFoundException ex) {
+             log.trace("Compaction found for unknown table {}", activeCompactions);
+           }
+         }
+       } catch (AccumuloSecurityException | AccumuloException ex) {
+         throw new IllegalStateException("failed to get active compactions, test fails.", ex);
+       }
+ 
+       try {
+         Thread.sleep(3_000);
+       } catch (InterruptedException ex) {
+         // reassert interrupt
+         Thread.currentThread().interrupt();
+       }
+     }
+ 
+     log.debug("Could not find compaction for {} after {} seconds", tableName,
+         TimeUnit.MILLISECONDS.toSeconds(maxWait));
+ 
+     return false;
+ 
+   }
+ 
+   /**
+    * Will block as long as the underlying compaction task is running. This method is intended to be
+    * used when the the compaction is cancelled via table operation cancel method - when the cancel
+    * command completed, the running task will terminate and then this method will return.
+    *
+    * @return true if the task returned.
+    */
+   public boolean blockWhileCompactionRunning() {
+ 
+     try {
+       if (compactTask == null) {
+         throw new IllegalStateException(
+             "Compaction task has not been started - call startCompactionTask() before blocking");
+       }
+       compactTask.get();
+       return true;
+     } catch (InterruptedException ex) {
+       Thread.currentThread().interrupt();
+       return false;
+     } catch (ExecutionException ex) {
+       return false;
+     }
+   }
+ 
+ }