You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/03/09 14:46:21 UTC

[accumulo] branch 1451-external-compactions-feature updated: Moved inner classes out of Compactor and CompactionCoordinator to make them smaller in overall size

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new c43ebb8  Moved inner classes out of Compactor and CompactionCoordinator to make them smaller in overall size
c43ebb8 is described below

commit c43ebb8ceaf78f76db7d9222b97672b713e123b2
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Mar 9 14:45:44 2021 +0000

    Moved inner classes out of Compactor and CompactionCoordinator to make them smaller in overall size
---
 .../server/compaction/ExternalCompactionUtil.java  |  19 ++
 .../coordinator/CompactionCoordinator.java         | 187 +-------------
 .../accumulo/coordinator/CompactionUpdate.java}    |  35 ++-
 .../coordinator/CoordinatorLockWatcher.java        |  53 ++++
 .../accumulo/coordinator/QueueAndPriority.java     |  88 +++++++
 .../accumulo/coordinator/RunningCompaction.java    |  72 ++++++
 .../accumulo/compactor/CompactionEnvironment.java  |  95 +++++++
 .../accumulo/compactor/CompactionJobHolder.java    |  77 ++++++
 .../org/apache/accumulo/compactor/Compactor.java   | 283 ++++++---------------
 9 files changed, 517 insertions(+), 392 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
index 51c743f..e346baa 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
@@ -24,6 +24,25 @@ import org.apache.accumulo.server.ServerContext;
 
 public class ExternalCompactionUtil {
 
+  /**
+   * Utility for returning the address of a service in the form host:port
+   *
+   * @param address
+   *          HostAndPort of service
+   * @return host and port
+   */
+  public static String getHostPortString(HostAndPort address) {
+    if (address == null) {
+      return null;
+    }
+    return address.getHost() + ":" + address.getPort();
+  }
+
+  /**
+   * 
+   * @param context
+   * @return
+   */
   public static HostAndPort findCompactionCoordinator(ServerContext context) {
     final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
     byte[] address = context.getZooCache().get(lockPath);
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 227714a..db6777f 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -29,7 +29,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -48,17 +47,15 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
-import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.server.compaction.RetryableThriftCall;
 import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 import org.apache.accumulo.server.compaction.RetryableThriftFunction;
@@ -79,178 +76,6 @@ public class CompactionCoordinator extends AbstractServer
     implements org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface,
     LiveTServerSet.Listener {
 
-  private static class QueueAndPriority implements Comparable<QueueAndPriority> {
-
-    private static WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE = new WeakHashMap<>();
-
-    public static QueueAndPriority get(String queue, Long priority) {
-      return CACHE.putIfAbsent(new Pair<>(queue, priority), new QueueAndPriority(queue, priority));
-    }
-
-    private final String queue;
-    private final Long priority;
-
-    private QueueAndPriority(String queue, Long priority) {
-      super();
-      this.queue = queue;
-      this.priority = priority;
-    }
-
-    public String getQueue() {
-      return queue;
-    }
-
-    public Long getPriority() {
-      return priority;
-    }
-
-    @Override
-    public int hashCode() {
-      return queue.hashCode() + priority.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder buf = new StringBuilder();
-      buf.append("queue: ").append(queue);
-      buf.append(", priority: ").append(priority);
-      return buf.toString();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (null == obj)
-        return false;
-      if (obj == this)
-        return true;
-      if (!(obj instanceof QueueAndPriority)) {
-        return false;
-      } else {
-        QueueAndPriority other = (QueueAndPriority) obj;
-        return this.queue.equals(other.queue) && this.priority.equals(other.priority);
-      }
-    }
-
-    @Override
-    public int compareTo(QueueAndPriority other) {
-      int result = this.queue.compareTo(other.queue);
-      if (result == 0) {
-        // reversing order such that if other priority is lower, then this has a higher priority
-        return Long.compare(other.priority, this.priority);
-      } else {
-        return result;
-      }
-    }
-
-  }
-
-  private static class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher {
-
-    @Override
-    public void lostLock(LockLossReason reason) {
-      Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
-    }
-
-    @Override
-    public void unableToMonitorLockNode(final Exception e) {
-      // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
-      Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e));
-
-    }
-
-    @Override
-    public synchronized void acquiredLock() {
-      // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
-    }
-
-    @Override
-    public synchronized void failedToAcquireLock(Exception e) {
-      // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
-    }
-
-  }
-
-  /**
-   * Utility for returning the address in the form host:port
-   *
-   * @return host and port for Compactor client connections
-   */
-  private static String getHostPortString(HostAndPort address) {
-    if (address == null) {
-      return null;
-    }
-    return address.getHost() + ":" + address.getPort();
-  }
-
-  private static class CompactionUpdate {
-    private final Long timestamp;
-    private final String message;
-    private final CompactionState state;
-
-    public CompactionUpdate(Long timestamp, String message, CompactionState state) {
-      super();
-      this.timestamp = timestamp;
-      this.message = message;
-      this.state = state;
-    }
-
-    public Long getTimestamp() {
-      return timestamp;
-    }
-
-    public String getMessage() {
-      return message;
-    }
-
-    public CompactionState getState() {
-      return state;
-    }
-  }
-
-  private static class RunningCompaction {
-    private final TExternalCompactionJob job;
-    private final String compactorAddress;
-    private final TServerInstance tserver;
-    private Map<Long,CompactionUpdate> updates = new TreeMap<>();
-    private CompactionStats stats = null;
-
-    public RunningCompaction(TExternalCompactionJob job, String compactorAddress,
-        TServerInstance tserver) {
-      super();
-      this.job = job;
-      this.compactorAddress = compactorAddress;
-      this.tserver = tserver;
-    }
-
-    public Map<Long,CompactionUpdate> getUpdates() {
-      return updates;
-    }
-
-    public void addUpdate(Long timestamp, String message, CompactionState state) {
-      this.updates.put(timestamp, new CompactionUpdate(timestamp, message, state));
-    }
-
-    public CompactionStats getStats() {
-      return stats;
-    }
-
-    public void setStats(CompactionStats stats) {
-      this.stats = stats;
-    }
-
-    public TExternalCompactionJob getJob() {
-      return job;
-    }
-
-    public String getCompactorAddress() {
-      return compactorAddress;
-    }
-
-    public TServerInstance getTserver() {
-      return tserver;
-    }
-  }
-
   private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
   private static final long TIME_BETWEEN_CHECKS = 5000;
 
@@ -290,11 +115,11 @@ public class CompactionCoordinator extends AbstractServer
    * @throws KeeperException
    * @throws InterruptedException
    */
-  private boolean getCoordinatorLock(HostAndPort clientAddress)
+  protected boolean getCoordinatorLock(HostAndPort clientAddress)
       throws KeeperException, InterruptedException {
     LOG.info("trying to get coordinator lock");
 
-    final String coordinatorClientAddress = getHostPortString(clientAddress);
+    final String coordinatorClientAddress = ExternalCompactionUtil.getHostPortString(clientAddress);
     final String lockPath = getContext().getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
     final UUID zooLockUUID = UUID.randomUUID();
 
@@ -309,7 +134,7 @@ public class CompactionCoordinator extends AbstractServer
    * @return address of this CompactionCoordinator client service
    * @throws UnknownHostException
    */
-  private ServerAddress startCoordinatorClientService() throws UnknownHostException {
+  protected ServerAddress startCoordinatorClientService() throws UnknownHostException {
     CompactionCoordinator rpcProxy = TraceUtil.wrapService(this);
     final org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<
         CompactionCoordinator> processor;
@@ -488,7 +313,7 @@ public class CompactionCoordinator extends AbstractServer
     }
   }
 
-  private TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
+  protected TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
       throws TTransportException {
     TServerConnection connection = tserverSet.getConnection(tserver);
     TTransport transport =
@@ -496,7 +321,7 @@ public class CompactionCoordinator extends AbstractServer
     return ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
   }
 
-  private Compactor.Client getCompactorConnection(HostAndPort compactorAddress)
+  protected Compactor.Client getCompactorConnection(HostAndPort compactorAddress)
       throws TTransportException {
     TTransport transport =
         ThriftTransportPool.getInstance().getTransport(compactorAddress, 0, getContext());
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionUpdate.java
similarity index 57%
copy from server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
copy to server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionUpdate.java
index 51c743f..becc428 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionUpdate.java
@@ -16,18 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.server.compaction;
+package org.apache.accumulo.coordinator;
 
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.core.compaction.thrift.CompactionState;
 
-public class ExternalCompactionUtil {
+public class CompactionUpdate {
 
-  public static HostAndPort findCompactionCoordinator(ServerContext context) {
-    final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
-    byte[] address = context.getZooCache().get(lockPath);
-    String coordinatorAddress = new String(address);
-    return HostAndPort.fromString(coordinatorAddress);
+  private final Long timestamp;
+  private final String message;
+  private final CompactionState state;
+
+  CompactionUpdate(Long timestamp, String message, CompactionState state) {
+    super();
+    this.timestamp = timestamp;
+    this.message = message;
+    this.state = state;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public CompactionState getState() {
+    return state;
   }
+
 }
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
new file mode 100644
index 0000000..30ebb2e
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorLockWatcher.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLockWatcher.class);
+
+  @Override
+  public void lostLock(LockLossReason reason) {
+    Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
+  }
+
+  @Override
+  public void unableToMonitorLockNode(final Exception e) {
+    // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
+    Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e));
+
+  }
+
+  @Override
+  public synchronized void acquiredLock() {
+    // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+  }
+
+  @Override
+  public synchronized void failedToAcquireLock(Exception e) {
+    // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
+  }
+
+}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java
new file mode 100644
index 0000000..641fa97
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueAndPriority.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import java.util.WeakHashMap;
+
+import org.apache.accumulo.core.util.Pair;
+
+public class QueueAndPriority implements Comparable<QueueAndPriority> {
+
+  private static WeakHashMap<Pair<String,Long>,QueueAndPriority> CACHE = new WeakHashMap<>();
+
+  public static QueueAndPriority get(String queue, Long priority) {
+    return CACHE.putIfAbsent(new Pair<>(queue, priority), new QueueAndPriority(queue, priority));
+  }
+
+  private final String queue;
+  private final Long priority;
+
+  private QueueAndPriority(String queue, Long priority) {
+    super();
+    this.queue = queue;
+    this.priority = priority;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public Long getPriority() {
+    return priority;
+  }
+
+  @Override
+  public int hashCode() {
+    return queue.hashCode() + priority.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("queue: ").append(queue);
+    buf.append(", priority: ").append(priority);
+    return buf.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (null == obj)
+      return false;
+    if (obj == this)
+      return true;
+    if (!(obj instanceof QueueAndPriority)) {
+      return false;
+    } else {
+      QueueAndPriority other = (QueueAndPriority) obj;
+      return this.queue.equals(other.queue) && this.priority.equals(other.priority);
+    }
+  }
+
+  @Override
+  public int compareTo(QueueAndPriority other) {
+    int result = this.queue.compareTo(other.queue);
+    if (result == 0) {
+      // reversing order such that if other priority is lower, then this has a higher priority
+      return Long.compare(other.priority, this.priority);
+    } else {
+      return result;
+    }
+  }
+
+}
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
new file mode 100644
index 0000000..cc36f50
--- /dev/null
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/RunningCompaction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.compaction.thrift.CompactionState;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+
+public class RunningCompaction {
+
+  private final TExternalCompactionJob job;
+  private final String compactorAddress;
+  private final TServerInstance tserver;
+  private Map<Long,CompactionUpdate> updates = new TreeMap<>();
+  private CompactionStats stats = null;
+
+  RunningCompaction(TExternalCompactionJob job, String compactorAddress, TServerInstance tserver) {
+    super();
+    this.job = job;
+    this.compactorAddress = compactorAddress;
+    this.tserver = tserver;
+  }
+
+  public Map<Long,CompactionUpdate> getUpdates() {
+    return updates;
+  }
+
+  public void addUpdate(Long timestamp, String message, CompactionState state) {
+    this.updates.put(timestamp, new CompactionUpdate(timestamp, message, state));
+  }
+
+  public CompactionStats getStats() {
+    return stats;
+  }
+
+  public void setStats(CompactionStats stats) {
+    this.stats = stats;
+  }
+
+  public TExternalCompactionJob getJob() {
+    return job;
+  }
+
+  public String getCompactorAddress() {
+    return compactorAddress;
+  }
+
+  public TServerInstance getTserver() {
+    return tserver;
+  }
+
+}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
new file mode 100644
index 0000000..bfda224
--- /dev/null
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.compactor;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
+import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
+import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
+
+public class CompactionEnvironment implements CompactionEnv {
+
+  private final ServerContext context;
+  private final CompactionJobHolder jobHolder;
+
+  CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder) {
+    this.context = context;
+    this.jobHolder = jobHolder;
+  }
+
+  @Override
+  public boolean isCompactionEnabled() {
+    return !jobHolder.isCancelled();
+  }
+
+  @Override
+  public IteratorScope getIteratorScope() {
+    return IteratorScope.majc;
+  }
+
+  @Override
+  public RateLimiter getReadLimiter() {
+    return SharedRateLimiterFactory.getInstance(context.getConfiguration())
+        .create("read_rate_limiter", () -> jobHolder.getJob().getReadRate());
+  }
+
+  @Override
+  public RateLimiter getWriteLimiter() {
+    return SharedRateLimiterFactory.getInstance(context.getConfiguration())
+        .create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate());
+  }
+
+  @Override
+  public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
+      AccumuloConfiguration acuTableConf, TableId tableId) {
+    return new TabletIteratorEnvironment(context, IteratorScope.majc,
+        !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId,
+        CompactionKind.valueOf(jobHolder.getJob().getKind().name()));
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> getMinCIterator() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public CompactionReason getReason() {
+    switch (jobHolder.getJob().getKind()) {
+      case USER:
+        return CompactionReason.USER;
+      case CHOP:
+        return CompactionReason.CHOP;
+      case SELECTOR:
+      case SYSTEM:
+      default:
+        return CompactionReason.SYSTEM;
+    }
+  }
+
+}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
new file mode 100644
index 0000000..f6ab00c
--- /dev/null
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.compactor;
+
+import java.util.Objects;
+
+import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+
+public class CompactionJobHolder {
+
+  private TExternalCompactionJob job;
+  private Thread compactionThread;
+  private volatile Boolean cancelled = Boolean.FALSE;
+  private CompactionStats stats = null;
+
+  CompactionJobHolder() {}
+
+  public void reset() {
+    job = null;
+    compactionThread = null;
+    cancelled = Boolean.FALSE;
+    stats = null;
+  }
+
+  public TExternalCompactionJob getJob() {
+    return job;
+  }
+
+  public Thread getThread() {
+    return compactionThread;
+  }
+
+  public CompactionStats getStats() {
+    return stats;
+  }
+
+  public void setStats(CompactionStats stats) {
+    this.stats = stats;
+  }
+
+  public void cancel() {
+    cancelled = Boolean.TRUE;
+  }
+
+  public boolean isCancelled() {
+    return cancelled;
+  }
+
+  public boolean isSet() {
+    return (null != this.job);
+  }
+
+  public void set(TExternalCompactionJob job, Thread compactionThread) {
+    Objects.requireNonNull(job, "CompactionJob is null");
+    Objects.requireNonNull(compactionThread, "Compaction thread is null");
+    this.job = job;
+    this.compactionThread = compactionThread;
+  }
+
+}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index d2e94e3..992c383 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -26,7 +26,6 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -41,26 +40,18 @@ import org.apache.accumulo.core.compaction.thrift.CompactionState;
 import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.rpc.ThriftUtil;
-import org.apache.accumulo.core.spi.compaction.CompactionKind;
-import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
 import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.fate.util.UtilWaitThread;
@@ -80,8 +71,6 @@ import org.apache.accumulo.server.compaction.RetryableThriftCall;
 import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException;
 import org.apache.accumulo.server.compaction.RetryableThriftFunction;
 import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
-import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
 import org.apache.accumulo.server.rpc.TServerUtils;
@@ -107,73 +96,6 @@ public class Compactor extends AbstractServer
     }
   }
 
-  /**
-   * Object used to hold information about the current compaction
-   */
-  private static class CompactionJobHolder {
-    private TExternalCompactionJob job;
-    private Thread compactionThread;
-    private volatile Boolean cancelled = Boolean.FALSE;
-    private CompactionStats stats = null;
-
-    public CompactionJobHolder() {}
-
-    public void reset() {
-      job = null;
-      compactionThread = null;
-      cancelled = Boolean.FALSE;
-      stats = null;
-    }
-
-    public TExternalCompactionJob getJob() {
-      return job;
-    }
-
-    public Thread getThread() {
-      return compactionThread;
-    }
-
-    public CompactionStats getStats() {
-      return stats;
-    }
-
-    public void setStats(CompactionStats stats) {
-      this.stats = stats;
-    }
-
-    public void cancel() {
-      cancelled = Boolean.TRUE;
-    }
-
-    public boolean isCancelled() {
-      return cancelled;
-    }
-
-    public boolean isSet() {
-      return (null != this.job);
-    }
-
-    public void set(TExternalCompactionJob job, Thread compactionThread) {
-      Objects.requireNonNull(job, "CompactionJob is null");
-      Objects.requireNonNull(compactionThread, "Compaction thread is null");
-      this.job = job;
-      this.compactionThread = compactionThread;
-    }
-
-  }
-
-  /**
-   * Utility for returning the address in the form host:port
-   *
-   * @return host and port for Compactor client connections
-   */
-  private static String getHostPortString(HostAndPort address) {
-    if (address == null) {
-      return null;
-    }
-    return address.getHost() + ":" + address.getPort();
-  }
-
   public static final String COMPACTOR_SERVICE = "COMPACTOR_SVC";
 
   private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
@@ -186,6 +108,7 @@ public class Compactor extends AbstractServer
   private final String queueName;
   private final AtomicReference<CompactionCoordinator.Client> coordinatorClient =
       new AtomicReference<>();
+
   private ZooLock compactorLock;
   private ServerAddress compactorAddress = null;
 
@@ -212,10 +135,10 @@ public class Compactor extends AbstractServer
    * @throws KeeperException
    * @throws InterruptedException
    */
-  private void announceExistence(HostAndPort clientAddress)
+  protected void announceExistence(HostAndPort clientAddress)
       throws KeeperException, InterruptedException {
 
-    String hostPort = getHostPortString(clientAddress);
+    String hostPort = ExternalCompactionUtil.getHostPortString(clientAddress);
 
     ZooReaderWriter zoo = getContext().getZooReaderWriter();
     String compactorQueuePath =
@@ -276,7 +199,7 @@ public class Compactor extends AbstractServer
    * @return address of this compactor client service
    * @throws UnknownHostException
    */
-  private ServerAddress startCompactorClientService() throws UnknownHostException {
+  protected ServerAddress startCompactorClientService() throws UnknownHostException {
     Compactor rpcProxy = TraceUtil.wrapService(this);
     final org.apache.accumulo.core.compaction.thrift.Compactor.Processor<Compactor> processor;
     if (getContext().getThriftServerType() == ThriftServerType.SASL) {
@@ -298,11 +221,12 @@ public class Compactor extends AbstractServer
   }
 
   /**
-   * Called by a thrift client to cancel the currently running compaction if it matches the supplied
-   * job
+   * Called by a CompactionCoordinator to cancel the currently running compaction
    *
-   * @param compactionJob
-   *          job
+   * @param externalCompactionId
+   *          compaction id
+   * @throws UnknownCompactionIdException
+   *           if the externalCompactionId does not match the currently executing compaction
    */
   @Override
   public void cancel(String externalCompactionId) throws TException {
@@ -319,18 +243,17 @@ public class Compactor extends AbstractServer
   }
 
   /**
-   * Send an update to the coordinator for this job
+   * Send an update to the CompactionCoordinator for this job
    *
-   * @param coordinatorClient
-   *          address of the CompactionCoordinator
    * @param job
    *          compactionJob
    * @param state
    *          updated state
    * @param message
    *          updated message
+   * @throws RetriesExceededException
    */
-  private void updateCompactionState(TExternalCompactionJob job, CompactionState state,
+  protected void updateCompactionState(TExternalCompactionJob job, CompactionState state,
       String message) throws RetriesExceededException {
     RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000,
         RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() {
@@ -351,7 +274,7 @@ public class Compactor extends AbstractServer
   }
 
   /**
-   * Update the coordinator with the stats from the job
+   * Update the CompactionCoordinator with the stats from the completed job
    *
    * @param job
    *          current compaction job
@@ -359,7 +282,7 @@ public class Compactor extends AbstractServer
    *          compaction stats
    * @throws RetriesExceededException
    */
-  private void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats)
+  protected void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats)
       throws RetriesExceededException {
     RetryableThriftCall<Void> thriftCall = new RetryableThriftCall<>(1000,
         RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() {
@@ -388,7 +311,7 @@ public class Compactor extends AbstractServer
    * @return CompactionJob
    * @throws RetriesExceededException
    */
-  private TExternalCompactionJob getNextJob() throws RetriesExceededException {
+  protected TExternalCompactionJob getNextJob() throws RetriesExceededException {
     RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall =
         new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 0,
             new RetryableThriftFunction<TExternalCompactionJob>() {
@@ -397,7 +320,7 @@ public class Compactor extends AbstractServer
                 try {
                   coordinatorClient.compareAndSet(null, getCoordinatorClient());
                   return coordinatorClient.get().getCompactionJob(queueName,
-                      getHostPortString(compactorAddress.getAddress()));
+                      ExternalCompactionUtil.getHostPortString(compactorAddress.getAddress()));
                 } catch (TException e) {
                   ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
                   throw e;
@@ -414,7 +337,7 @@ public class Compactor extends AbstractServer
    * @throws TTransportException
    *           when unable to get client
    */
-  private CompactionCoordinator.Client getCoordinatorClient() throws TTransportException {
+  protected CompactionCoordinator.Client getCoordinatorClient() throws TTransportException {
     HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext());
     if (null == coordinatorHost) {
       throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
@@ -425,60 +348,72 @@ public class Compactor extends AbstractServer
   }
 
   /**
-   * Create and return a new CompactionEnv for the current compaction job
-   *
+   * Create compaction runnable
+   * 
    * @param job
-   *          current compaction job
-   * @return new env
+   *          compaction job
+   * @param totalInputEntries
+   *          object to capture total entries
+   * @param started
+   *          started latch
+   * @param stopped
+   *          stopped latch
+   * @param err
+   *          reference to error
+   * @return Runnable compaction job
    */
-  private CompactionEnv getCompactionEnvironment(TExternalCompactionJob job) {
-    return new CompactionEnv() {
-      @Override
-      public boolean isCompactionEnabled() {
-        return !jobHolder.isCancelled();
-      }
+  protected Runnable createCompactionJob(final TExternalCompactionJob job,
+      final LongAdder totalInputEntries, final CountDownLatch started, final CountDownLatch stopped,
+      final AtomicReference<Throwable> err) {
 
+    return new Runnable() {
       @Override
-      public IteratorScope getIteratorScope() {
-        return IteratorScope.majc;
-      }
-
-      @Override
-      public RateLimiter getReadLimiter() {
-        return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
-            .create("read_rate_limiter", () -> job.getReadRate());
-      }
-
-      @Override
-      public RateLimiter getWriteLimiter() {
-        return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
-            .create("write_rate_limiter", () -> job.getWriteRate());
-      }
-
-      @Override
-      public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
-          AccumuloConfiguration acuTableConf, TableId tableId) {
-        return new TabletIteratorEnvironment(getContext(), IteratorScope.majc,
-            !job.isPropagateDeletes(), acuTableConf, tableId,
-            CompactionKind.valueOf(job.getKind().name()));
-      }
-
-      @Override
-      public SortedKeyValueIterator<Key,Value> getMinCIterator() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public CompactionReason getReason() {
-        switch (job.getKind()) {
-          case USER:
-            return CompactionReason.USER;
-          case CHOP:
-            return CompactionReason.CHOP;
-          case SELECTOR:
-          case SYSTEM:
-          default:
-            return CompactionReason.SYSTEM;
+      public void run() {
+        try {
+          LOG.info("Starting up compaction runnable for job: {}", job);
+          updateCompactionState(job, CompactionState.STARTED, "Compaction started");
+
+          final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8));
+          final TableConfiguration tConfig = getContext().getTableConfiguration(tableId);
+          final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile()));
+          final CompactionEnv cenv = new CompactionEnvironment(getContext(), jobHolder);
+
+          final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>();
+          job.getFiles().forEach(f -> {
+            files.put(new StoredTabletFile(f.getMetadataFileEntry()),
+                new DataFileValue(f.getSize(), f.getEntries(), f.getTimestamp()));
+            totalInputEntries.add(f.getEntries());
+          });
+
+          final List<IteratorSetting> iters = new ArrayList<>();
+          job.getIteratorSettings().getIterators()
+              .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
+
+          org.apache.accumulo.server.compaction.Compactor compactor =
+              new org.apache.accumulo.server.compaction.Compactor(getContext(),
+                  KeyExtent.fromThrift(job.getExtent()), files, outputFile,
+                  job.isPropagateDeletes(), cenv, iters, tConfig);
+
+          LOG.info("Starting compactor");
+          started.countDown();
+
+          org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
+          CompactionStats cs = new CompactionStats();
+          cs.setEntriesRead(stat.getEntriesRead());
+          cs.setEntriesWritten(stat.getEntriesWritten());
+          cs.setFileSize(stat.getFileSize());
+          jobHolder.setStats(cs);
+          LOG.info("Compaction completed successfully");
+          // Update state when completed
+          updateCompactionState(job, CompactionState.SUCCEEDED,
+              "Compaction completed successfully");
+        } catch (Exception e) {
+          LOG.error("Compaction failed", e);
+          err.set(e);
+          throw new RuntimeException("Compaction failed", e);
+        } finally {
+          stopped.countDown();
+          // TODO: Any cleanup
         }
       }
     };
@@ -518,67 +453,13 @@ public class Compactor extends AbstractServer
         }
         LOG.info("Received next compaction job: {}", job);
 
-        final LongAdder totalInputSize = new LongAdder();
         final LongAdder totalInputEntries = new LongAdder();
         final CountDownLatch started = new CountDownLatch(1);
         final CountDownLatch stopped = new CountDownLatch(1);
 
-        Thread compactionThread = Threads.createThread(
-            "Compaction job for tablet " + job.getExtent().toString(), new Runnable() {
-
-              @Override
-              public void run() {
-                try {
-                  LOG.info("Setting up to run compactor");
-                  updateCompactionState(job, CompactionState.STARTED, "Compaction started");
-
-                  final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8));
-                  final TableConfiguration tConfig = getContext().getTableConfiguration(tableId);
-
-                  final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>();
-                  job.getFiles().forEach(f -> {
-                    files.put(new StoredTabletFile(f.getMetadataFileEntry()),
-                        new DataFileValue(f.getSize(), f.getEntries(), f.getTimestamp()));
-                    totalInputSize.add(f.getSize());
-                    totalInputEntries.add(f.getEntries());
-                  });
-
-                  final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile()));
-
-                  final CompactionEnv cenv = getCompactionEnvironment(job);
-
-                  final List<IteratorSetting> iters = new ArrayList<>();
-                  job.getIteratorSettings().getIterators()
-                      .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
-
-                  org.apache.accumulo.server.compaction.Compactor compactor =
-                      new org.apache.accumulo.server.compaction.Compactor(getContext(),
-                          KeyExtent.fromThrift(job.getExtent()), files, outputFile,
-                          job.isPropagateDeletes(), cenv, iters, tConfig);
-
-                  LOG.info("Starting compactor");
-                  started.countDown();
-
-                  org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
-                  CompactionStats cs = new CompactionStats();
-                  cs.setEntriesRead(stat.getEntriesRead());
-                  cs.setEntriesWritten(stat.getEntriesWritten());
-                  cs.setFileSize(stat.getFileSize());
-                  jobHolder.setStats(cs);
-                  LOG.info("Compaction completed successfully");
-                  // Update state when completed
-                  updateCompactionState(job, CompactionState.SUCCEEDED,
-                      "Compaction completed successfully");
-                } catch (Exception e) {
-                  LOG.error("Compaction failed", e);
-                  err.set(e);
-                  throw new RuntimeException("Compaction failed", e);
-                } finally {
-                  stopped.countDown();
-                  // TODO: Any cleanup
-                }
-              }
-            });
+        Thread compactionThread =
+            Threads.createThread("Compaction job for tablet " + job.getExtent().toString(),
+                this.createCompactionJob(job, totalInputEntries, started, stopped, err));
 
         synchronized (jobHolder) {
           jobHolder.set(job, compactionThread);