You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/04/12 18:18:32 UTC

[accumulo] branch 1451-external-compactions-feature updated: Changes to get QA build working

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 75d7f67  Changes to get QA build working
75d7f67 is described below

commit 75d7f67fef54771061b0fff45fdea507ebbf4ca9
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon Apr 12 18:18:10 2021 +0000

    Changes to get QA build working
---
 server/compaction-coordinator/pom.xml              |  12 ++
 .../coordinator/CompactionCoordinator.java         |  63 +++++++-
 .../accumulo/coordinator/CompactionFinalizer.java  |   4 +-
 .../coordinator/CompactionCoordinatorTest.java     | 170 ++++++++++-----------
 server/compactor/pom.xml                           |  29 +++-
 .../org/apache/accumulo/compactor/Compactor.java   |  27 +++-
 .../apache/accumulo/compactor/CompactorTest.java   |  40 +++--
 .../compactions/ExternalCompactionExecutor.java    |  20 +++
 8 files changed, 238 insertions(+), 127 deletions(-)

diff --git a/server/compaction-coordinator/pom.xml b/server/compaction-coordinator/pom.xml
index 0a3d7dd9..66f98b6 100644
--- a/server/compaction-coordinator/pom.xml
+++ b/server/compaction-coordinator/pom.xml
@@ -37,9 +37,17 @@
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-server-base</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-start</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-1.2-api</artifactId>
     </dependency>
@@ -48,6 +56,10 @@
       <artifactId>libthrift</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 8f1705a..3d6f749 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -130,6 +130,16 @@ public class CompactionCoordinator extends AbstractServer
     printStartupMsg();
   }
 
+  protected CompactionCoordinator(ServerOpts opts, String[] args, AccumuloConfiguration conf) {
+    super("compaction-coordinator", opts, args);
+    aconf = conf;
+    compactionFinalizer = createCompactionFinalizer();
+    tserverSet = createLiveTServerSet();
+    setupSecurity();
+    startGCLogger();
+    printStartupMsg();
+  }
+
   protected CompactionFinalizer createCompactionFinalizer() {
     return new CompactionFinalizer(getContext());
   }
@@ -160,7 +170,9 @@ public class CompactionCoordinator extends AbstractServer
    * @param clientAddress
    *          address of this Compactor
    * @throws KeeperException
+   *           zookeeper error
    * @throws InterruptedException
+   *           thread interrupted
    */
   protected void getCoordinatorLock(HostAndPort clientAddress)
       throws KeeperException, InterruptedException {
@@ -195,6 +207,7 @@ public class CompactionCoordinator extends AbstractServer
    *
    * @return address of this CompactionCoordinator client service
    * @throws UnknownHostException
+   *           host unknown
    */
   protected ServerAddress startCoordinatorClientService() throws UnknownHostException {
     Iface rpcProxy = TraceUtil.wrapService(this);
@@ -343,10 +356,10 @@ public class CompactionCoordinator extends AbstractServer
           }
         });
       }
+      tservers.clear();
     } else {
       LOG.info("No running tablet servers found, continuing startup");
     }
-    tservers.clear();
 
     tserverSet.startListeningForTabletServerChanges();
     new DeadCompactionDetector(getContext(), compactionFinalizer).start();
@@ -556,6 +569,7 @@ public class CompactionCoordinator extends AbstractServer
    *          tserver instance
    * @return thrift client
    * @throws TTransportException
+   *           thrift error
    */
   protected TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
       throws TTransportException {
@@ -572,6 +586,7 @@ public class CompactionCoordinator extends AbstractServer
    *          compactor address
    * @return thrift client
    * @throws TTransportException
+   *           thrift error
    */
   protected Compactor.Client getCompactorConnection(HostAndPort compactorAddress)
       throws TTransportException {
@@ -582,6 +597,15 @@ public class CompactionCoordinator extends AbstractServer
 
   /**
    * Called by the TabletServer to cancel the running compaction.
+   *
+   * @param tinfo
+   *          trace info
+   * @param credentials
+   *          tcredentials object
+   * @param externalCompactionId
+   *          compaction id
+   * @throws TException
+   *           thrift error
    */
   @Override
   public void cancelCompaction(TInfo tinfo, TCredentials credentials, String externalCompactionId)
@@ -631,9 +655,15 @@ public class CompactionCoordinator extends AbstractServer
   /**
    * TServer calls getCompactionStatus to get information about the compaction
    *
+   * @param tinfo
+   *          trace info
+   * @param credentials
+   *          tcredentials object
    * @param externalCompactionId
-   *          id
+   *          compaction id
    * @return compaction stats or null if not running
+   * @throws TException
+   *           thrift error
    */
   @Override
   public List<Status> getCompactionStatus(TInfo tinfo, TCredentials credentials,
@@ -657,10 +687,20 @@ public class CompactionCoordinator extends AbstractServer
   /**
    * Compactor calls compactionCompleted passing in the CompactionStats
    *
-   * @param job
-   *          compaction job
+   * @param tinfo
+   *          trace info
+   * @param credentials
+   *          tcredentials object
+   * @param externalCompactionId
+   *          compaction id
+   * @param textent
+   *          tablet extent
    * @param stats
    *          compaction stats
+   * @throws UnknownCompactionIdException
+   *           if compaction is not running
+   * @throws TException
+   *           thrift error
    */
   @Override
   public void compactionCompleted(TInfo tinfo, TCredentials credentials,
@@ -715,9 +755,12 @@ public class CompactionCoordinator extends AbstractServer
    *
    *
    * @param externalCompactionId
+   *          compaction id
    * @return CompactionStats
    * @throws UnknownCompactionIdException
    *           if compaction is not running
+   * @throws TException
+   *           thrift error
    */
   public CompactionStats isCompactionCompleted(String externalCompactionId) throws TException {
     final var ecid = ExternalCompactionId.of(externalCompactionId);
@@ -743,14 +786,22 @@ public class CompactionCoordinator extends AbstractServer
   /**
    * Compactor calls to update the status of the assigned compaction
    *
-   * @param job
-   *          compaction job
+   * @param tinfo
+   *          trace info
+   * @param credentials
+   *          tcredentials object
+   * @param externalCompactionId
+   *          compaction id
    * @param state
    *          compaction state
    * @param message
    *          informational message
    * @param timestamp
    *          timestamp of the message
+   * @throws UnknownCompactionIdException
+   *           if compaction is not running
+   * @throws TException
+   *           thrift error
    */
   @Override
   public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 2b968ba..1b6c6b2 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -89,7 +89,9 @@ public class CompactionFinalizer {
 
     // queue RPC if queue is not full
     LOG.info("Queueing tserver notification for completed external compaction: {}", ecfs);
-    pendingNotifications.offer(ecfs);
+    if (!pendingNotifications.offer(ecfs)) {
+      LOG.info("Queue full, notification to tablet server will not occur.");
+    }
   }
 
   public void failCompactions(Map<ExternalCompactionId,KeyExtent> compactionsToFail) {
diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 3c450fc..a1522b8 100644
--- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -18,6 +18,11 @@
  */
 package org.apache.accumulo.coordinator;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,7 +62,6 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.easymock.EasyMock;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -79,7 +83,6 @@ public class CompactionCoordinatorTest {
   public class TestCoordinator extends CompactionCoordinator {
 
     private final ServerContext ctx;
-    private final AccumuloConfiguration conf;
     private final ServerAddress client;
     private final TabletClientService.Client tabletServerClient;
 
@@ -87,8 +90,7 @@ public class CompactionCoordinatorTest {
         LiveTServerSet tservers, ServerAddress client,
         TabletClientService.Client tabletServerClient, ServerContext ctx,
         AuditedSecurityOperation security) {
-      super(new ServerOpts(), new String[] {});
-      this.conf = conf;
+      super(new ServerOpts(), new String[] {}, conf);
       this.compactionFinalizer = finalizer;
       this.tserverSet = tservers;
       this.client = client;
@@ -123,11 +125,6 @@ public class CompactionCoordinatorTest {
     protected void printStartupMsg() {}
 
     @Override
-    public AccumuloConfiguration getConfiguration() {
-      return this.conf;
-    }
-
-    @Override
     public ServerContext getContext() {
       return this.ctx;
     }
@@ -147,13 +144,6 @@ public class CompactionCoordinatorTest {
     }
 
     @Override
-    protected org.apache.accumulo.core.compaction.thrift.Compactor.Client
-        getCompactorConnection(HostAndPort compactorAddress) throws TTransportException {
-      // TODO Auto-generated method stub
-      return super.getCompactorConnection(compactorAddress);
-    }
-
-    @Override
     public void compactionCompleted(TInfo tinfo, TCredentials credentials,
         String externalCompactionId, TKeyExtent textent, CompactionStats stats) throws TException {}
 
@@ -221,13 +211,13 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
-    Assert.assertEquals(0, coordinator.getQueues().size());
-    Assert.assertEquals(0, coordinator.getIndex().size());
-    Assert.assertEquals(0, coordinator.getRunning().size());
+    assertEquals(0, coordinator.getQueues().size());
+    assertEquals(0, coordinator.getIndex().size());
+    assertEquals(0, coordinator.getRunning().size());
     coordinator.run();
-    Assert.assertEquals(0, coordinator.getQueues().size());
-    Assert.assertEquals(0, coordinator.getIndex().size());
-    Assert.assertEquals(0, coordinator.getRunning().size());
+    assertEquals(0, coordinator.getQueues().size());
+    assertEquals(0, coordinator.getIndex().size());
+    assertEquals(0, coordinator.getRunning().size());
 
     PowerMock.verifyAll();
     coordinator.getQueues().clear();
@@ -274,27 +264,27 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
-    Assert.assertEquals(0, coordinator.getQueues().size());
-    Assert.assertEquals(0, coordinator.getIndex().size());
-    Assert.assertEquals(0, coordinator.getRunning().size());
+    assertEquals(0, coordinator.getQueues().size());
+    assertEquals(0, coordinator.getIndex().size());
+    assertEquals(0, coordinator.getRunning().size());
     coordinator.run();
-    Assert.assertEquals(1, coordinator.getQueues().size());
+    assertEquals(1, coordinator.getQueues().size());
     QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
     Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
-    Assert.assertNotNull(m);
-    Assert.assertEquals(1, m.size());
-    Assert.assertTrue(m.containsKey(1L));
+    assertNotNull(m);
+    assertEquals(1, m.size());
+    assertTrue(m.containsKey(1L));
     Set<TServerInstance> t = m.get(1L);
-    Assert.assertNotNull(t);
-    Assert.assertEquals(1, t.size());
+    assertNotNull(t);
+    assertEquals(1, t.size());
     TServerInstance queuedTsi = t.iterator().next();
-    Assert.assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
-    Assert.assertEquals(1, coordinator.getIndex().size());
-    Assert.assertTrue(coordinator.getIndex().containsKey(queuedTsi));
+    assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
+    assertEquals(1, coordinator.getIndex().size());
+    assertTrue(coordinator.getIndex().containsKey(queuedTsi));
     Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
-    Assert.assertEquals(1, i.size());
-    Assert.assertEquals(qp, i.iterator().next());
-    Assert.assertEquals(0, coordinator.getRunning().size());
+    assertEquals(1, i.size());
+    assertEquals(qp, i.iterator().next());
+    assertEquals(0, coordinator.getRunning().size());
 
     PowerMock.verifyAll();
     coordinator.getQueues().clear();
@@ -352,27 +342,27 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
-    Assert.assertEquals(0, coordinator.getQueues().size());
-    Assert.assertEquals(0, coordinator.getIndex().size());
-    Assert.assertEquals(0, coordinator.getRunning().size());
+    assertEquals(0, coordinator.getQueues().size());
+    assertEquals(0, coordinator.getIndex().size());
+    assertEquals(0, coordinator.getRunning().size());
     coordinator.run();
-    Assert.assertEquals(1, coordinator.getQueues().size());
+    assertEquals(1, coordinator.getQueues().size());
     QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
     Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
-    Assert.assertNotNull(m);
-    Assert.assertEquals(1, m.size());
-    Assert.assertTrue(m.containsKey(1L));
+    assertNotNull(m);
+    assertEquals(1, m.size());
+    assertTrue(m.containsKey(1L));
     Set<TServerInstance> t = m.get(1L);
-    Assert.assertNotNull(t);
-    Assert.assertEquals(1, t.size());
+    assertNotNull(t);
+    assertEquals(1, t.size());
     TServerInstance queuedTsi = t.iterator().next();
-    Assert.assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
-    Assert.assertEquals(1, coordinator.getIndex().size());
-    Assert.assertTrue(coordinator.getIndex().containsKey(queuedTsi));
+    assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
+    assertEquals(1, coordinator.getIndex().size());
+    assertTrue(coordinator.getIndex().containsKey(queuedTsi));
     Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
-    Assert.assertEquals(1, i.size());
-    Assert.assertEquals(qp, i.iterator().next());
-    Assert.assertEquals(0, coordinator.getRunning().size());
+    assertEquals(1, i.size());
+    assertEquals(qp, i.iterator().next());
+    assertEquals(0, coordinator.getRunning().size());
 
     PowerMock.verifyAll();
     coordinator.getQueues().clear();
@@ -439,27 +429,27 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
-    Assert.assertEquals(0, coordinator.getQueues().size());
-    Assert.assertEquals(0, coordinator.getIndex().size());
-    Assert.assertEquals(0, coordinator.getRunning().size());
+    assertEquals(0, coordinator.getQueues().size());
+    assertEquals(0, coordinator.getIndex().size());
+    assertEquals(0, coordinator.getRunning().size());
     coordinator.run();
-    Assert.assertEquals(1, coordinator.getQueues().size());
+    assertEquals(1, coordinator.getQueues().size());
     QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
     Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
-    Assert.assertNotNull(m);
-    Assert.assertEquals(1, m.size());
-    Assert.assertTrue(m.containsKey(1L));
+    assertNotNull(m);
+    assertEquals(1, m.size());
+    assertTrue(m.containsKey(1L));
     Set<TServerInstance> t = m.get(1L);
-    Assert.assertNotNull(t);
-    Assert.assertEquals(1, t.size());
+    assertNotNull(t);
+    assertEquals(1, t.size());
     TServerInstance queuedTsi = t.iterator().next();
-    Assert.assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
-    Assert.assertEquals(1, coordinator.getIndex().size());
-    Assert.assertTrue(coordinator.getIndex().containsKey(queuedTsi));
+    assertEquals(instance.getHostPortSession(), queuedTsi.getHostPortSession());
+    assertEquals(1, coordinator.getIndex().size());
+    assertTrue(coordinator.getIndex().containsKey(queuedTsi));
     Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
-    Assert.assertEquals(1, i.size());
-    Assert.assertEquals(qp, i.iterator().next());
-    Assert.assertEquals(1, coordinator.getRunning().size());
+    assertEquals(1, i.size());
+    assertEquals(qp, i.iterator().next());
+    assertEquals(1, coordinator.getRunning().size());
 
     PowerMock.verifyAll();
     coordinator.getQueues().clear();
@@ -518,44 +508,44 @@ public class CompactionCoordinatorTest {
 
     TestCoordinator coordinator =
         new TestCoordinator(conf, finalizer, tservers, client, tsc, ctx, security);
-    Assert.assertEquals(0, coordinator.getQueues().size());
-    Assert.assertEquals(0, coordinator.getIndex().size());
-    Assert.assertEquals(0, coordinator.getRunning().size());
+    assertEquals(0, coordinator.getQueues().size());
+    assertEquals(0, coordinator.getIndex().size());
+    assertEquals(0, coordinator.getRunning().size());
     // Use coordinator.run() to populate the internal data structures. This is tested in a different
     // test.
     coordinator.run();
 
-    Assert.assertEquals(1, coordinator.getQueues().size());
+    assertEquals(1, coordinator.getQueues().size());
     QueueAndPriority qp = QueueAndPriority.get("R2DQ".intern(), 1L);
     Map<Long,LinkedHashSet<TServerInstance>> m = coordinator.getQueues().get("R2DQ".intern());
-    Assert.assertNotNull(m);
-    Assert.assertEquals(1, m.size());
-    Assert.assertTrue(m.containsKey(1L));
+    assertNotNull(m);
+    assertEquals(1, m.size());
+    assertTrue(m.containsKey(1L));
     Set<TServerInstance> t = m.get(1L);
-    Assert.assertNotNull(t);
-    Assert.assertEquals(1, t.size());
+    assertNotNull(t);
+    assertEquals(1, t.size());
     TServerInstance queuedTsi = t.iterator().next();
-    Assert.assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
-    Assert.assertEquals(1, coordinator.getIndex().size());
-    Assert.assertTrue(coordinator.getIndex().containsKey(queuedTsi));
+    assertEquals(tsi.getHostPortSession(), queuedTsi.getHostPortSession());
+    assertEquals(1, coordinator.getIndex().size());
+    assertTrue(coordinator.getIndex().containsKey(queuedTsi));
     Set<QueueAndPriority> i = coordinator.getIndex().get(queuedTsi);
-    Assert.assertEquals(1, i.size());
-    Assert.assertEquals(qp, i.iterator().next());
-    Assert.assertEquals(0, coordinator.getRunning().size());
+    assertEquals(1, i.size());
+    assertEquals(qp, i.iterator().next());
+    assertEquals(0, coordinator.getRunning().size());
 
     // Get the next job
     TExternalCompactionJob createdJob =
         coordinator.getCompactionJob(trace, creds, "R2DQ", "localhost:10241", eci.toString());
-    Assert.assertEquals(eci.toString(), createdJob.getExternalCompactionId());
+    assertEquals(eci.toString(), createdJob.getExternalCompactionId());
 
-    Assert.assertEquals(1, coordinator.getQueues().size());
-    Assert.assertEquals(0, coordinator.getIndex().size());
-    Assert.assertEquals(1, coordinator.getRunning().size());
+    assertEquals(1, coordinator.getQueues().size());
+    assertEquals(0, coordinator.getIndex().size());
+    assertEquals(1, coordinator.getRunning().size());
     Entry<ExternalCompactionId,RunningCompaction> entry =
         coordinator.getRunning().entrySet().iterator().next();
-    Assert.assertEquals(eci.toString(), entry.getKey().toString());
-    Assert.assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
-    Assert.assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId());
+    assertEquals(eci.toString(), entry.getKey().toString());
+    assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
+    assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId());
 
     PowerMock.verifyAll();
     coordinator.getQueues().clear();
@@ -595,7 +585,7 @@ public class CompactionCoordinatorTest {
     coordinator.getRunning().clear();
     TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds, "R2DQ",
         "localhost:10240", UUID.randomUUID().toString());
-    Assert.assertNull(job.getExternalCompactionId());
+    assertNull(job.getExternalCompactionId());
 
     PowerMock.verifyAll();
     coordinator.close();
diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml
index e193175..0649328 100644
--- a/server/compactor/pom.xml
+++ b/server/compactor/pom.xml
@@ -31,25 +31,41 @@
   <name>Apache Accumulo Compactor</name>
   <dependencies>
     <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.auto.service</groupId>
       <artifactId>auto-service</artifactId>
       <optional>true</optional>
     </dependency>
     <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-server-base</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-start</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-1.2-api</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-api</artifactId>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
@@ -66,6 +82,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.powermock</groupId>
       <artifactId>powermock-api-easymock</artifactId>
       <scope>test</scope>
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 9b61811..ade2c09 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -136,6 +136,15 @@ public class Compactor extends AbstractServer
     printStartupMsg();
   }
 
+  protected Compactor(CompactorServerOpts opts, String[] args, AccumuloConfiguration conf) {
+    super("compactor", opts, args);
+    queueName = opts.getQueueName();
+    aconf = conf;
+    setupSecurity();
+    startGCLogger();
+    printStartupMsg();
+  }
+
   protected void setupSecurity() {
     getContext().setupCrypto();
     security = AuditedSecurityOperation.getInstance(getContext());
@@ -159,7 +168,9 @@ public class Compactor extends AbstractServer
    *          address of this Compactor
    *
    * @throws KeeperException
+   *           zookeeper error
    * @throws InterruptedException
+   *           thread interrupted
    */
   protected void announceExistence(HostAndPort clientAddress)
       throws KeeperException, InterruptedException {
@@ -227,6 +238,7 @@ public class Compactor extends AbstractServer
    *
    * @return address of this compactor client service
    * @throws UnknownHostException
+   *           host unknown
    */
   protected ServerAddress startCompactorClientService() throws UnknownHostException {
     Iface rpcProxy = TraceUtil.wrapService(this);
@@ -273,7 +285,11 @@ public class Compactor extends AbstractServer
    * Cancel the compaction with this id.
    *
    * @param externalCompactionId
+   *          compaction id
+   * @throws UnknownCompactionIdException
+   *           if the externalCompactionId does not match the currently executing compaction
    * @throws TException
+   *           thrift error
    */
   private void cancel(String externalCompactionId) throws TException {
     synchronized (JOB_HOLDER) {
@@ -299,6 +315,7 @@ public class Compactor extends AbstractServer
    * @param message
    *          updated message
    * @throws RetriesExceededException
+   *           thrown when retries have been exceeded
    */
   protected void updateCompactionState(TExternalCompactionJob job, CompactionState state,
       String message) throws RetriesExceededException {
@@ -330,6 +347,7 @@ public class Compactor extends AbstractServer
    * @param job
    *          current compaction job
    * @throws RetriesExceededException
+   *           thrown when retries have been exceeded
    */
   protected void updateCompactionFailed(TExternalCompactionJob job)
       throws RetriesExceededException {
@@ -359,6 +377,7 @@ public class Compactor extends AbstractServer
    * @param stats
    *          compaction stats
    * @throws RetriesExceededException
+   *           thrown when retries have been exceeded
    */
   protected void updateCompactionCompleted(TExternalCompactionJob job, CompactionStats stats)
       throws RetriesExceededException {
@@ -383,12 +402,11 @@ public class Compactor extends AbstractServer
   /**
    * Get the next job to run
    *
-   * @param coordinatorClient
-   *          address of the CompactionCoordinator
-   * @param compactorAddress
-   *          address of this Compactor
+   * @param uuid
+   *          uuid supplier
    * @return CompactionJob
    * @throws RetriesExceededException
+   *           thrown when retries have been exceeded
    */
   protected TExternalCompactionJob getNextJob(Supplier<UUID> uuid) throws RetriesExceededException {
     RetryableThriftCall<TExternalCompactionJob> nextJobThriftCall =
@@ -513,6 +531,7 @@ public class Compactor extends AbstractServer
    * Returns the number of seconds to wait in between progress checks based on input file sizes
    *
    * @param numBytes
+   *          number of bytes in input file
    * @return number of seconds to wait between progress checks
    */
   protected long calculateProgressCheckTime(long numBytes) {
diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 1ed79fb..73825d9 100644
--- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.accumulo.compactor;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.net.UnknownHostException;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -45,7 +49,6 @@ import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.zookeeper.KeeperException;
 import org.easymock.EasyMock;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -157,7 +160,6 @@ public class CompactorTest {
     private final Supplier<UUID> uuid;
     private final ServerAddress address;
     private final TExternalCompactionJob job;
-    private final AccumuloConfiguration conf;
     private final ServerContext ctx;
     private final ExternalCompactionId eci;
     private volatile boolean completedCalled = false;
@@ -166,11 +168,10 @@ public class CompactorTest {
 
     SuccessfulCompactor(Supplier<UUID> uuid, ServerAddress address, TExternalCompactionJob job,
         AccumuloConfiguration conf, ServerContext ctx, ExternalCompactionId eci) {
-      super(new CompactorServerOpts(), new String[] {"-q", "testQ"});
+      super(new CompactorServerOpts(), new String[] {"-q", "testQ"}, conf);
       this.uuid = uuid;
       this.address = address;
       this.job = job;
-      this.conf = conf;
       this.ctx = ctx;
       this.eci = eci;
     }
@@ -190,11 +191,6 @@ public class CompactorTest {
     }
 
     @Override
-    public AccumuloConfiguration getConfiguration() {
-      return this.conf;
-    }
-
-    @Override
     protected void announceExistence(HostAndPort clientAddress)
         throws KeeperException, InterruptedException {}
 
@@ -291,11 +287,11 @@ public class CompactorTest {
   public void testCheckTime() throws Exception {
     // Instantiates class without calling constructor
     Compactor c = Whitebox.newInstance(Compactor.class);
-    Assert.assertEquals(1, c.calculateProgressCheckTime(1024));
-    Assert.assertEquals(1, c.calculateProgressCheckTime(1048576));
-    Assert.assertEquals(1, c.calculateProgressCheckTime(10485760));
-    Assert.assertEquals(10, c.calculateProgressCheckTime(104857600));
-    Assert.assertEquals(102, c.calculateProgressCheckTime(1024 * 1024 * 1024));
+    assertEquals(1, c.calculateProgressCheckTime(1024));
+    assertEquals(1, c.calculateProgressCheckTime(1048576));
+    assertEquals(1, c.calculateProgressCheckTime(10485760));
+    assertEquals(10, c.calculateProgressCheckTime(104857600));
+    assertEquals(102, c.calculateProgressCheckTime(1024 * 1024 * 1024));
   }
 
   @Test
@@ -341,8 +337,8 @@ public class CompactorTest {
     PowerMock.verifyAll();
     c.close();
 
-    Assert.assertTrue(c.isCompletedCalled());
-    Assert.assertFalse(c.isFailedCalled());
+    assertTrue(c.isCompletedCalled());
+    assertFalse(c.isFailedCalled());
   }
 
   @Test
@@ -388,9 +384,9 @@ public class CompactorTest {
     PowerMock.verifyAll();
     c.close();
 
-    Assert.assertFalse(c.isCompletedCalled());
-    Assert.assertTrue(c.isFailedCalled());
-    Assert.assertEquals(CompactionState.FAILED, c.getLatestState());
+    assertFalse(c.isCompletedCalled());
+    assertTrue(c.isFailedCalled());
+    assertEquals(CompactionState.FAILED, c.getLatestState());
   }
 
   @Test
@@ -436,9 +432,9 @@ public class CompactorTest {
     PowerMock.verifyAll();
     c.close();
 
-    Assert.assertFalse(c.isCompletedCalled());
-    Assert.assertTrue(c.isFailedCalled());
-    Assert.assertEquals(CompactionState.CANCELLED, c.getLatestState());
+    assertFalse(c.isCompletedCalled());
+    assertTrue(c.isFailedCalled());
+    assertEquals(CompactionState.CANCELLED, c.getLatestState());
   }
 
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 7785b19..48f8d35 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -79,6 +79,26 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
       return Long.compare(o.getJob().getPriority(), getJob().getPriority());
     }
 
+    @Override
+    public boolean equals(Object obj) {
+      if (null == obj) {
+        return false;
+      }
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof ExternalJob) {
+        ExternalJob other = (ExternalJob) obj;
+        return (this.compareTo(other) == 0);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.hashCode(this.getJob().getPriority());
+    }
+
   }
 
   private PriorityBlockingQueue<ExternalJob> queue;