You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/03/03 16:34:13 UTC

[accumulo] branch 1451-external-compactions-feature updated: More work on compactor and coordinator

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 6c37802  More work on compactor and coordinator
6c37802 is described below

commit 6c378021b12906ccb33675d80d4ad8686df9d06b
Author: Dave Marion <dl...@apache.org>
AuthorDate: Wed Mar 3 16:33:45 2021 +0000

    More work on compactor and coordinator
---
 .../compaction/thrift/CompactionCoordinator.java   |  68 +-
 .../core/tabletserver/thrift/CompactionJob.java    | 258 +++++++-
 .../core/tabletserver/thrift/CompactionKind.java   |  66 ++
 .../core/tabletserver/thrift/CompactionStats.java  | 582 +++++++++++++++++
 .../core/tabletserver/thrift/InputFile.java        | 687 +++++++++++++++++++++
 core/src/main/thrift/compaction-coordinator.thrift |  33 +-
 core/src/main/thrift/data.thrift                   |   6 -
 core/src/main/thrift/tabletserver.thrift           |  24 +-
 .../server/compaction/ExternalCompactionUtil.java} |  17 +-
 .../server/compaction}/RetryableThriftCall.java    |   3 +-
 .../compaction}/RetryableThriftFunction.java       |   2 +-
 .../coordinator/CompactionCoordinator.java         |  82 ++-
 .../org/apache/accumulo/compactor/Compactor.java   | 271 ++++----
 13 files changed, 1846 insertions(+), 253 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java
index ffd00f9..9397c22 100644
--- a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java
@@ -35,7 +35,7 @@ public class CompactionCoordinator {
 
     public org.apache.accumulo.core.tabletserver.thrift.CompactionJob getCompactionJob(java.lang.String queueName, java.lang.String compactor) throws org.apache.thrift.TException;
 
-    public void compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats) throws org.apache.thrift.TException;
+    public void compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats) throws org.apache.thrift.TException;
 
     public void updateCompactionStatus(org.apache.accumulo.core.tabletserver.thrift.CompactionJob compaction, CompactionState state, java.lang.String message, long timestamp) throws org.apache.thrift.TException;
 
@@ -49,7 +49,7 @@ public class CompactionCoordinator {
 
     public void getCompactionJob(java.lang.String queueName, java.lang.String compactor, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.tabletserver.thrift.CompactionJob> resultHandler) throws org.apache.thrift.TException;
 
-    public void compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
+    public void compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
 
     public void updateCompactionStatus(org.apache.accumulo.core.tabletserver.thrift.CompactionJob compaction, CompactionState state, java.lang.String message, long timestamp, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException;
 
@@ -146,13 +146,13 @@ public class CompactionCoordinator {
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getCompactionJob failed: unknown result");
     }
 
-    public void compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats) throws org.apache.thrift.TException
+    public void compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats) throws org.apache.thrift.TException
     {
       send_compactionCompleted(job, stats);
       recv_compactionCompleted();
     }
 
-    public void send_compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats) throws org.apache.thrift.TException
+    public void send_compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats) throws org.apache.thrift.TException
     {
       compactionCompleted_args args = new compactionCompleted_args();
       args.setJob(job);
@@ -319,7 +319,7 @@ public class CompactionCoordinator {
       }
     }
 
-    public void compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
+    public void compactionCompleted(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException {
       checkReady();
       compactionCompleted_call method_call = new compactionCompleted_call(job, stats, resultHandler, this, ___protocolFactory, ___transport);
       this.___currentMethod = method_call;
@@ -328,8 +328,8 @@ public class CompactionCoordinator {
 
     public static class compactionCompleted_call extends org.apache.thrift.async.TAsyncMethodCall<Void> {
       private org.apache.accumulo.core.tabletserver.thrift.CompactionJob job;
-      private org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats;
-      public compactionCompleted_call(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+      private org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats;
+      public compactionCompleted_call(org.apache.accumulo.core.tabletserver.thrift.CompactionJob job, org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
         super(client, protocolFactory, transport, resultHandler, false);
         this.job = job;
         this.stats = stats;
@@ -2594,14 +2594,14 @@ public class CompactionCoordinator {
             case 0: // SUCCESS
               if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
                 {
-                  org.apache.thrift.protocol.TList _list8 = iprot.readListBegin();
-                  struct.success = new java.util.ArrayList<Status>(_list8.size);
-                  @org.apache.thrift.annotation.Nullable Status _elem9;
-                  for (int _i10 = 0; _i10 < _list8.size; ++_i10)
+                  org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
+                  struct.success = new java.util.ArrayList<Status>(_list0.size);
+                  @org.apache.thrift.annotation.Nullable Status _elem1;
+                  for (int _i2 = 0; _i2 < _list0.size; ++_i2)
                   {
-                    _elem9 = new Status();
-                    _elem9.read(iprot);
-                    struct.success.add(_elem9);
+                    _elem1 = new Status();
+                    _elem1.read(iprot);
+                    struct.success.add(_elem1);
                   }
                   iprot.readListEnd();
                 }
@@ -2629,9 +2629,9 @@ public class CompactionCoordinator {
           oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
           {
             oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.success.size()));
-            for (Status _iter11 : struct.success)
+            for (Status _iter3 : struct.success)
             {
-              _iter11.write(oprot);
+              _iter3.write(oprot);
             }
             oprot.writeListEnd();
           }
@@ -2662,9 +2662,9 @@ public class CompactionCoordinator {
         if (struct.isSetSuccess()) {
           {
             oprot.writeI32(struct.success.size());
-            for (Status _iter12 : struct.success)
+            for (Status _iter4 : struct.success)
             {
-              _iter12.write(oprot);
+              _iter4.write(oprot);
             }
           }
         }
@@ -2676,14 +2676,14 @@ public class CompactionCoordinator {
         java.util.BitSet incoming = iprot.readBitSet(1);
         if (incoming.get(0)) {
           {
-            org.apache.thrift.protocol.TList _list13 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-            struct.success = new java.util.ArrayList<Status>(_list13.size);
-            @org.apache.thrift.annotation.Nullable Status _elem14;
-            for (int _i15 = 0; _i15 < _list13.size; ++_i15)
+            org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+            struct.success = new java.util.ArrayList<Status>(_list5.size);
+            @org.apache.thrift.annotation.Nullable Status _elem6;
+            for (int _i7 = 0; _i7 < _list5.size; ++_i7)
             {
-              _elem14 = new Status();
-              _elem14.read(iprot);
-              struct.success.add(_elem14);
+              _elem6 = new Status();
+              _elem6.read(iprot);
+              struct.success.add(_elem6);
             }
           }
           struct.setSuccessIsSet(true);
@@ -3550,7 +3550,7 @@ public class CompactionCoordinator {
     private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new compactionCompleted_argsTupleSchemeFactory();
 
     public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.tabletserver.thrift.CompactionJob job; // required
-    public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats; // required
+    public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -3622,7 +3622,7 @@ public class CompactionCoordinator {
       tmpMap.put(_Fields.JOB, new org.apache.thrift.meta_data.FieldMetaData("job", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.tabletserver.thrift.CompactionJob.class)));
       tmpMap.put(_Fields.STATS, new org.apache.thrift.meta_data.FieldMetaData("stats", org.apache.thrift.TFieldRequirementType.DEFAULT, 
-          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.dataImpl.thrift.CompactionStats.class)));
+          new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.tabletserver.thrift.CompactionStats.class)));
       metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(compactionCompleted_args.class, metaDataMap);
     }
@@ -3632,7 +3632,7 @@ public class CompactionCoordinator {
 
     public compactionCompleted_args(
       org.apache.accumulo.core.tabletserver.thrift.CompactionJob job,
-      org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats)
+      org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats)
     {
       this();
       this.job = job;
@@ -3647,7 +3647,7 @@ public class CompactionCoordinator {
         this.job = new org.apache.accumulo.core.tabletserver.thrift.CompactionJob(other.job);
       }
       if (other.isSetStats()) {
-        this.stats = new org.apache.accumulo.core.dataImpl.thrift.CompactionStats(other.stats);
+        this.stats = new org.apache.accumulo.core.tabletserver.thrift.CompactionStats(other.stats);
       }
     }
 
@@ -3687,11 +3687,11 @@ public class CompactionCoordinator {
     }
 
     @org.apache.thrift.annotation.Nullable
-    public org.apache.accumulo.core.dataImpl.thrift.CompactionStats getStats() {
+    public org.apache.accumulo.core.tabletserver.thrift.CompactionStats getStats() {
       return this.stats;
     }
 
-    public compactionCompleted_args setStats(@org.apache.thrift.annotation.Nullable org.apache.accumulo.core.dataImpl.thrift.CompactionStats stats) {
+    public compactionCompleted_args setStats(@org.apache.thrift.annotation.Nullable org.apache.accumulo.core.tabletserver.thrift.CompactionStats stats) {
       this.stats = stats;
       return this;
     }
@@ -3725,7 +3725,7 @@ public class CompactionCoordinator {
         if (value == null) {
           unsetStats();
         } else {
-          setStats((org.apache.accumulo.core.dataImpl.thrift.CompactionStats)value);
+          setStats((org.apache.accumulo.core.tabletserver.thrift.CompactionStats)value);
         }
         break;
 
@@ -3935,7 +3935,7 @@ public class CompactionCoordinator {
               break;
             case 2: // STATS
               if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
-                struct.stats = new org.apache.accumulo.core.dataImpl.thrift.CompactionStats();
+                struct.stats = new org.apache.accumulo.core.tabletserver.thrift.CompactionStats();
                 struct.stats.read(iprot);
                 struct.setStatsIsSet(true);
               } else { 
@@ -4010,7 +4010,7 @@ public class CompactionCoordinator {
           struct.setJobIsSet(true);
         }
         if (incoming.get(1)) {
-          struct.stats = new org.apache.accumulo.core.dataImpl.thrift.CompactionStats();
+          struct.stats = new org.apache.accumulo.core.tabletserver.thrift.CompactionStats();
           struct.stats.read(iprot);
           struct.setStatsIsSet(true);
         }
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionJob.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionJob.java
index 8ff1214..e74ea50 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionJob.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionJob.java
@@ -40,6 +40,8 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
   private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)11);
   private static final org.apache.thrift.protocol.TField REASON_FIELD_DESC = new org.apache.thrift.protocol.TField("reason", org.apache.thrift.protocol.TType.I32, (short)12);
   private static final org.apache.thrift.protocol.TField OUTPUT_FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("outputFile", org.apache.thrift.protocol.TType.STRING, (short)13);
+  private static final org.apache.thrift.protocol.TField PROPAGATE_DELETES_FIELD_DESC = new org.apache.thrift.protocol.TField("propagateDeletes", org.apache.thrift.protocol.TType.BOOL, (short)14);
+  private static final org.apache.thrift.protocol.TField KIND_FIELD_DESC = new org.apache.thrift.protocol.TField("kind", org.apache.thrift.protocol.TType.I32, (short)15);
 
   private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionJobStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionJobTupleSchemeFactory();
@@ -48,7 +50,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
   public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; // required
   public long compactionId; // required
   public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent; // required
-  public @org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> files; // required
+  public @org.apache.thrift.annotation.Nullable java.util.List<InputFile> files; // required
   public int priority; // required
   public int readRate; // required
   public int writeRate; // required
@@ -64,6 +66,8 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
    */
   public @org.apache.thrift.annotation.Nullable CompactionReason reason; // required
   public @org.apache.thrift.annotation.Nullable java.lang.String outputFile; // required
+  public boolean propagateDeletes; // required
+  public @org.apache.thrift.annotation.Nullable CompactionKind kind; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -86,7 +90,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
      * @see CompactionReason
      */
     REASON((short)12, "reason"),
-    OUTPUT_FILE((short)13, "outputFile");
+    OUTPUT_FILE((short)13, "outputFile"),
+    PROPAGATE_DELETES((short)14, "propagateDeletes"),
+    KIND((short)15, "kind");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -126,6 +132,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
           return REASON;
         case 13: // OUTPUT_FILE
           return OUTPUT_FILE;
+        case 14: // PROPAGATE_DELETES
+          return PROPAGATE_DELETES;
+        case 15: // KIND
+          return KIND;
         default:
           return null;
       }
@@ -171,6 +181,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
   private static final int __PRIORITY_ISSET_ID = 1;
   private static final int __READRATE_ISSET_ID = 2;
   private static final int __WRITERATE_ISSET_ID = 3;
+  private static final int __PROPAGATEDELETES_ISSET_ID = 4;
   private byte __isset_bitfield = 0;
   public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -185,7 +196,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent.class)));
     tmpMap.put(_Fields.FILES, new org.apache.thrift.meta_data.FieldMetaData("files", org.apache.thrift.TFieldRequirementType.DEFAULT, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
-            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+            new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, InputFile.class))));
     tmpMap.put(_Fields.PRIORITY, new org.apache.thrift.meta_data.FieldMetaData("priority", org.apache.thrift.TFieldRequirementType.DEFAULT, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     tmpMap.put(_Fields.READ_RATE, new org.apache.thrift.meta_data.FieldMetaData("readRate", org.apache.thrift.TFieldRequirementType.DEFAULT, 
@@ -200,6 +211,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, CompactionReason.class)));
     tmpMap.put(_Fields.OUTPUT_FILE, new org.apache.thrift.meta_data.FieldMetaData("outputFile", org.apache.thrift.TFieldRequirementType.DEFAULT, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.PROPAGATE_DELETES, new org.apache.thrift.meta_data.FieldMetaData("propagateDeletes", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+    tmpMap.put(_Fields.KIND, new org.apache.thrift.meta_data.FieldMetaData("kind", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.ENUM        , "CompactionKind")));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionJob.class, metaDataMap);
   }
@@ -212,14 +227,16 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
     long compactionId,
     org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent,
-    java.util.List<java.lang.String> files,
+    java.util.List<InputFile> files,
     int priority,
     int readRate,
     int writeRate,
     IteratorConfig iteratorSettings,
     CompactionType type,
     CompactionReason reason,
-    java.lang.String outputFile)
+    java.lang.String outputFile,
+    boolean propagateDeletes,
+    CompactionKind kind)
   {
     this();
     this.traceInfo = traceInfo;
@@ -238,6 +255,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     this.type = type;
     this.reason = reason;
     this.outputFile = outputFile;
+    this.propagateDeletes = propagateDeletes;
+    setPropagateDeletesIsSet(true);
+    this.kind = kind;
   }
 
   /**
@@ -256,7 +276,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       this.extent = new org.apache.accumulo.core.dataImpl.thrift.TKeyExtent(other.extent);
     }
     if (other.isSetFiles()) {
-      java.util.List<java.lang.String> __this__files = new java.util.ArrayList<java.lang.String>(other.files);
+      java.util.List<InputFile> __this__files = new java.util.ArrayList<InputFile>(other.files.size());
+      for (InputFile other_element : other.files) {
+        __this__files.add(new InputFile(other_element));
+      }
       this.files = __this__files;
     }
     this.priority = other.priority;
@@ -274,6 +297,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     if (other.isSetOutputFile()) {
       this.outputFile = other.outputFile;
     }
+    this.propagateDeletes = other.propagateDeletes;
+    if (other.isSetKind()) {
+      this.kind = other.kind;
+    }
   }
 
   public CompactionJob deepCopy() {
@@ -298,6 +325,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     this.type = null;
     this.reason = null;
     this.outputFile = null;
+    setPropagateDeletesIsSet(false);
+    this.propagateDeletes = false;
+    this.kind = null;
   }
 
   @org.apache.thrift.annotation.Nullable
@@ -403,23 +433,23 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
   }
 
   @org.apache.thrift.annotation.Nullable
-  public java.util.Iterator<java.lang.String> getFilesIterator() {
+  public java.util.Iterator<InputFile> getFilesIterator() {
     return (this.files == null) ? null : this.files.iterator();
   }
 
-  public void addToFiles(java.lang.String elem) {
+  public void addToFiles(InputFile elem) {
     if (this.files == null) {
-      this.files = new java.util.ArrayList<java.lang.String>();
+      this.files = new java.util.ArrayList<InputFile>();
     }
     this.files.add(elem);
   }
 
   @org.apache.thrift.annotation.Nullable
-  public java.util.List<java.lang.String> getFiles() {
+  public java.util.List<InputFile> getFiles() {
     return this.files;
   }
 
-  public CompactionJob setFiles(@org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> files) {
+  public CompactionJob setFiles(@org.apache.thrift.annotation.Nullable java.util.List<InputFile> files) {
     this.files = files;
     return this;
   }
@@ -624,6 +654,54 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     }
   }
 
+  public boolean isPropagateDeletes() {
+    return this.propagateDeletes;
+  }
+
+  public CompactionJob setPropagateDeletes(boolean propagateDeletes) {
+    this.propagateDeletes = propagateDeletes;
+    setPropagateDeletesIsSet(true);
+    return this;
+  }
+
+  public void unsetPropagateDeletes() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __PROPAGATEDELETES_ISSET_ID);
+  }
+
+  /** Returns true if field propagateDeletes is set (has been assigned a value) and false otherwise */
+  public boolean isSetPropagateDeletes() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __PROPAGATEDELETES_ISSET_ID);
+  }
+
+  public void setPropagateDeletesIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __PROPAGATEDELETES_ISSET_ID, value);
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public CompactionKind getKind() {
+    return this.kind;
+  }
+
+  public CompactionJob setKind(@org.apache.thrift.annotation.Nullable CompactionKind kind) {
+    this.kind = kind;
+    return this;
+  }
+
+  public void unsetKind() {
+    this.kind = null;
+  }
+
+  /** Returns true if field kind is set (has been assigned a value) and false otherwise */
+  public boolean isSetKind() {
+    return this.kind != null;
+  }
+
+  public void setKindIsSet(boolean value) {
+    if (!value) {
+      this.kind = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
     case TRACE_INFO:
@@ -662,7 +740,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (value == null) {
         unsetFiles();
       } else {
-        setFiles((java.util.List<java.lang.String>)value);
+        setFiles((java.util.List<InputFile>)value);
       }
       break;
 
@@ -722,6 +800,22 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       }
       break;
 
+    case PROPAGATE_DELETES:
+      if (value == null) {
+        unsetPropagateDeletes();
+      } else {
+        setPropagateDeletes((java.lang.Boolean)value);
+      }
+      break;
+
+    case KIND:
+      if (value == null) {
+        unsetKind();
+      } else {
+        setKind((CompactionKind)value);
+      }
+      break;
+
     }
   }
 
@@ -764,6 +858,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     case OUTPUT_FILE:
       return getOutputFile();
 
+    case PROPAGATE_DELETES:
+      return isPropagateDeletes();
+
+    case KIND:
+      return getKind();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -799,6 +899,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       return isSetReason();
     case OUTPUT_FILE:
       return isSetOutputFile();
+    case PROPAGATE_DELETES:
+      return isSetPropagateDeletes();
+    case KIND:
+      return isSetKind();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -926,6 +1030,24 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         return false;
     }
 
+    boolean this_present_propagateDeletes = true;
+    boolean that_present_propagateDeletes = true;
+    if (this_present_propagateDeletes || that_present_propagateDeletes) {
+      if (!(this_present_propagateDeletes && that_present_propagateDeletes))
+        return false;
+      if (this.propagateDeletes != that.propagateDeletes)
+        return false;
+    }
+
+    boolean this_present_kind = true && this.isSetKind();
+    boolean that_present_kind = true && that.isSetKind();
+    if (this_present_kind || that_present_kind) {
+      if (!(this_present_kind && that_present_kind))
+        return false;
+      if (!this.kind.equals(that.kind))
+        return false;
+    }
+
     return true;
   }
 
@@ -973,6 +1095,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     if (isSetOutputFile())
       hashCode = hashCode * 8191 + outputFile.hashCode();
 
+    hashCode = hashCode * 8191 + ((propagateDeletes) ? 131071 : 524287);
+
+    hashCode = hashCode * 8191 + ((isSetKind()) ? 131071 : 524287);
+    if (isSetKind())
+      hashCode = hashCode * 8191 + kind.getValue();
+
     return hashCode;
   }
 
@@ -1104,6 +1232,26 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.valueOf(isSetPropagateDeletes()).compareTo(other.isSetPropagateDeletes());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetPropagateDeletes()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.propagateDeletes, other.propagateDeletes);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetKind()).compareTo(other.isSetKind());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetKind()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.kind, other.kind);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1204,6 +1352,18 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       sb.append(this.outputFile);
     }
     first = false;
+    if (!first) sb.append(", ");
+    sb.append("propagateDeletes:");
+    sb.append(this.propagateDeletes);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("kind:");
+    if (this.kind == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.kind);
+    }
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -1300,11 +1460,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
                 org.apache.thrift.protocol.TList _list116 = iprot.readListBegin();
-                struct.files = new java.util.ArrayList<java.lang.String>(_list116.size);
-                @org.apache.thrift.annotation.Nullable java.lang.String _elem117;
+                struct.files = new java.util.ArrayList<InputFile>(_list116.size);
+                @org.apache.thrift.annotation.Nullable InputFile _elem117;
                 for (int _i118 = 0; _i118 < _list116.size; ++_i118)
                 {
-                  _elem117 = iprot.readString();
+                  _elem117 = new InputFile();
+                  _elem117.read(iprot);
                   struct.files.add(_elem117);
                 }
                 iprot.readListEnd();
@@ -1371,6 +1532,22 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 14: // PROPAGATE_DELETES
+            if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+              struct.propagateDeletes = iprot.readBool();
+              struct.setPropagateDeletesIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 15: // KIND
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.kind = org.apache.accumulo.core.tabletserver.thrift.CompactionKind.findByValue(iprot.readI32());
+              struct.setKindIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1407,10 +1584,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (struct.files != null) {
         oprot.writeFieldBegin(FILES_FIELD_DESC);
         {
-          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.files.size()));
-          for (java.lang.String _iter119 : struct.files)
+          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.files.size()));
+          for (InputFile _iter119 : struct.files)
           {
-            oprot.writeString(_iter119);
+            _iter119.write(oprot);
           }
           oprot.writeListEnd();
         }
@@ -1445,6 +1622,14 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         oprot.writeString(struct.outputFile);
         oprot.writeFieldEnd();
       }
+      oprot.writeFieldBegin(PROPAGATE_DELETES_FIELD_DESC);
+      oprot.writeBool(struct.propagateDeletes);
+      oprot.writeFieldEnd();
+      if (struct.kind != null) {
+        oprot.writeFieldBegin(KIND_FIELD_DESC);
+        oprot.writeI32(struct.kind.getValue());
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1499,7 +1684,13 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (struct.isSetOutputFile()) {
         optionals.set(11);
       }
-      oprot.writeBitSet(optionals, 12);
+      if (struct.isSetPropagateDeletes()) {
+        optionals.set(12);
+      }
+      if (struct.isSetKind()) {
+        optionals.set(13);
+      }
+      oprot.writeBitSet(optionals, 14);
       if (struct.isSetTraceInfo()) {
         struct.traceInfo.write(oprot);
       }
@@ -1515,9 +1706,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (struct.isSetFiles()) {
         {
           oprot.writeI32(struct.files.size());
-          for (java.lang.String _iter120 : struct.files)
+          for (InputFile _iter120 : struct.files)
           {
-            oprot.writeString(_iter120);
+            _iter120.write(oprot);
           }
         }
       }
@@ -1542,12 +1733,18 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (struct.isSetOutputFile()) {
         oprot.writeString(struct.outputFile);
       }
+      if (struct.isSetPropagateDeletes()) {
+        oprot.writeBool(struct.propagateDeletes);
+      }
+      if (struct.isSetKind()) {
+        oprot.writeI32(struct.kind.getValue());
+      }
     }
 
     @Override
     public void read(org.apache.thrift.protocol.TProtocol prot, CompactionJob struct) throws org.apache.thrift.TException {
       org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-      java.util.BitSet incoming = iprot.readBitSet(12);
+      java.util.BitSet incoming = iprot.readBitSet(14);
       if (incoming.get(0)) {
         struct.traceInfo = new org.apache.accumulo.core.trace.thrift.TInfo();
         struct.traceInfo.read(iprot);
@@ -1569,12 +1766,13 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       }
       if (incoming.get(4)) {
         {
-          org.apache.thrift.protocol.TList _list121 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
-          struct.files = new java.util.ArrayList<java.lang.String>(_list121.size);
-          @org.apache.thrift.annotation.Nullable java.lang.String _elem122;
+          org.apache.thrift.protocol.TList _list121 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+          struct.files = new java.util.ArrayList<InputFile>(_list121.size);
+          @org.apache.thrift.annotation.Nullable InputFile _elem122;
           for (int _i123 = 0; _i123 < _list121.size; ++_i123)
           {
-            _elem122 = iprot.readString();
+            _elem122 = new InputFile();
+            _elem122.read(iprot);
             struct.files.add(_elem122);
           }
         }
@@ -1609,6 +1807,14 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         struct.outputFile = iprot.readString();
         struct.setOutputFileIsSet(true);
       }
+      if (incoming.get(12)) {
+        struct.propagateDeletes = iprot.readBool();
+        struct.setPropagateDeletesIsSet(true);
+      }
+      if (incoming.get(13)) {
+        struct.kind = org.apache.accumulo.core.tabletserver.thrift.CompactionKind.findByValue(iprot.readI32());
+        struct.setKindIsSet(true);
+      }
     }
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionKind.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionKind.java
new file mode 100644
index 0000000..f3599a5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionKind.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.accumulo.core.tabletserver.thrift;
+
+
+public enum CompactionKind implements org.apache.thrift.TEnum {
+  CHOP(0),
+  SELECTOR(1),
+  SYSTEM(2),
+  USER(3);
+
+  private final int value;
+
+  private CompactionKind(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find a the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  @org.apache.thrift.annotation.Nullable
+  public static CompactionKind findByValue(int value) { 
+    switch (value) {
+      case 0:
+        return CHOP;
+      case 1:
+        return SELECTOR;
+      case 2:
+        return SYSTEM;
+      case 3:
+        return USER;
+      default:
+        return null;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionStats.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionStats.java
new file mode 100644
index 0000000..4189a6f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/CompactionStats.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.accumulo.core.tabletserver.thrift;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+public class CompactionStats implements org.apache.thrift.TBase<CompactionStats, CompactionStats._Fields>, java.io.Serializable, Cloneable, Comparable<CompactionStats> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CompactionStats");
+
+  private static final org.apache.thrift.protocol.TField ENTRIES_READ_FIELD_DESC = new org.apache.thrift.protocol.TField("entriesRead", org.apache.thrift.protocol.TType.I64, (short)1);
+  private static final org.apache.thrift.protocol.TField ENTRIES_WRITTEN_FIELD_DESC = new org.apache.thrift.protocol.TField("entriesWritten", org.apache.thrift.protocol.TType.I64, (short)2);
+  private static final org.apache.thrift.protocol.TField FILE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("fileSize", org.apache.thrift.protocol.TType.I64, (short)3);
+
+  private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionStatsStandardSchemeFactory();
+  private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionStatsTupleSchemeFactory();
+
+  public long entriesRead; // required
+  public long entriesWritten; // required
+  public long fileSize; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    ENTRIES_READ((short)1, "entriesRead"),
+    ENTRIES_WRITTEN((short)2, "entriesWritten"),
+    FILE_SIZE((short)3, "fileSize");
+
+    private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+    static {
+      for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    @org.apache.thrift.annotation.Nullable
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // ENTRIES_READ
+          return ENTRIES_READ;
+        case 2: // ENTRIES_WRITTEN
+          return ENTRIES_WRITTEN;
+        case 3: // FILE_SIZE
+          return FILE_SIZE;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    @org.apache.thrift.annotation.Nullable
+    public static _Fields findByName(java.lang.String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final java.lang.String _fieldName;
+
+    _Fields(short thriftId, java.lang.String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public java.lang.String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __ENTRIESREAD_ISSET_ID = 0;
+  private static final int __ENTRIESWRITTEN_ISSET_ID = 1;
+  private static final int __FILESIZE_ISSET_ID = 2;
+  private byte __isset_bitfield = 0;
+  public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.ENTRIES_READ, new org.apache.thrift.meta_data.FieldMetaData("entriesRead", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.ENTRIES_WRITTEN, new org.apache.thrift.meta_data.FieldMetaData("entriesWritten", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.FILE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("fileSize", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionStats.class, metaDataMap);
+  }
+
+  public CompactionStats() {
+  }
+
+  public CompactionStats(
+    long entriesRead,
+    long entriesWritten,
+    long fileSize)
+  {
+    this();
+    this.entriesRead = entriesRead;
+    setEntriesReadIsSet(true);
+    this.entriesWritten = entriesWritten;
+    setEntriesWrittenIsSet(true);
+    this.fileSize = fileSize;
+    setFileSizeIsSet(true);
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public CompactionStats(CompactionStats other) {
+    __isset_bitfield = other.__isset_bitfield;
+    this.entriesRead = other.entriesRead;
+    this.entriesWritten = other.entriesWritten;
+    this.fileSize = other.fileSize;
+  }
+
+  public CompactionStats deepCopy() {
+    return new CompactionStats(this);
+  }
+
+  @Override
+  public void clear() {
+    setEntriesReadIsSet(false);
+    this.entriesRead = 0;
+    setEntriesWrittenIsSet(false);
+    this.entriesWritten = 0;
+    setFileSizeIsSet(false);
+    this.fileSize = 0;
+  }
+
+  public long getEntriesRead() {
+    return this.entriesRead;
+  }
+
+  public CompactionStats setEntriesRead(long entriesRead) {
+    this.entriesRead = entriesRead;
+    setEntriesReadIsSet(true);
+    return this;
+  }
+
+  public void unsetEntriesRead() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __ENTRIESREAD_ISSET_ID);
+  }
+
+  /** Returns true if field entriesRead is set (has been assigned a value) and false otherwise */
+  public boolean isSetEntriesRead() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __ENTRIESREAD_ISSET_ID);
+  }
+
+  public void setEntriesReadIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ENTRIESREAD_ISSET_ID, value);
+  }
+
+  public long getEntriesWritten() {
+    return this.entriesWritten;
+  }
+
+  public CompactionStats setEntriesWritten(long entriesWritten) {
+    this.entriesWritten = entriesWritten;
+    setEntriesWrittenIsSet(true);
+    return this;
+  }
+
+  public void unsetEntriesWritten() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __ENTRIESWRITTEN_ISSET_ID);
+  }
+
+  /** Returns true if field entriesWritten is set (has been assigned a value) and false otherwise */
+  public boolean isSetEntriesWritten() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __ENTRIESWRITTEN_ISSET_ID);
+  }
+
+  public void setEntriesWrittenIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ENTRIESWRITTEN_ISSET_ID, value);
+  }
+
+  public long getFileSize() {
+    return this.fileSize;
+  }
+
+  public CompactionStats setFileSize(long fileSize) {
+    this.fileSize = fileSize;
+    setFileSizeIsSet(true);
+    return this;
+  }
+
+  public void unsetFileSize() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __FILESIZE_ISSET_ID);
+  }
+
+  /** Returns true if field fileSize is set (has been assigned a value) and false otherwise */
+  public boolean isSetFileSize() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __FILESIZE_ISSET_ID);
+  }
+
+  public void setFileSizeIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __FILESIZE_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+    switch (field) {
+    case ENTRIES_READ:
+      if (value == null) {
+        unsetEntriesRead();
+      } else {
+        setEntriesRead((java.lang.Long)value);
+      }
+      break;
+
+    case ENTRIES_WRITTEN:
+      if (value == null) {
+        unsetEntriesWritten();
+      } else {
+        setEntriesWritten((java.lang.Long)value);
+      }
+      break;
+
+    case FILE_SIZE:
+      if (value == null) {
+        unsetFileSize();
+      } else {
+        setFileSize((java.lang.Long)value);
+      }
+      break;
+
+    }
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public java.lang.Object getFieldValue(_Fields field) {
+    switch (field) {
+    case ENTRIES_READ:
+      return getEntriesRead();
+
+    case ENTRIES_WRITTEN:
+      return getEntriesWritten();
+
+    case FILE_SIZE:
+      return getFileSize();
+
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new java.lang.IllegalArgumentException();
+    }
+
+    switch (field) {
+    case ENTRIES_READ:
+      return isSetEntriesRead();
+    case ENTRIES_WRITTEN:
+      return isSetEntriesWritten();
+    case FILE_SIZE:
+      return isSetFileSize();
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(java.lang.Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof CompactionStats)
+      return this.equals((CompactionStats)that);
+    return false;
+  }
+
+  public boolean equals(CompactionStats that) {
+    if (that == null)
+      return false;
+    if (this == that)
+      return true;
+
+    boolean this_present_entriesRead = true;
+    boolean that_present_entriesRead = true;
+    if (this_present_entriesRead || that_present_entriesRead) {
+      if (!(this_present_entriesRead && that_present_entriesRead))
+        return false;
+      if (this.entriesRead != that.entriesRead)
+        return false;
+    }
+
+    boolean this_present_entriesWritten = true;
+    boolean that_present_entriesWritten = true;
+    if (this_present_entriesWritten || that_present_entriesWritten) {
+      if (!(this_present_entriesWritten && that_present_entriesWritten))
+        return false;
+      if (this.entriesWritten != that.entriesWritten)
+        return false;
+    }
+
+    boolean this_present_fileSize = true;
+    boolean that_present_fileSize = true;
+    if (this_present_fileSize || that_present_fileSize) {
+      if (!(this_present_fileSize && that_present_fileSize))
+        return false;
+      if (this.fileSize != that.fileSize)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashCode = 1;
+
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(entriesRead);
+
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(entriesWritten);
+
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(fileSize);
+
+    return hashCode;
+  }
+
+  @Override
+  public int compareTo(CompactionStats other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = java.lang.Boolean.valueOf(isSetEntriesRead()).compareTo(other.isSetEntriesRead());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetEntriesRead()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.entriesRead, other.entriesRead);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetEntriesWritten()).compareTo(other.isSetEntriesWritten());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetEntriesWritten()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.entriesWritten, other.entriesWritten);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetFileSize()).compareTo(other.isSetFileSize());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetFileSize()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fileSize, other.fileSize);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    scheme(iprot).read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    scheme(oprot).write(oprot, this);
+  }
+
+  @Override
+  public java.lang.String toString() {
+    java.lang.StringBuilder sb = new java.lang.StringBuilder("CompactionStats(");
+    boolean first = true;
+
+    sb.append("entriesRead:");
+    sb.append(this.entriesRead);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("entriesWritten:");
+    sb.append(this.entriesWritten);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("fileSize:");
+    sb.append(this.fileSize);
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class CompactionStatsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+    public CompactionStatsStandardScheme getScheme() {
+      return new CompactionStatsStandardScheme();
+    }
+  }
+
+  private static class CompactionStatsStandardScheme extends org.apache.thrift.scheme.StandardScheme<CompactionStats> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, CompactionStats struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // ENTRIES_READ
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.entriesRead = iprot.readI64();
+              struct.setEntriesReadIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // ENTRIES_WRITTEN
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.entriesWritten = iprot.readI64();
+              struct.setEntriesWrittenIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // FILE_SIZE
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.fileSize = iprot.readI64();
+              struct.setFileSizeIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, CompactionStats struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      oprot.writeFieldBegin(ENTRIES_READ_FIELD_DESC);
+      oprot.writeI64(struct.entriesRead);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(ENTRIES_WRITTEN_FIELD_DESC);
+      oprot.writeI64(struct.entriesWritten);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(FILE_SIZE_FIELD_DESC);
+      oprot.writeI64(struct.fileSize);
+      oprot.writeFieldEnd();
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class CompactionStatsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+    public CompactionStatsTupleScheme getScheme() {
+      return new CompactionStatsTupleScheme();
+    }
+  }
+
+  private static class CompactionStatsTupleScheme extends org.apache.thrift.scheme.TupleScheme<CompactionStats> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, CompactionStats struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      java.util.BitSet optionals = new java.util.BitSet();
+      if (struct.isSetEntriesRead()) {
+        optionals.set(0);
+      }
+      if (struct.isSetEntriesWritten()) {
+        optionals.set(1);
+      }
+      if (struct.isSetFileSize()) {
+        optionals.set(2);
+      }
+      oprot.writeBitSet(optionals, 3);
+      if (struct.isSetEntriesRead()) {
+        oprot.writeI64(struct.entriesRead);
+      }
+      if (struct.isSetEntriesWritten()) {
+        oprot.writeI64(struct.entriesWritten);
+      }
+      if (struct.isSetFileSize()) {
+        oprot.writeI64(struct.fileSize);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, CompactionStats struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      java.util.BitSet incoming = iprot.readBitSet(3);
+      if (incoming.get(0)) {
+        struct.entriesRead = iprot.readI64();
+        struct.setEntriesReadIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.entriesWritten = iprot.readI64();
+        struct.setEntriesWrittenIsSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.fileSize = iprot.readI64();
+        struct.setFileSizeIsSet(true);
+      }
+    }
+  }
+
+  private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+    return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+  }
+  private static void unusedMethod() {}
+}
+
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/InputFile.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/InputFile.java
new file mode 100644
index 0000000..10c0008
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/InputFile.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.accumulo.core.tabletserver.thrift;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+public class InputFile implements org.apache.thrift.TBase<InputFile, InputFile._Fields>, java.io.Serializable, Cloneable, Comparable<InputFile> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InputFile");
+
+  private static final org.apache.thrift.protocol.TField METADATA_FILE_ENTRY_FIELD_DESC = new org.apache.thrift.protocol.TField("metadataFileEntry", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("size", org.apache.thrift.protocol.TType.I64, (short)2);
+  private static final org.apache.thrift.protocol.TField ENTRIES_FIELD_DESC = new org.apache.thrift.protocol.TField("entries", org.apache.thrift.protocol.TType.I64, (short)3);
+  private static final org.apache.thrift.protocol.TField TIMESTAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("timestamp", org.apache.thrift.protocol.TType.I64, (short)4);
+
+  private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new InputFileStandardSchemeFactory();
+  private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new InputFileTupleSchemeFactory();
+
+  public @org.apache.thrift.annotation.Nullable java.lang.String metadataFileEntry; // required
+  public long size; // required
+  public long entries; // required
+  public long timestamp; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    METADATA_FILE_ENTRY((short)1, "metadataFileEntry"),
+    SIZE((short)2, "size"),
+    ENTRIES((short)3, "entries"),
+    TIMESTAMP((short)4, "timestamp");
+
+    private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+    static {
+      for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    @org.apache.thrift.annotation.Nullable
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // METADATA_FILE_ENTRY
+          return METADATA_FILE_ENTRY;
+        case 2: // SIZE
+          return SIZE;
+        case 3: // ENTRIES
+          return ENTRIES;
+        case 4: // TIMESTAMP
+          return TIMESTAMP;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    @org.apache.thrift.annotation.Nullable
+    public static _Fields findByName(java.lang.String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final java.lang.String _fieldName;
+
+    _Fields(short thriftId, java.lang.String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public java.lang.String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __SIZE_ISSET_ID = 0;
+  private static final int __ENTRIES_ISSET_ID = 1;
+  private static final int __TIMESTAMP_ISSET_ID = 2;
+  private byte __isset_bitfield = 0;
+  public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.METADATA_FILE_ENTRY, new org.apache.thrift.meta_data.FieldMetaData("metadataFileEntry", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.SIZE, new org.apache.thrift.meta_data.FieldMetaData("size", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.ENTRIES, new org.apache.thrift.meta_data.FieldMetaData("entries", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.TIMESTAMP, new org.apache.thrift.meta_data.FieldMetaData("timestamp", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InputFile.class, metaDataMap);
+  }
+
+  public InputFile() {
+  }
+
+  public InputFile(
+    java.lang.String metadataFileEntry,
+    long size,
+    long entries,
+    long timestamp)
+  {
+    this();
+    this.metadataFileEntry = metadataFileEntry;
+    this.size = size;
+    setSizeIsSet(true);
+    this.entries = entries;
+    setEntriesIsSet(true);
+    this.timestamp = timestamp;
+    setTimestampIsSet(true);
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public InputFile(InputFile other) {
+    __isset_bitfield = other.__isset_bitfield;
+    if (other.isSetMetadataFileEntry()) {
+      this.metadataFileEntry = other.metadataFileEntry;
+    }
+    this.size = other.size;
+    this.entries = other.entries;
+    this.timestamp = other.timestamp;
+  }
+
+  public InputFile deepCopy() {
+    return new InputFile(this);
+  }
+
+  @Override
+  public void clear() {
+    this.metadataFileEntry = null;
+    setSizeIsSet(false);
+    this.size = 0;
+    setEntriesIsSet(false);
+    this.entries = 0;
+    setTimestampIsSet(false);
+    this.timestamp = 0;
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public java.lang.String getMetadataFileEntry() {
+    return this.metadataFileEntry;
+  }
+
+  public InputFile setMetadataFileEntry(@org.apache.thrift.annotation.Nullable java.lang.String metadataFileEntry) {
+    this.metadataFileEntry = metadataFileEntry;
+    return this;
+  }
+
+  public void unsetMetadataFileEntry() {
+    this.metadataFileEntry = null;
+  }
+
+  /** Returns true if field metadataFileEntry is set (has been assigned a value) and false otherwise */
+  public boolean isSetMetadataFileEntry() {
+    return this.metadataFileEntry != null;
+  }
+
+  public void setMetadataFileEntryIsSet(boolean value) {
+    if (!value) {
+      this.metadataFileEntry = null;
+    }
+  }
+
+  public long getSize() {
+    return this.size;
+  }
+
+  public InputFile setSize(long size) {
+    this.size = size;
+    setSizeIsSet(true);
+    return this;
+  }
+
+  public void unsetSize() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SIZE_ISSET_ID);
+  }
+
+  /** Returns true if field size is set (has been assigned a value) and false otherwise */
+  public boolean isSetSize() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __SIZE_ISSET_ID);
+  }
+
+  public void setSizeIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SIZE_ISSET_ID, value);
+  }
+
+  public long getEntries() {
+    return this.entries;
+  }
+
+  public InputFile setEntries(long entries) {
+    this.entries = entries;
+    setEntriesIsSet(true);
+    return this;
+  }
+
+  public void unsetEntries() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __ENTRIES_ISSET_ID);
+  }
+
+  /** Returns true if field entries is set (has been assigned a value) and false otherwise */
+  public boolean isSetEntries() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __ENTRIES_ISSET_ID);
+  }
+
+  public void setEntriesIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ENTRIES_ISSET_ID, value);
+  }
+
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  public InputFile setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+    setTimestampIsSet(true);
+    return this;
+  }
+
+  public void unsetTimestamp() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __TIMESTAMP_ISSET_ID);
+  }
+
+  /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */
+  public boolean isSetTimestamp() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __TIMESTAMP_ISSET_ID);
+  }
+
+  public void setTimestampIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __TIMESTAMP_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+    switch (field) {
+    case METADATA_FILE_ENTRY:
+      if (value == null) {
+        unsetMetadataFileEntry();
+      } else {
+        setMetadataFileEntry((java.lang.String)value);
+      }
+      break;
+
+    case SIZE:
+      if (value == null) {
+        unsetSize();
+      } else {
+        setSize((java.lang.Long)value);
+      }
+      break;
+
+    case ENTRIES:
+      if (value == null) {
+        unsetEntries();
+      } else {
+        setEntries((java.lang.Long)value);
+      }
+      break;
+
+    case TIMESTAMP:
+      if (value == null) {
+        unsetTimestamp();
+      } else {
+        setTimestamp((java.lang.Long)value);
+      }
+      break;
+
+    }
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public java.lang.Object getFieldValue(_Fields field) {
+    switch (field) {
+    case METADATA_FILE_ENTRY:
+      return getMetadataFileEntry();
+
+    case SIZE:
+      return getSize();
+
+    case ENTRIES:
+      return getEntries();
+
+    case TIMESTAMP:
+      return getTimestamp();
+
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new java.lang.IllegalArgumentException();
+    }
+
+    switch (field) {
+    case METADATA_FILE_ENTRY:
+      return isSetMetadataFileEntry();
+    case SIZE:
+      return isSetSize();
+    case ENTRIES:
+      return isSetEntries();
+    case TIMESTAMP:
+      return isSetTimestamp();
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(java.lang.Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof InputFile)
+      return this.equals((InputFile)that);
+    return false;
+  }
+
+  public boolean equals(InputFile that) {
+    if (that == null)
+      return false;
+    if (this == that)
+      return true;
+
+    boolean this_present_metadataFileEntry = true && this.isSetMetadataFileEntry();
+    boolean that_present_metadataFileEntry = true && that.isSetMetadataFileEntry();
+    if (this_present_metadataFileEntry || that_present_metadataFileEntry) {
+      if (!(this_present_metadataFileEntry && that_present_metadataFileEntry))
+        return false;
+      if (!this.metadataFileEntry.equals(that.metadataFileEntry))
+        return false;
+    }
+
+    boolean this_present_size = true;
+    boolean that_present_size = true;
+    if (this_present_size || that_present_size) {
+      if (!(this_present_size && that_present_size))
+        return false;
+      if (this.size != that.size)
+        return false;
+    }
+
+    boolean this_present_entries = true;
+    boolean that_present_entries = true;
+    if (this_present_entries || that_present_entries) {
+      if (!(this_present_entries && that_present_entries))
+        return false;
+      if (this.entries != that.entries)
+        return false;
+    }
+
+    boolean this_present_timestamp = true;
+    boolean that_present_timestamp = true;
+    if (this_present_timestamp || that_present_timestamp) {
+      if (!(this_present_timestamp && that_present_timestamp))
+        return false;
+      if (this.timestamp != that.timestamp)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashCode = 1;
+
+    hashCode = hashCode * 8191 + ((isSetMetadataFileEntry()) ? 131071 : 524287);
+    if (isSetMetadataFileEntry())
+      hashCode = hashCode * 8191 + metadataFileEntry.hashCode();
+
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(size);
+
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(entries);
+
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(timestamp);
+
+    return hashCode;
+  }
+
+  @Override
+  public int compareTo(InputFile other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = java.lang.Boolean.valueOf(isSetMetadataFileEntry()).compareTo(other.isSetMetadataFileEntry());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetMetadataFileEntry()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.metadataFileEntry, other.metadataFileEntry);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetSize()).compareTo(other.isSetSize());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetSize()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.size, other.size);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetEntries()).compareTo(other.isSetEntries());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetEntries()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.entries, other.entries);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetTimestamp()).compareTo(other.isSetTimestamp());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTimestamp()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timestamp, other.timestamp);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    scheme(iprot).read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    scheme(oprot).write(oprot, this);
+  }
+
+  @Override
+  public java.lang.String toString() {
+    java.lang.StringBuilder sb = new java.lang.StringBuilder("InputFile(");
+    boolean first = true;
+
+    sb.append("metadataFileEntry:");
+    if (this.metadataFileEntry == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.metadataFileEntry);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("size:");
+    sb.append(this.size);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("entries:");
+    sb.append(this.entries);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("timestamp:");
+    sb.append(this.timestamp);
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class InputFileStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+    public InputFileStandardScheme getScheme() {
+      return new InputFileStandardScheme();
+    }
+  }
+
+  private static class InputFileStandardScheme extends org.apache.thrift.scheme.StandardScheme<InputFile> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, InputFile struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // METADATA_FILE_ENTRY
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.metadataFileEntry = iprot.readString();
+              struct.setMetadataFileEntryIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // SIZE
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.size = iprot.readI64();
+              struct.setSizeIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // ENTRIES
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.entries = iprot.readI64();
+              struct.setEntriesIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 4: // TIMESTAMP
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.timestamp = iprot.readI64();
+              struct.setTimestampIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, InputFile struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.metadataFileEntry != null) {
+        oprot.writeFieldBegin(METADATA_FILE_ENTRY_FIELD_DESC);
+        oprot.writeString(struct.metadataFileEntry);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldBegin(SIZE_FIELD_DESC);
+      oprot.writeI64(struct.size);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(ENTRIES_FIELD_DESC);
+      oprot.writeI64(struct.entries);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+      oprot.writeI64(struct.timestamp);
+      oprot.writeFieldEnd();
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class InputFileTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+    public InputFileTupleScheme getScheme() {
+      return new InputFileTupleScheme();
+    }
+  }
+
+  private static class InputFileTupleScheme extends org.apache.thrift.scheme.TupleScheme<InputFile> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, InputFile struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      java.util.BitSet optionals = new java.util.BitSet();
+      if (struct.isSetMetadataFileEntry()) {
+        optionals.set(0);
+      }
+      if (struct.isSetSize()) {
+        optionals.set(1);
+      }
+      if (struct.isSetEntries()) {
+        optionals.set(2);
+      }
+      if (struct.isSetTimestamp()) {
+        optionals.set(3);
+      }
+      oprot.writeBitSet(optionals, 4);
+      if (struct.isSetMetadataFileEntry()) {
+        oprot.writeString(struct.metadataFileEntry);
+      }
+      if (struct.isSetSize()) {
+        oprot.writeI64(struct.size);
+      }
+      if (struct.isSetEntries()) {
+        oprot.writeI64(struct.entries);
+      }
+      if (struct.isSetTimestamp()) {
+        oprot.writeI64(struct.timestamp);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, InputFile struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      java.util.BitSet incoming = iprot.readBitSet(4);
+      if (incoming.get(0)) {
+        struct.metadataFileEntry = iprot.readString();
+        struct.setMetadataFileEntryIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.size = iprot.readI64();
+        struct.setSizeIsSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.entries = iprot.readI64();
+        struct.setEntriesIsSet(true);
+      }
+      if (incoming.get(3)) {
+        struct.timestamp = iprot.readI64();
+        struct.setTimestampIsSet(true);
+      }
+    }
+  }
+
+  private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+    return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+  }
+  private static void unusedMethod() {}
+}
+
diff --git a/core/src/main/thrift/compaction-coordinator.thrift b/core/src/main/thrift/compaction-coordinator.thrift
index 7cc2caf..1b754cf 100644
--- a/core/src/main/thrift/compaction-coordinator.thrift
+++ b/core/src/main/thrift/compaction-coordinator.thrift
@@ -39,37 +39,6 @@ enum CompactionState {
   CANCELLED
 }
 
-struct InputFile {
-  1:string metadataFileEntry
-  2:i64 size
-  3:i64 entries
-  4:i64 timestamp
-}
-
-enum CompactionKind {
-  CHOP
-  SELECTOR
-  SYSTEM
-  USER
-}
-
-struct CompactionJob {
-  1:trace.TInfo traceInfo
-  2:security.TCredentials credentials
-  3:i64 compactionId
-  5:data.TKeyExtent extent
-  6:list<InputFile> files
-  7:i32 priority
-  8:i32 readRate
-  9:i32 writeRate
-  10:tabletserver.IteratorConfig iteratorSettings
-  11:tabletserver.CompactionType type
-  12:tabletserver.CompactionReason reason
-  13:string outputFile
-  14:bool propagateDeletes
-  15:CompactionKind kind
-}
-
 struct Status {
   1:i64 timestamp
   2:i64 compactionId
@@ -111,7 +80,7 @@ service CompactionCoordinator {
    */
   void compactionCompleted(
     1:tabletserver.CompactionJob job
-    2:data.CompactionStats stats
+    2:tabletserver.CompactionStats stats
   )
      
   /*
diff --git a/core/src/main/thrift/data.thrift b/core/src/main/thrift/data.thrift
index 7ee0b9d..f0426a4 100644
--- a/core/src/main/thrift/data.thrift
+++ b/core/src/main/thrift/data.thrift
@@ -191,9 +191,3 @@ typedef map<TKeyExtent, list<TMutation>> UpdateBatch
 
 typedef map<TKeyExtent, map<string, MapFileInfo>> TabletFiles
 
-struct CompactionStats{
-  1:i64 entriesRead;
-  2:i64 entriesWritten;
-  3:i64 fileSize;
-}
-
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index e5f2194..a59abfa 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -158,12 +158,19 @@ enum TUnloadTabletGoal {
   DELETED
 }
 
+struct InputFile {
+  1:string metadataFileEntry
+  2:i64 size
+  3:i64 entries
+  4:i64 timestamp
+}
+
 struct CompactionJob {
   1:trace.TInfo traceInfo
   2:security.TCredentials credentials
   3:i64 compactionId
   5:data.TKeyExtent extent
-  6:list<string> files
+  6:list<InputFile> files
   7:i32 priority
   8:i32 readRate
   9:i32 writeRate
@@ -172,6 +179,15 @@ struct CompactionJob {
   # Need to add SELECTOR To CompactionReason, delete CompactionKind?
   12:CompactionReason reason
   13:string outputFile
+  14:bool propagateDeletes
+  15:CompactionKind kind
+}
+
+enum CompactionKind {
+  CHOP
+  SELECTOR
+  SYSTEM
+  USER
 }
 
 struct CompactionQueueSummary {
@@ -180,6 +196,12 @@ struct CompactionQueueSummary {
   3:i32 count
 }
 
+struct CompactionStats{
+  1:i64 entriesRead;
+  2:i64 entriesWritten;
+  3:i64 fileSize;
+}
+
 service TabletClientService extends client.ClientService {
 
   // scan a range of keys
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
similarity index 59%
copy from server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java
copy to server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
index bfdbb4c..51c743f 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
@@ -16,11 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.compactor;
+package org.apache.accumulo.server.compaction;
 
-import org.apache.thrift.TException;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.server.ServerContext;
 
-@FunctionalInterface
-public interface RetryableThriftFunction<T> {
-  T execute() throws TException;
+public class ExternalCompactionUtil {
+
+  public static HostAndPort findCompactionCoordinator(ServerContext context) {
+    final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
+    byte[] address = context.getZooCache().get(lockPath);
+    String coordinatorAddress = new String(address);
+    return HostAndPort.fromString(coordinatorAddress);
+  }
 }
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftCall.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
similarity index 96%
rename from server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftCall.java
rename to server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
index 5ce599e..6981d96 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftCall.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.compactor;
+package org.apache.accumulo.server.compaction;
 
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.thrift.TException;
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 public class RetryableThriftCall<T> {
 
   private static final Logger LOG = LoggerFactory.getLogger(RetryableThriftCall.class);
+  public static final long MAX_WAIT_TIME = 60000;
 
   private final long start;
   private final long maxWaitTime;
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java
similarity index 95%
rename from server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java
rename to server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java
index bfdbb4c..e87f020 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftFunction.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.compactor;
+package org.apache.accumulo.server.compaction;
 
 import org.apache.thrift.TException;
 
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 73670c0..6c747ed 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.coordinator;
 
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -35,15 +36,16 @@ import java.util.concurrent.TimeUnit;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
 import org.apache.accumulo.core.compaction.thrift.CompactionState;
+import org.apache.accumulo.core.compaction.thrift.Compactor;
 import org.apache.accumulo.core.compaction.thrift.Status;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.dataImpl.thrift.CompactionStats;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.tabletserver.thrift.CompactionJob;
 import org.apache.accumulo.core.tabletserver.thrift.CompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Halt;
@@ -57,6 +59,8 @@ import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.compaction.RetryableThriftCall;
+import org.apache.accumulo.server.compaction.RetryableThriftFunction;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.rpc.ServerAddress;
@@ -200,14 +204,14 @@ public class CompactionCoordinator extends AbstractServer implements
   
   private static class RunningCompaction {
     private final CompactionJob job;
-    private final String compactor;
+    private final String compactorAddress;
     private final TServerInstance tserver;
     private Map<Long, CompactionUpdate> updates = new TreeMap<>();
     private CompactionStats stats = null;
-    public RunningCompaction(CompactionJob job, String compactor, TServerInstance tserver) {
+    public RunningCompaction(CompactionJob job, String compactorAddress, TServerInstance tserver) {
       super();
       this.job = job;
-      this.compactor = compactor;
+      this.compactorAddress = compactorAddress;
       this.tserver = tserver;
     }
     public Map<Long,CompactionUpdate> getUpdates() {
@@ -216,9 +220,6 @@ public class CompactionCoordinator extends AbstractServer implements
     public void addUpdate(Long timestamp, String message, CompactionState state) {
       this.updates.put(timestamp, new CompactionUpdate(timestamp, message, state));
     }
-    public void setUpdates(Map<Long,CompactionUpdate> updates) {
-      this.updates = updates;
-    }
     public CompactionStats getStats() {
       return stats;
     }
@@ -228,8 +229,8 @@ public class CompactionCoordinator extends AbstractServer implements
     public CompactionJob getJob() {
       return job;
     }
-    public String getCompactor() {
-      return compactor;
+    public String getCompactorAddress() {
+      return compactorAddress;
     }
     public TServerInstance getTserver() {
       return tserver;
@@ -293,7 +294,7 @@ public class CompactionCoordinator extends AbstractServer implements
    * @return address of this CompactionCoordinator client service
    * @throws UnknownHostException
    */
-  private ServerAddress startCompactorClientService() throws UnknownHostException {
+  private ServerAddress startCoordinatorClientService() throws UnknownHostException {
     CompactionCoordinator rpcProxy = TraceUtil.wrapService(this);
     final org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<CompactionCoordinator> processor;
     if (getContext().getThriftServerType() == ThriftServerType.SASL) {
@@ -319,7 +320,7 @@ public class CompactionCoordinator extends AbstractServer implements
 
     ServerAddress coordinatorAddress = null;
     try {
-      coordinatorAddress = startCompactorClientService();
+      coordinatorAddress = startCoordinatorClientService();
     } catch (UnknownHostException e1) {
       throw new RuntimeException("Failed to start the coordinator service", e1);
     }
@@ -364,7 +365,7 @@ public class CompactionCoordinator extends AbstractServer implements
   
 
   /**
-   * Callback for the LiveTServerSet object to update us current set of tablet servers, including 
+   * Callback for the LiveTServerSet object to update current set of tablet servers, including 
    * ones that were deleted and added
    * 
    * @param current current set of live tservers
@@ -394,14 +395,14 @@ public class CompactionCoordinator extends AbstractServer implements
   }
 
   /**
-   * Return the next compaction job for the queue to a Compactor
+   * Return the next compaction job from the queue to a Compactor
    * 
    * @param queueName queue
-   * @param compactor compactor address
+   * @param compactorAddress compactor address
    * @return compaction job
    */
   @Override
-  public CompactionJob getCompactionJob(String queueName, String compactor) throws TException {
+  public CompactionJob getCompactionJob(String queueName, String compactorAddress) throws TException {
     String queue = queueName.intern();
     TServerInstance tserver = null;
     Long priority = null;
@@ -434,20 +435,17 @@ public class CompactionCoordinator extends AbstractServer implements
             break;
           }
         }
-      } else {
-        return (CompactionJob) null; //TODO: or should we thrown an error for no tserver for this queue?
       }
     }
-
-    try {
-      TabletClientService.Client client = getTabletServerConnection(tserver);
-      CompactionJob job = client.reserveCompactionJob(queue, priority, compactor);
-      RUNNING.put(job.getCompactionId(), new RunningCompaction(job, compactor, tserver));
-      return job;
-    } catch (TException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+    
+    if (null == tserver) {
+      return null;
     }
+
+    TabletClientService.Client client = getTabletServerConnection(tserver);
+    CompactionJob job = client.reserveCompactionJob(queue, priority, compactorAddress);
+    RUNNING.put(job.getCompactionId(), new RunningCompaction(job, compactorAddress, tserver));
+    return job;
   }
   
   private TabletClientService.Client getTabletServerConnection(TServerInstance tserver) throws TTransportException {
@@ -456,19 +454,43 @@ public class CompactionCoordinator extends AbstractServer implements
     return ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
   }
   
+  private Compactor.Client getCompactorConnection(HostAndPort compactorAddress) throws TTransportException {
+    TTransport transport = ThriftTransportPool.getInstance().getTransport(compactorAddress, 0, getContext());
+    return ThriftUtil.createClient(new Compactor.Client.Factory(), transport);
+  }
 
+  /**
+   * Called by the TabletServer to cancel the running compaction.
+   */
   @Override
   public void cancelCompaction(TKeyExtent extent, String queueName, long priority)
       throws TException {
-    // TODO Auto-generated method stub
-    
+    RunningCompaction rc = RUNNING.get(null/* compactionId */); // TODO: Need to change thrift inputs here
+    HostAndPort compactor = HostAndPort.fromString(rc.getCompactorAddress());
+    RetryableThriftCall<Void> cancelThriftCall = new RetryableThriftCall<>(1000,
+        RetryableThriftCall.MAX_WAIT_TIME, 0, new RetryableThriftFunction<Void>() {
+          @Override
+          public Void execute() throws TException {
+            try {
+              getCompactorConnection(compactor).cancel(rc.getJob());
+              return null;
+            } catch (TException e) {
+              throw e;
+            }
+          }
+        });
+    cancelThriftCall.run();
   }
 
   @Override
   public List<Status> getCompactionStatus(TKeyExtent extent, String queueName, long priority)
       throws TException {
-    // TODO Auto-generated method stub
-    return null;
+    RunningCompaction rc = RUNNING.get(null/* compactionId */); // TODO: Need to change thrift inputs here
+    List<Status> status = new ArrayList<>();
+    rc.getUpdates().forEach((k,v) -> {
+      status.add(new Status(v.getTimestamp(), rc.getJob().getCompactionId(), rc.getCompactorAddress(), v.getState(), v.getMessage()));
+    });
+    return status;
   }
 
   /**
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 75d81cf..5eb8cab 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -37,7 +37,6 @@ import java.util.concurrent.atomic.LongAdder;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator;
-import org.apache.accumulo.core.compaction.thrift.CompactionJob;
 import org.apache.accumulo.core.compaction.thrift.CompactionState;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -53,7 +52,9 @@ import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionJob;
 import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
@@ -72,8 +73,10 @@ import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.compaction.CompactionInfo;
-import org.apache.accumulo.server.compaction.CompactionStats;
 import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
+import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
+import org.apache.accumulo.server.compaction.RetryableThriftCall;
+import org.apache.accumulo.server.compaction.RetryableThriftFunction;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
 import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
@@ -173,14 +176,15 @@ public class Compactor extends AbstractServer
 
   private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
-  private static final long MAX_WAIT_TIME = 60000;
+  private static final CompactionJobHolder jobHolder = new CompactionJobHolder();
 
   private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
   private final UUID compactorId = UUID.randomUUID();
   private final AccumuloConfiguration aconf;
   private final String queueName;
-  private final CompactionJobHolder jobHolder;
+  private final AtomicReference<CompactionCoordinator.Client> coordinatorClient = null;
   private ZooLock compactorLock;
+  private ServerAddress compactorAddress = null;
 
   Compactor(CompactorServerOpts opts, String[] args) {
     super("compactor", opts, args);
@@ -188,7 +192,6 @@ public class Compactor extends AbstractServer
     ServerContext context = super.getContext();
     context.setupCrypto();
 
-    this.jobHolder = new CompactionJobHolder();
     aconf = getConfiguration();
     ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
         () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_GC_CHECKS,
@@ -265,26 +268,6 @@ public class Compactor extends AbstractServer
   }
 
   /**
-   * Get the address of the CompactionCoordinator
-   *
-   * @return address of Coordinator
-   */
-  private HostAndPort getCoordinatorAddress() {
-    try {
-      // TODO: Get the coordinator location from ZooKeeper
-      List<String> locations = null;
-      if (locations.isEmpty()) {
-        return null;
-      }
-      return HostAndPort.fromString(locations.get(0));
-    } catch (Exception e) {
-      LOG.warn("Failed to obtain manager host " + e);
-    }
-
-    return null;
-  }
-
-  /**
    * Start this Compactors thrift service to handle incoming client requests
    *
    * @return address of this compactor client service
@@ -341,15 +324,48 @@ public class Compactor extends AbstractServer
    * @param message
    *          updated message
    */
-  private void updateCompactionState(CompactionCoordinator.Client coordinatorClient,
-      CompactionJob job, CompactionState state, String message) {
+  private void updateCompactionState(CompactionJob job, CompactionState state, String message) {
     RetryableThriftCall<Void> thriftCall =
-        new RetryableThriftCall<>(1000, MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() {
+        new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() {
           @Override
           public Void execute() throws TException {
-            coordinatorClient.updateCompactionState(job, state, message,
-                System.currentTimeMillis());
-            return null;
+            try {
+              if (null == coordinatorClient.get()) {
+                coordinatorClient.set(getCoordinatorClient());
+              }
+              coordinatorClient.get().updateCompactionStatus(job, state, message,
+                  System.currentTimeMillis());
+              return null;
+            } catch (TException e) {
+              coordinatorClient.set(null);
+              throw e;
+            }
+          }
+        });
+    thriftCall.run();
+  }
+  
+  /**
+   * Update the coordinator with the stats from the job
+   *  
+   * @param job current compaction job
+   * @param stats compaction stats
+   */
+  private void updateCompactionCompleted(CompactionJob job, CompactionStats stats) {
+    RetryableThriftCall<Void> thriftCall =
+        new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 25, new RetryableThriftFunction<Void>() {
+          @Override
+          public Void execute() throws TException {
+            try {
+              if (null == coordinatorClient.get()) {
+                coordinatorClient.set(getCoordinatorClient());
+              }
+              coordinatorClient.get().compactionCompleted(job, stats);
+              return null;
+            } catch (TException e) {
+              coordinatorClient.set(null);
+              throw e;
+            }
           }
         });
     thriftCall.run();
@@ -364,22 +380,103 @@ public class Compactor extends AbstractServer
    *          address of this Compactor
    * @return CompactionJob
    */
-  private CompactionJob getNextJob(CompactionCoordinator.Client coordinatorClient,
-      String compactorAddress) {
+  private CompactionJob getNextJob() {
     RetryableThriftCall<CompactionJob> nextJobThriftCall = new RetryableThriftCall<>(1000,
-        MAX_WAIT_TIME, 0, new RetryableThriftFunction<CompactionJob>() {
+        RetryableThriftCall.MAX_WAIT_TIME, 0, new RetryableThriftFunction<CompactionJob>() {
           @Override
           public CompactionJob execute() throws TException {
-            return coordinatorClient.getCompactionJob(queueName, compactorAddress);
+            try {
+              if (null == coordinatorClient.get()) {
+                coordinatorClient.set(getCoordinatorClient());
+              }
+              return coordinatorClient.get().getCompactionJob(queueName, getHostPortString(compactorAddress.getAddress()));
+            } catch (TException e) {
+              coordinatorClient.set(null);
+              throw e;
+            }
           }
         });
     return nextJobThriftCall.run();
   }
+  
+  /**
+   * Get the client to the CompactionCoordinator
+   *
+   * @return compaction coordinator client
+   * @throws TTransportException when unable to get client
+   */
+  private CompactionCoordinator.Client getCoordinatorClient() throws TTransportException {
+    HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext());
+    if (null == coordinatorHost) {
+      throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    LOG.info("CompactionCoordinator address is: {}", coordinatorHost);
+      return ThriftUtil.getClient(new CompactionCoordinator.Client.Factory(),
+          coordinatorHost, getContext());
+  }
 
+  /**
+   * Create and return a new CompactionEnv for the current compaction job
+   *
+   * @param job current compaction job
+   * @return new env
+   */
+  private CompactionEnv getCompactionEnvironment(CompactionJob job) {
+    return new CompactionEnv() {
+      @Override
+      public boolean isCompactionEnabled() {
+        return !jobHolder.isCancelled();
+      }
+
+      @Override
+      public IteratorScope getIteratorScope() {
+        return IteratorScope.majc;
+      }
+
+      @Override
+      public RateLimiter getReadLimiter() {
+        return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
+            .create("read_rate_limiter", () -> job.getReadRate());
+      }
+
+      @Override
+      public RateLimiter getWriteLimiter() {
+        return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
+            .create("write_rate_limiter", () -> job.getWriteRate());
+      }
+
+      @Override
+      public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
+          AccumuloConfiguration acuTableConf, TableId tableId) {
+        return new TabletIteratorEnvironment(getContext(), IteratorScope.majc,
+            !job.isPropagateDeletes(), acuTableConf, tableId,
+            CompactionKind.valueOf(job.getKind().name()));
+      }
+
+      @Override
+      public SortedKeyValueIterator<Key,Value> getMinCIterator() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public CompactionReason getReason() {
+        switch (job.getKind()) {
+          case USER:
+            return CompactionReason.USER;
+          case CHOP:
+            return CompactionReason.CHOP;
+          case SELECTOR:
+          case SYSTEM:
+          default:
+            return CompactionReason.SYSTEM;
+        }
+      }
+    };  
+  }
+  
   @Override
   public void run() {
 
-    ServerAddress compactorAddress = null;
     try {
       compactorAddress = startCompactorClientService();
     } catch (UnknownHostException e1) {
@@ -393,19 +490,6 @@ public class Compactor extends AbstractServer
       throw new RuntimeException("Erroring registering in ZooKeeper", e);
     }
 
-    HostAndPort coordinatorHost = getCoordinatorAddress();
-    if (null == coordinatorHost) {
-      throw new RuntimeException("Unable to get CompactionCoordinator address from ZooKeeper");
-    }
-    LOG.info("CompactionCoordinator address is: {}", coordinatorHost);
-    CompactionCoordinator.Client coordinatorClient;
-    try {
-      coordinatorClient = ThriftUtil.getClient(new CompactionCoordinator.Client.Factory(),
-          coordinatorHost, getContext());
-    } catch (TTransportException e2) {
-      throw new RuntimeException("Erroring connecting to CompactionCoordinator", e2);
-    }
-
     LOG.info("Compactor started, waiting for work");
     try {
 
@@ -414,8 +498,8 @@ public class Compactor extends AbstractServer
       while (true) {
         err.set(null);
         jobHolder.reset();
-        final CompactionJob job = getNextJob(coordinatorClient, getHostPortString(clientAddress));
-
+        
+        final CompactionJob job = getNextJob();
         LOG.info("Received next compaction job: {}", job);
 
         final LongAdder totalInputSize = new LongAdder();
@@ -425,12 +509,12 @@ public class Compactor extends AbstractServer
 
         Thread compactionThread = Threads.createThread(
             "Compaction job for tablet " + job.getExtent().toString(), new Runnable() {
+              
               @Override
               public void run() {
                 try {
                   LOG.info("Setting up to run compactor");
-                  updateCompactionState(coordinatorClient, job, CompactionState.STARTED,
-                      "Compaction started");
+                  updateCompactionState(job, CompactionState.STARTED, "Compaction started");
 
                   final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8));
                   final TableConfiguration tConfig = getContext().getTableConfiguration(tableId);
@@ -445,56 +529,7 @@ public class Compactor extends AbstractServer
 
                   final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile()));
 
-                  final CompactionEnv cenv = new CompactionEnv() {
-                    @Override
-                    public boolean isCompactionEnabled() {
-                      return !jobHolder.isCancelled();
-                    }
-
-                    @Override
-                    public IteratorScope getIteratorScope() {
-                      return IteratorScope.majc;
-                    }
-
-                    @Override
-                    public RateLimiter getReadLimiter() {
-                      return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
-                          .create("read_rate_limiter", () -> job.getReadRate());
-                    }
-
-                    @Override
-                    public RateLimiter getWriteLimiter() {
-                      return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
-                          .create("write_rate_limiter", () -> job.getWriteRate());
-                    }
-
-                    @Override
-                    public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
-                        AccumuloConfiguration acuTableConf, TableId tableId) {
-                      return new TabletIteratorEnvironment(getContext(), IteratorScope.majc,
-                          !job.isPropagateDeletes(), acuTableConf, tableId,
-                          CompactionKind.valueOf(job.getKind().name()));
-                    }
-
-                    @Override
-                    public SortedKeyValueIterator<Key,Value> getMinCIterator() {
-                      throw new UnsupportedOperationException();
-                    }
-
-                    @Override
-                    public CompactionReason getReason() {
-                      switch (job.getKind()) {
-                        case USER:
-                          return CompactionReason.USER;
-                        case CHOP:
-                          return CompactionReason.CHOP;
-                        case SELECTOR:
-                        case SYSTEM:
-                        default:
-                          return CompactionReason.SYSTEM;
-                      }
-                    }
-                  };
+                  final CompactionEnv cenv = getCompactionEnvironment(job);
 
                   final List<IteratorSetting> iters = new ArrayList<>();
                   job.getIteratorSettings().getIterators()
@@ -507,12 +542,16 @@ public class Compactor extends AbstractServer
 
                   LOG.info("Starting compactor");
                   started.countDown();
-                  jobHolder.setStats(compactor.call());
-
+                  
+                  org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call();
+                  CompactionStats cs = new CompactionStats();
+                  cs.setEntriesRead(stat.getEntriesRead());
+                  cs.setEntriesWritten(stat.getEntriesWritten());
+                  cs.setFileSize(stat.getFileSize());
+                  jobHolder.setStats(cs);
                   LOG.info("Compaction completed successfully");
                   // Update state when completed
-                  updateCompactionState(coordinatorClient, job, CompactionState.SUCCEEDED,
-                      "Compaction completed successfully");
+                  updateCompactionState(job, CompactionState.SUCCEEDED, "Compaction completed successfully");
                 } catch (Exception e) {
                   LOG.error("Compaction failed", e);
                   err.set(e);
@@ -544,16 +583,14 @@ public class Compactor extends AbstractServer
                     info.getEntriesRead(), inputEntries,
                     (info.getEntriesRead() / inputEntries) * 100, info.getEntriesWritten());
                 LOG.info(message);
-                updateCompactionState(coordinatorClient, job, CompactionState.IN_PROGRESS, message);
+                updateCompactionState(job, CompactionState.IN_PROGRESS, message);
               }
             }
-            UtilWaitThread.sleep(MAX_WAIT_TIME);
+            UtilWaitThread.sleep(60000);
           }
           try {
             compactionThread.join();
-            CompactionStats stats = jobHolder.getStats();
-            // TODO: Tell coordinator that we are finished, send stats.
-
+            this.updateCompactionCompleted(job, jobHolder.getStats());
           } catch (InterruptedException e) {
             LOG.error(
                 "Compactor thread was interrupted waiting for compaction to finish, cancelling job",
@@ -570,20 +607,20 @@ public class Compactor extends AbstractServer
 
         if (compactionThread.isInterrupted()) {
           LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
-          updateCompactionState(coordinatorClient, job, CompactionState.CANCELLED,
-              "Compaction cancelled");
+          updateCompactionState(job, CompactionState.CANCELLED, "Compaction cancelled");
         }
 
         Throwable thrown = err.get();
         if (thrown != null) {
-          updateCompactionState(coordinatorClient, job, CompactionState.FAILED,
-              "Compaction failed due to: " + thrown.getMessage());
+          updateCompactionState(job, CompactionState.FAILED, "Compaction failed due to: " + thrown.getMessage());
         }
       }
 
     } finally {
       // close connection to coordinator
-      ThriftUtil.returnClient(coordinatorClient);
+      if (null != coordinatorClient.get()) {
+        ThriftUtil.returnClient(coordinatorClient.get());
+      }
 
       // Shutdown local thrift server
       LOG.debug("Stopping Thrift Servers");