You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/03/02 22:23:51 UTC

[accumulo] branch 1451-external-compactions-feature updated: re #1452: compactor wip

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 569c150  re #1452: compactor wip
569c150 is described below

commit 569c150ce9993dc0ce069181c4f17ce35d71873b
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Mar 2 22:23:23 2021 +0000

    re #1452: compactor wip
---
 .../java/org/apache/accumulo/core/Constants.java   |   4 +-
 .../compaction/thrift/CompactionCoordinator.java   |  36 +-
 .../core/compaction/thrift/CompactionJob.java      | 274 +++++++-
 .../core/compaction/thrift/CompactionKind.java     |  66 ++
 .../accumulo/core/compaction/thrift/InputFile.java | 687 +++++++++++++++++++++
 .../org/apache/accumulo/core/conf/Property.java    |  25 +-
 core/src/main/thrift/compaction-coordinator.thrift |  33 +-
 pom.xml                                            |   1 +
 .../apache/accumulo/server/fs}/FileManager.java    |   2 +-
 .../accumulo/server/fs}/TooManyFilesException.java |   2 +-
 .../iterators}/TabletIteratorEnvironment.java      |   5 +-
 .../accumulo/server/manager/LiveTServerSet.java    |   2 +-
 server/compactor/.gitignore                        |  28 +
 server/compactor/pom.xml                           |  38 ++
 .../org/apache/accumulo/compactor/Compactor.java   | 616 ++++++++++++++++++
 .../accumulo/compactor/RetryableThriftCall.java    |  92 +++
 .../compactor/RetryableThriftFunction.java}        |  14 +-
 .../accumulo/tserver/ConditionCheckerContext.java  |   1 +
 .../tserver/TabletServerResourceManager.java       |   3 +-
 .../accumulo/tserver/ThriftClientHandler.java      |   1 +
 .../accumulo/tserver/scan/NextBatchTask.java       |   2 +-
 .../accumulo/tserver/tablet/CompactableUtils.java  |   2 +-
 .../accumulo/tserver/tablet/MinorCompactor.java    |   2 +-
 .../accumulo/tserver/tablet/ScanDataSource.java    |   4 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   2 +-
 25 files changed, 1872 insertions(+), 70 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 70353f2..3276eb2 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -59,9 +59,11 @@ public class Constants {
 
   public static final String ZTSERVERS = "/tservers";
 
+  public static final String ZCOMPACTORS = "/compactors";
+
   public static final String ZCOORDINATOR = "/coordinators";
   public static final String ZCOORDINATOR_LOCK = "/coordinators/lock";
-  
+
   public static final String ZDEAD = "/dead";
   public static final String ZDEADTSERVERS = ZDEAD + "/tservers";
 
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java
index 4f4fb31..ffd00f9 100644
--- a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinator.java
@@ -2594,14 +2594,14 @@ public class CompactionCoordinator {
             case 0: // SUCCESS
               if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
                 {
-                  org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
-                  struct.success = new java.util.ArrayList<Status>(_list0.size);
-                  @org.apache.thrift.annotation.Nullable Status _elem1;
-                  for (int _i2 = 0; _i2 < _list0.size; ++_i2)
+                  org.apache.thrift.protocol.TList _list8 = iprot.readListBegin();
+                  struct.success = new java.util.ArrayList<Status>(_list8.size);
+                  @org.apache.thrift.annotation.Nullable Status _elem9;
+                  for (int _i10 = 0; _i10 < _list8.size; ++_i10)
                   {
-                    _elem1 = new Status();
-                    _elem1.read(iprot);
-                    struct.success.add(_elem1);
+                    _elem9 = new Status();
+                    _elem9.read(iprot);
+                    struct.success.add(_elem9);
                   }
                   iprot.readListEnd();
                 }
@@ -2629,9 +2629,9 @@ public class CompactionCoordinator {
           oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
           {
             oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.success.size()));
-            for (Status _iter3 : struct.success)
+            for (Status _iter11 : struct.success)
             {
-              _iter3.write(oprot);
+              _iter11.write(oprot);
             }
             oprot.writeListEnd();
           }
@@ -2662,9 +2662,9 @@ public class CompactionCoordinator {
         if (struct.isSetSuccess()) {
           {
             oprot.writeI32(struct.success.size());
-            for (Status _iter4 : struct.success)
+            for (Status _iter12 : struct.success)
             {
-              _iter4.write(oprot);
+              _iter12.write(oprot);
             }
           }
         }
@@ -2676,14 +2676,14 @@ public class CompactionCoordinator {
         java.util.BitSet incoming = iprot.readBitSet(1);
         if (incoming.get(0)) {
           {
-            org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-            struct.success = new java.util.ArrayList<Status>(_list5.size);
-            @org.apache.thrift.annotation.Nullable Status _elem6;
-            for (int _i7 = 0; _i7 < _list5.size; ++_i7)
+            org.apache.thrift.protocol.TList _list13 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+            struct.success = new java.util.ArrayList<Status>(_list13.size);
+            @org.apache.thrift.annotation.Nullable Status _elem14;
+            for (int _i15 = 0; _i15 < _list13.size; ++_i15)
             {
-              _elem6 = new Status();
-              _elem6.read(iprot);
-              struct.success.add(_elem6);
+              _elem14 = new Status();
+              _elem14.read(iprot);
+              struct.success.add(_elem14);
             }
           }
           struct.setSuccessIsSet(true);
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionJob.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionJob.java
index 66fa0ff..ece8893 100644
--- a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionJob.java
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionJob.java
@@ -40,6 +40,8 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
   private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)11);
   private static final org.apache.thrift.protocol.TField REASON_FIELD_DESC = new org.apache.thrift.protocol.TField("reason", org.apache.thrift.protocol.TType.I32, (short)12);
   private static final org.apache.thrift.protocol.TField OUTPUT_FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("outputFile", org.apache.thrift.protocol.TType.STRING, (short)13);
+  private static final org.apache.thrift.protocol.TField PROPAGATE_DELETES_FIELD_DESC = new org.apache.thrift.protocol.TField("propagateDeletes", org.apache.thrift.protocol.TType.BOOL, (short)14);
+  private static final org.apache.thrift.protocol.TField KIND_FIELD_DESC = new org.apache.thrift.protocol.TField("kind", org.apache.thrift.protocol.TType.I32, (short)15);
 
   private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionJobStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionJobTupleSchemeFactory();
@@ -48,7 +50,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
   public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; // required
   public long compactionId; // required
   public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent; // required
-  public @org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> files; // required
+  public @org.apache.thrift.annotation.Nullable java.util.List<InputFile> files; // required
   public int priority; // required
   public int readRate; // required
   public int writeRate; // required
@@ -64,6 +66,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
    */
   public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.tabletserver.thrift.CompactionReason reason; // required
   public @org.apache.thrift.annotation.Nullable java.lang.String outputFile; // required
+  public boolean propagateDeletes; // required
+  /**
+   * 
+   * @see CompactionKind
+   */
+  public @org.apache.thrift.annotation.Nullable CompactionKind kind; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -86,7 +94,13 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
      * @see org.apache.accumulo.core.tabletserver.thrift.CompactionReason
      */
     REASON((short)12, "reason"),
-    OUTPUT_FILE((short)13, "outputFile");
+    OUTPUT_FILE((short)13, "outputFile"),
+    PROPAGATE_DELETES((short)14, "propagateDeletes"),
+    /**
+     * 
+     * @see CompactionKind
+     */
+    KIND((short)15, "kind");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -126,6 +140,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
           return REASON;
         case 13: // OUTPUT_FILE
           return OUTPUT_FILE;
+        case 14: // PROPAGATE_DELETES
+          return PROPAGATE_DELETES;
+        case 15: // KIND
+          return KIND;
         default:
           return null;
       }
@@ -171,6 +189,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
   private static final int __PRIORITY_ISSET_ID = 1;
   private static final int __READRATE_ISSET_ID = 2;
   private static final int __WRITERATE_ISSET_ID = 3;
+  private static final int __PROPAGATEDELETES_ISSET_ID = 4;
   private byte __isset_bitfield = 0;
   public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -185,7 +204,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent.class)));
     tmpMap.put(_Fields.FILES, new org.apache.thrift.meta_data.FieldMetaData("files", org.apache.thrift.TFieldRequirementType.DEFAULT, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
-            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+            new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, InputFile.class))));
     tmpMap.put(_Fields.PRIORITY, new org.apache.thrift.meta_data.FieldMetaData("priority", org.apache.thrift.TFieldRequirementType.DEFAULT, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     tmpMap.put(_Fields.READ_RATE, new org.apache.thrift.meta_data.FieldMetaData("readRate", org.apache.thrift.TFieldRequirementType.DEFAULT, 
@@ -200,6 +219,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, org.apache.accumulo.core.tabletserver.thrift.CompactionReason.class)));
     tmpMap.put(_Fields.OUTPUT_FILE, new org.apache.thrift.meta_data.FieldMetaData("outputFile", org.apache.thrift.TFieldRequirementType.DEFAULT, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.PROPAGATE_DELETES, new org.apache.thrift.meta_data.FieldMetaData("propagateDeletes", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+    tmpMap.put(_Fields.KIND, new org.apache.thrift.meta_data.FieldMetaData("kind", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, CompactionKind.class)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionJob.class, metaDataMap);
   }
@@ -212,14 +235,16 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
     long compactionId,
     org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent,
-    java.util.List<java.lang.String> files,
+    java.util.List<InputFile> files,
     int priority,
     int readRate,
     int writeRate,
     org.apache.accumulo.core.tabletserver.thrift.IteratorConfig iteratorSettings,
     org.apache.accumulo.core.tabletserver.thrift.CompactionType type,
     org.apache.accumulo.core.tabletserver.thrift.CompactionReason reason,
-    java.lang.String outputFile)
+    java.lang.String outputFile,
+    boolean propagateDeletes,
+    CompactionKind kind)
   {
     this();
     this.traceInfo = traceInfo;
@@ -238,6 +263,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     this.type = type;
     this.reason = reason;
     this.outputFile = outputFile;
+    this.propagateDeletes = propagateDeletes;
+    setPropagateDeletesIsSet(true);
+    this.kind = kind;
   }
 
   /**
@@ -256,7 +284,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       this.extent = new org.apache.accumulo.core.dataImpl.thrift.TKeyExtent(other.extent);
     }
     if (other.isSetFiles()) {
-      java.util.List<java.lang.String> __this__files = new java.util.ArrayList<java.lang.String>(other.files);
+      java.util.List<InputFile> __this__files = new java.util.ArrayList<InputFile>(other.files.size());
+      for (InputFile other_element : other.files) {
+        __this__files.add(new InputFile(other_element));
+      }
       this.files = __this__files;
     }
     this.priority = other.priority;
@@ -274,6 +305,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     if (other.isSetOutputFile()) {
       this.outputFile = other.outputFile;
     }
+    this.propagateDeletes = other.propagateDeletes;
+    if (other.isSetKind()) {
+      this.kind = other.kind;
+    }
   }
 
   public CompactionJob deepCopy() {
@@ -298,6 +333,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     this.type = null;
     this.reason = null;
     this.outputFile = null;
+    setPropagateDeletesIsSet(false);
+    this.propagateDeletes = false;
+    this.kind = null;
   }
 
   @org.apache.thrift.annotation.Nullable
@@ -403,23 +441,23 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
   }
 
   @org.apache.thrift.annotation.Nullable
-  public java.util.Iterator<java.lang.String> getFilesIterator() {
+  public java.util.Iterator<InputFile> getFilesIterator() {
     return (this.files == null) ? null : this.files.iterator();
   }
 
-  public void addToFiles(java.lang.String elem) {
+  public void addToFiles(InputFile elem) {
     if (this.files == null) {
-      this.files = new java.util.ArrayList<java.lang.String>();
+      this.files = new java.util.ArrayList<InputFile>();
     }
     this.files.add(elem);
   }
 
   @org.apache.thrift.annotation.Nullable
-  public java.util.List<java.lang.String> getFiles() {
+  public java.util.List<InputFile> getFiles() {
     return this.files;
   }
 
-  public CompactionJob setFiles(@org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> files) {
+  public CompactionJob setFiles(@org.apache.thrift.annotation.Nullable java.util.List<InputFile> files) {
     this.files = files;
     return this;
   }
@@ -624,6 +662,62 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     }
   }
 
+  public boolean isPropagateDeletes() {
+    return this.propagateDeletes;
+  }
+
+  public CompactionJob setPropagateDeletes(boolean propagateDeletes) {
+    this.propagateDeletes = propagateDeletes;
+    setPropagateDeletesIsSet(true);
+    return this;
+  }
+
+  public void unsetPropagateDeletes() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __PROPAGATEDELETES_ISSET_ID);
+  }
+
+  /** Returns true if field propagateDeletes is set (has been assigned a value) and false otherwise */
+  public boolean isSetPropagateDeletes() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __PROPAGATEDELETES_ISSET_ID);
+  }
+
+  public void setPropagateDeletesIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __PROPAGATEDELETES_ISSET_ID, value);
+  }
+
+  /**
+   * 
+   * @see CompactionKind
+   */
+  @org.apache.thrift.annotation.Nullable
+  public CompactionKind getKind() {
+    return this.kind;
+  }
+
+  /**
+   * 
+   * @see CompactionKind
+   */
+  public CompactionJob setKind(@org.apache.thrift.annotation.Nullable CompactionKind kind) {
+    this.kind = kind;
+    return this;
+  }
+
+  public void unsetKind() {
+    this.kind = null;
+  }
+
+  /** Returns true if field kind is set (has been assigned a value) and false otherwise */
+  public boolean isSetKind() {
+    return this.kind != null;
+  }
+
+  public void setKindIsSet(boolean value) {
+    if (!value) {
+      this.kind = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
     case TRACE_INFO:
@@ -662,7 +756,7 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (value == null) {
         unsetFiles();
       } else {
-        setFiles((java.util.List<java.lang.String>)value);
+        setFiles((java.util.List<InputFile>)value);
       }
       break;
 
@@ -722,6 +816,22 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       }
       break;
 
+    case PROPAGATE_DELETES:
+      if (value == null) {
+        unsetPropagateDeletes();
+      } else {
+        setPropagateDeletes((java.lang.Boolean)value);
+      }
+      break;
+
+    case KIND:
+      if (value == null) {
+        unsetKind();
+      } else {
+        setKind((CompactionKind)value);
+      }
+      break;
+
     }
   }
 
@@ -764,6 +874,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     case OUTPUT_FILE:
       return getOutputFile();
 
+    case PROPAGATE_DELETES:
+      return isPropagateDeletes();
+
+    case KIND:
+      return getKind();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -799,6 +915,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       return isSetReason();
     case OUTPUT_FILE:
       return isSetOutputFile();
+    case PROPAGATE_DELETES:
+      return isSetPropagateDeletes();
+    case KIND:
+      return isSetKind();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -926,6 +1046,24 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         return false;
     }
 
+    boolean this_present_propagateDeletes = true;
+    boolean that_present_propagateDeletes = true;
+    if (this_present_propagateDeletes || that_present_propagateDeletes) {
+      if (!(this_present_propagateDeletes && that_present_propagateDeletes))
+        return false;
+      if (this.propagateDeletes != that.propagateDeletes)
+        return false;
+    }
+
+    boolean this_present_kind = true && this.isSetKind();
+    boolean that_present_kind = true && that.isSetKind();
+    if (this_present_kind || that_present_kind) {
+      if (!(this_present_kind && that_present_kind))
+        return false;
+      if (!this.kind.equals(that.kind))
+        return false;
+    }
+
     return true;
   }
 
@@ -973,6 +1111,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
     if (isSetOutputFile())
       hashCode = hashCode * 8191 + outputFile.hashCode();
 
+    hashCode = hashCode * 8191 + ((propagateDeletes) ? 131071 : 524287);
+
+    hashCode = hashCode * 8191 + ((isSetKind()) ? 131071 : 524287);
+    if (isSetKind())
+      hashCode = hashCode * 8191 + kind.getValue();
+
     return hashCode;
   }
 
@@ -1104,6 +1248,26 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.valueOf(isSetPropagateDeletes()).compareTo(other.isSetPropagateDeletes());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetPropagateDeletes()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.propagateDeletes, other.propagateDeletes);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetKind()).compareTo(other.isSetKind());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetKind()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.kind, other.kind);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1204,6 +1368,18 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       sb.append(this.outputFile);
     }
     first = false;
+    if (!first) sb.append(", ");
+    sb.append("propagateDeletes:");
+    sb.append(this.propagateDeletes);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("kind:");
+    if (this.kind == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.kind);
+    }
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -1300,11 +1476,12 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
                 org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
-                struct.files = new java.util.ArrayList<java.lang.String>(_list0.size);
-                @org.apache.thrift.annotation.Nullable java.lang.String _elem1;
+                struct.files = new java.util.ArrayList<InputFile>(_list0.size);
+                @org.apache.thrift.annotation.Nullable InputFile _elem1;
                 for (int _i2 = 0; _i2 < _list0.size; ++_i2)
                 {
-                  _elem1 = iprot.readString();
+                  _elem1 = new InputFile();
+                  _elem1.read(iprot);
                   struct.files.add(_elem1);
                 }
                 iprot.readListEnd();
@@ -1371,6 +1548,22 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 14: // PROPAGATE_DELETES
+            if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+              struct.propagateDeletes = iprot.readBool();
+              struct.setPropagateDeletesIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 15: // KIND
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.kind = org.apache.accumulo.core.compaction.thrift.CompactionKind.findByValue(iprot.readI32());
+              struct.setKindIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1407,10 +1600,10 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (struct.files != null) {
         oprot.writeFieldBegin(FILES_FIELD_DESC);
         {
-          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.files.size()));
-          for (java.lang.String _iter3 : struct.files)
+          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.files.size()));
+          for (InputFile _iter3 : struct.files)
           {
-            oprot.writeString(_iter3);
+            _iter3.write(oprot);
           }
           oprot.writeListEnd();
         }
@@ -1445,6 +1638,14 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         oprot.writeString(struct.outputFile);
         oprot.writeFieldEnd();
       }
+      oprot.writeFieldBegin(PROPAGATE_DELETES_FIELD_DESC);
+      oprot.writeBool(struct.propagateDeletes);
+      oprot.writeFieldEnd();
+      if (struct.kind != null) {
+        oprot.writeFieldBegin(KIND_FIELD_DESC);
+        oprot.writeI32(struct.kind.getValue());
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1499,7 +1700,13 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (struct.isSetOutputFile()) {
         optionals.set(11);
       }
-      oprot.writeBitSet(optionals, 12);
+      if (struct.isSetPropagateDeletes()) {
+        optionals.set(12);
+      }
+      if (struct.isSetKind()) {
+        optionals.set(13);
+      }
+      oprot.writeBitSet(optionals, 14);
       if (struct.isSetTraceInfo()) {
         struct.traceInfo.write(oprot);
       }
@@ -1515,9 +1722,9 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (struct.isSetFiles()) {
         {
           oprot.writeI32(struct.files.size());
-          for (java.lang.String _iter4 : struct.files)
+          for (InputFile _iter4 : struct.files)
           {
-            oprot.writeString(_iter4);
+            _iter4.write(oprot);
           }
         }
       }
@@ -1542,12 +1749,18 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       if (struct.isSetOutputFile()) {
         oprot.writeString(struct.outputFile);
       }
+      if (struct.isSetPropagateDeletes()) {
+        oprot.writeBool(struct.propagateDeletes);
+      }
+      if (struct.isSetKind()) {
+        oprot.writeI32(struct.kind.getValue());
+      }
     }
 
     @Override
     public void read(org.apache.thrift.protocol.TProtocol prot, CompactionJob struct) throws org.apache.thrift.TException {
       org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
-      java.util.BitSet incoming = iprot.readBitSet(12);
+      java.util.BitSet incoming = iprot.readBitSet(14);
       if (incoming.get(0)) {
         struct.traceInfo = new org.apache.accumulo.core.trace.thrift.TInfo();
         struct.traceInfo.read(iprot);
@@ -1569,12 +1782,13 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
       }
       if (incoming.get(4)) {
         {
-          org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
-          struct.files = new java.util.ArrayList<java.lang.String>(_list5.size);
-          @org.apache.thrift.annotation.Nullable java.lang.String _elem6;
+          org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+          struct.files = new java.util.ArrayList<InputFile>(_list5.size);
+          @org.apache.thrift.annotation.Nullable InputFile _elem6;
           for (int _i7 = 0; _i7 < _list5.size; ++_i7)
           {
-            _elem6 = iprot.readString();
+            _elem6 = new InputFile();
+            _elem6.read(iprot);
             struct.files.add(_elem6);
           }
         }
@@ -1609,6 +1823,14 @@ public class CompactionJob implements org.apache.thrift.TBase<CompactionJob, Com
         struct.outputFile = iprot.readString();
         struct.setOutputFileIsSet(true);
       }
+      if (incoming.get(12)) {
+        struct.propagateDeletes = iprot.readBool();
+        struct.setPropagateDeletesIsSet(true);
+      }
+      if (incoming.get(13)) {
+        struct.kind = org.apache.accumulo.core.compaction.thrift.CompactionKind.findByValue(iprot.readI32());
+        struct.setKindIsSet(true);
+      }
     }
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionKind.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionKind.java
new file mode 100644
index 0000000..4d05032
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/CompactionKind.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.accumulo.core.compaction.thrift;
+
+
+public enum CompactionKind implements org.apache.thrift.TEnum {
+  CHOP(0),
+  SELECTOR(1),
+  SYSTEM(2),
+  USER(3);
+
+  private final int value;
+
+  private CompactionKind(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find a the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  @org.apache.thrift.annotation.Nullable
+  public static CompactionKind findByValue(int value) { 
+    switch (value) {
+      case 0:
+        return CHOP;
+      case 1:
+        return SELECTOR;
+      case 2:
+        return SYSTEM;
+      case 3:
+        return USER;
+      default:
+        return null;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/compaction/thrift/InputFile.java b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/InputFile.java
new file mode 100644
index 0000000..d191a8b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/compaction/thrift/InputFile.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.accumulo.core.compaction.thrift;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+public class InputFile implements org.apache.thrift.TBase<InputFile, InputFile._Fields>, java.io.Serializable, Cloneable, Comparable<InputFile> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InputFile");
+
+  private static final org.apache.thrift.protocol.TField METADATA_FILE_ENTRY_FIELD_DESC = new org.apache.thrift.protocol.TField("metadataFileEntry", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("size", org.apache.thrift.protocol.TType.I64, (short)2);
+  private static final org.apache.thrift.protocol.TField ENTRIES_FIELD_DESC = new org.apache.thrift.protocol.TField("entries", org.apache.thrift.protocol.TType.I64, (short)3);
+  private static final org.apache.thrift.protocol.TField TIMESTAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("timestamp", org.apache.thrift.protocol.TType.I64, (short)4);
+
+  private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new InputFileStandardSchemeFactory();
+  private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new InputFileTupleSchemeFactory();
+
+  public @org.apache.thrift.annotation.Nullable java.lang.String metadataFileEntry; // required
+  public long size; // required
+  public long entries; // required
+  public long timestamp; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    METADATA_FILE_ENTRY((short)1, "metadataFileEntry"),
+    SIZE((short)2, "size"),
+    ENTRIES((short)3, "entries"),
+    TIMESTAMP((short)4, "timestamp");
+
+    private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+    static {
+      for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if it's not found.
+     */
+    @org.apache.thrift.annotation.Nullable
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // METADATA_FILE_ENTRY
+          return METADATA_FILE_ENTRY;
+        case 2: // SIZE
+          return SIZE;
+        case 3: // ENTRIES
+          return ENTRIES;
+        case 4: // TIMESTAMP
+          return TIMESTAMP;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if it's not found.
+     */
+    @org.apache.thrift.annotation.Nullable
+    public static _Fields findByName(java.lang.String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final java.lang.String _fieldName;
+
+    _Fields(short thriftId, java.lang.String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public java.lang.String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __SIZE_ISSET_ID = 0;
+  private static final int __ENTRIES_ISSET_ID = 1;
+  private static final int __TIMESTAMP_ISSET_ID = 2;
+  private byte __isset_bitfield = 0;
+  public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.METADATA_FILE_ENTRY, new org.apache.thrift.meta_data.FieldMetaData("metadataFileEntry", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.SIZE, new org.apache.thrift.meta_data.FieldMetaData("size", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.ENTRIES, new org.apache.thrift.meta_data.FieldMetaData("entries", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.TIMESTAMP, new org.apache.thrift.meta_data.FieldMetaData("timestamp", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InputFile.class, metaDataMap);
+  }
+
+  public InputFile() {
+  }
+
+  public InputFile(
+    java.lang.String metadataFileEntry,
+    long size,
+    long entries,
+    long timestamp)
+  {
+    this();
+    this.metadataFileEntry = metadataFileEntry;
+    this.size = size;
+    setSizeIsSet(true);
+    this.entries = entries;
+    setEntriesIsSet(true);
+    this.timestamp = timestamp;
+    setTimestampIsSet(true);
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public InputFile(InputFile other) {
+    __isset_bitfield = other.__isset_bitfield;
+    if (other.isSetMetadataFileEntry()) {
+      this.metadataFileEntry = other.metadataFileEntry;
+    }
+    this.size = other.size;
+    this.entries = other.entries;
+    this.timestamp = other.timestamp;
+  }
+
+  public InputFile deepCopy() {
+    return new InputFile(this);
+  }
+
+  @Override
+  public void clear() {
+    this.metadataFileEntry = null;
+    setSizeIsSet(false);
+    this.size = 0;
+    setEntriesIsSet(false);
+    this.entries = 0;
+    setTimestampIsSet(false);
+    this.timestamp = 0;
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public java.lang.String getMetadataFileEntry() {
+    return this.metadataFileEntry;
+  }
+
+  public InputFile setMetadataFileEntry(@org.apache.thrift.annotation.Nullable java.lang.String metadataFileEntry) {
+    this.metadataFileEntry = metadataFileEntry;
+    return this;
+  }
+
+  public void unsetMetadataFileEntry() {
+    this.metadataFileEntry = null;
+  }
+
+  /** Returns true if field metadataFileEntry is set (has been assigned a value) and false otherwise */
+  public boolean isSetMetadataFileEntry() {
+    return this.metadataFileEntry != null;
+  }
+
+  public void setMetadataFileEntryIsSet(boolean value) {
+    if (!value) {
+      this.metadataFileEntry = null;
+    }
+  }
+
+  public long getSize() {
+    return this.size;
+  }
+
+  public InputFile setSize(long size) {
+    this.size = size;
+    setSizeIsSet(true);
+    return this;
+  }
+
+  public void unsetSize() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SIZE_ISSET_ID);
+  }
+
+  /** Returns true if field size is set (has been assigned a value) and false otherwise */
+  public boolean isSetSize() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __SIZE_ISSET_ID);
+  }
+
+  public void setSizeIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SIZE_ISSET_ID, value);
+  }
+
+  public long getEntries() {
+    return this.entries;
+  }
+
+  public InputFile setEntries(long entries) {
+    this.entries = entries;
+    setEntriesIsSet(true);
+    return this;
+  }
+
+  public void unsetEntries() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __ENTRIES_ISSET_ID);
+  }
+
+  /** Returns true if field entries is set (has been assigned a value) and false otherwise */
+  public boolean isSetEntries() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __ENTRIES_ISSET_ID);
+  }
+
+  public void setEntriesIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ENTRIES_ISSET_ID, value);
+  }
+
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  public InputFile setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+    setTimestampIsSet(true);
+    return this;
+  }
+
+  public void unsetTimestamp() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __TIMESTAMP_ISSET_ID);
+  }
+
+  /** Returns true if field timestamp is set (has been assigned a value) and false otherwise */
+  public boolean isSetTimestamp() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __TIMESTAMP_ISSET_ID);
+  }
+
+  public void setTimestampIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __TIMESTAMP_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+    switch (field) {
+    case METADATA_FILE_ENTRY:
+      if (value == null) {
+        unsetMetadataFileEntry();
+      } else {
+        setMetadataFileEntry((java.lang.String)value);
+      }
+      break;
+
+    case SIZE:
+      if (value == null) {
+        unsetSize();
+      } else {
+        setSize((java.lang.Long)value);
+      }
+      break;
+
+    case ENTRIES:
+      if (value == null) {
+        unsetEntries();
+      } else {
+        setEntries((java.lang.Long)value);
+      }
+      break;
+
+    case TIMESTAMP:
+      if (value == null) {
+        unsetTimestamp();
+      } else {
+        setTimestamp((java.lang.Long)value);
+      }
+      break;
+
+    }
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public java.lang.Object getFieldValue(_Fields field) {
+    switch (field) {
+    case METADATA_FILE_ENTRY:
+      return getMetadataFileEntry();
+
+    case SIZE:
+      return getSize();
+
+    case ENTRIES:
+      return getEntries();
+
+    case TIMESTAMP:
+      return getTimestamp();
+
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new java.lang.IllegalArgumentException();
+    }
+
+    switch (field) {
+    case METADATA_FILE_ENTRY:
+      return isSetMetadataFileEntry();
+    case SIZE:
+      return isSetSize();
+    case ENTRIES:
+      return isSetEntries();
+    case TIMESTAMP:
+      return isSetTimestamp();
+    }
+    throw new java.lang.IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(java.lang.Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof InputFile)
+      return this.equals((InputFile)that);
+    return false;
+  }
+
+  public boolean equals(InputFile that) {
+    if (that == null)
+      return false;
+    if (this == that)
+      return true;
+
+    boolean this_present_metadataFileEntry = true && this.isSetMetadataFileEntry();
+    boolean that_present_metadataFileEntry = true && that.isSetMetadataFileEntry();
+    if (this_present_metadataFileEntry || that_present_metadataFileEntry) {
+      if (!(this_present_metadataFileEntry && that_present_metadataFileEntry))
+        return false;
+      if (!this.metadataFileEntry.equals(that.metadataFileEntry))
+        return false;
+    }
+
+    boolean this_present_size = true;
+    boolean that_present_size = true;
+    if (this_present_size || that_present_size) {
+      if (!(this_present_size && that_present_size))
+        return false;
+      if (this.size != that.size)
+        return false;
+    }
+
+    boolean this_present_entries = true;
+    boolean that_present_entries = true;
+    if (this_present_entries || that_present_entries) {
+      if (!(this_present_entries && that_present_entries))
+        return false;
+      if (this.entries != that.entries)
+        return false;
+    }
+
+    boolean this_present_timestamp = true;
+    boolean that_present_timestamp = true;
+    if (this_present_timestamp || that_present_timestamp) {
+      if (!(this_present_timestamp && that_present_timestamp))
+        return false;
+      if (this.timestamp != that.timestamp)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashCode = 1;
+
+    hashCode = hashCode * 8191 + ((isSetMetadataFileEntry()) ? 131071 : 524287);
+    if (isSetMetadataFileEntry())
+      hashCode = hashCode * 8191 + metadataFileEntry.hashCode();
+
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(size);
+
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(entries);
+
+    hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(timestamp);
+
+    return hashCode;
+  }
+
+  @Override
+  public int compareTo(InputFile other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = java.lang.Boolean.valueOf(isSetMetadataFileEntry()).compareTo(other.isSetMetadataFileEntry());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetMetadataFileEntry()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.metadataFileEntry, other.metadataFileEntry);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetSize()).compareTo(other.isSetSize());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetSize()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.size, other.size);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetEntries()).compareTo(other.isSetEntries());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetEntries()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.entries, other.entries);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = java.lang.Boolean.valueOf(isSetTimestamp()).compareTo(other.isSetTimestamp());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTimestamp()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timestamp, other.timestamp);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    scheme(iprot).read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    scheme(oprot).write(oprot, this);
+  }
+
+  @Override
+  public java.lang.String toString() {
+    java.lang.StringBuilder sb = new java.lang.StringBuilder("InputFile(");
+    boolean first = true;
+
+    sb.append("metadataFileEntry:");
+    if (this.metadataFileEntry == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.metadataFileEntry);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("size:");
+    sb.append(this.size);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("entries:");
+    sb.append(this.entries);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("timestamp:");
+    sb.append(this.timestamp);
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class InputFileStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+    public InputFileStandardScheme getScheme() {
+      return new InputFileStandardScheme();
+    }
+  }
+
+  private static class InputFileStandardScheme extends org.apache.thrift.scheme.StandardScheme<InputFile> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, InputFile struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // METADATA_FILE_ENTRY
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.metadataFileEntry = iprot.readString();
+              struct.setMetadataFileEntryIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // SIZE
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.size = iprot.readI64();
+              struct.setSizeIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // ENTRIES
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.entries = iprot.readI64();
+              struct.setEntriesIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 4: // TIMESTAMP
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.timestamp = iprot.readI64();
+              struct.setTimestampIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, InputFile struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.metadataFileEntry != null) {
+        oprot.writeFieldBegin(METADATA_FILE_ENTRY_FIELD_DESC);
+        oprot.writeString(struct.metadataFileEntry);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldBegin(SIZE_FIELD_DESC);
+      oprot.writeI64(struct.size);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(ENTRIES_FIELD_DESC);
+      oprot.writeI64(struct.entries);
+      oprot.writeFieldEnd();
+      oprot.writeFieldBegin(TIMESTAMP_FIELD_DESC);
+      oprot.writeI64(struct.timestamp);
+      oprot.writeFieldEnd();
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class InputFileTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+    public InputFileTupleScheme getScheme() {
+      return new InputFileTupleScheme();
+    }
+  }
+
+  private static class InputFileTupleScheme extends org.apache.thrift.scheme.TupleScheme<InputFile> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, InputFile struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      java.util.BitSet optionals = new java.util.BitSet();
+      if (struct.isSetMetadataFileEntry()) {
+        optionals.set(0);
+      }
+      if (struct.isSetSize()) {
+        optionals.set(1);
+      }
+      if (struct.isSetEntries()) {
+        optionals.set(2);
+      }
+      if (struct.isSetTimestamp()) {
+        optionals.set(3);
+      }
+      oprot.writeBitSet(optionals, 4);
+      if (struct.isSetMetadataFileEntry()) {
+        oprot.writeString(struct.metadataFileEntry);
+      }
+      if (struct.isSetSize()) {
+        oprot.writeI64(struct.size);
+      }
+      if (struct.isSetEntries()) {
+        oprot.writeI64(struct.entries);
+      }
+      if (struct.isSetTimestamp()) {
+        oprot.writeI64(struct.timestamp);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, InputFile struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+      java.util.BitSet incoming = iprot.readBitSet(4);
+      if (incoming.get(0)) {
+        struct.metadataFileEntry = iprot.readString();
+        struct.setMetadataFileEntryIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.size = iprot.readI64();
+        struct.setSizeIsSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.entries = iprot.readI64();
+        struct.setEntriesIsSet(true);
+      }
+      if (incoming.get(3)) {
+        struct.timestamp = iprot.readI64();
+        struct.setTimestampIsSet(true);
+      }
+    }
+  }
+
+  private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+    return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+  }
+  private static void unusedMethod() {}
+}
+
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f05794f..48b983b 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1024,6 +1024,21 @@ public enum Property {
   REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION,
       "Amount of time for a single replication RPC call to last before failing"
           + " the attempt. See replication.work.attempts."),
+  // Compactor properties
+  COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
+      "Properties in this category affect the behavior of the accumulo compactor server."),
+  COMPACTOR_PORTSEARCH("compactor.port.search", "false", PropertyType.BOOLEAN,
+      "if the ports above are in use, search higher ports until one is available"),
+  COMPACTOR_CLIENTPORT("compactor.port.client", "9100", PropertyType.PORT,
+      "The port used for handling client connections on the compactor servers"),
+  COMPACTOR_MINTHREADS("compactor.server.threads.minimum", "1", PropertyType.COUNT,
+      "The minimum number of threads to use to handle incoming requests."),
+  COMPACTOR_MINTHREADS_TIMEOUT("compactor.server.threads.timeout", "0s", PropertyType.TIMEDURATION,
+      "The time after which incoming request threads terminate with no work available.  Zero (0) will keep the threads alive indefinitely."),
+  COMPACTOR_THREADCHECK("compactor.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
+      "The time between adjustments of the server thread pool."),
+  COMPACTOR_MAX_MESSAGE_SIZE("compactor.server.message.size.max", "10M", PropertyType.BYTES,
+      "The maximum size of a message that can be sent to a tablet server."),
   // CompactionCoordinator properties
   COORDINATOR_PREFIX("coordinator.", null, PropertyType.PREFIX,
       "Properties in this category affect the behavior of the accumulo compaction coordinator server."),
@@ -1033,12 +1048,18 @@ public enum Property {
       "The port used for handling client connections on the compactor servers"),
   COORDINATOR_MINTHREADS("coordinator.server.threads.minimum", "1", PropertyType.COUNT,
       "The minimum number of threads to use to handle incoming requests."),
-  COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s", PropertyType.TIMEDURATION,
+  COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s",
+      PropertyType.TIMEDURATION,
       "The time after which incoming request threads terminate with no work available.  Zero (0) will keep the threads alive indefinitely."),
   COORDINATOR_THREADCHECK("coordinator.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
       "The time between adjustments of the server thread pool."),
   COORDINATOR_MAX_MESSAGE_SIZE("coordinator.server.message.size.max", "10M", PropertyType.BYTES,
-      "The maximum size of a message that can be sent to a tablet server."),  // deprecated properties grouped at the end to reference property that replaces them
+      "The maximum size of a message that can be sent to a tablet server."), // deprecated
+                                                                             // properties grouped
+                                                                             // at the end to
+                                                                             // reference property
+                                                                             // that replaces them
+  // deprecated properties grouped at the end to reference property that replaces them
   @Deprecated(since = "1.6.0")
   @ReplacedBy(property = INSTANCE_VOLUMES)
   INSTANCE_DFS_URI("instance.dfs.uri", "", PropertyType.URI,
diff --git a/core/src/main/thrift/compaction-coordinator.thrift b/core/src/main/thrift/compaction-coordinator.thrift
index cc702bc..7cc2caf 100644
--- a/core/src/main/thrift/compaction-coordinator.thrift
+++ b/core/src/main/thrift/compaction-coordinator.thrift
@@ -39,6 +39,37 @@ enum CompactionState {
   CANCELLED
 }
 
+struct InputFile {
+  1:string metadataFileEntry
+  2:i64 size
+  3:i64 entries
+  4:i64 timestamp
+}
+
+enum CompactionKind {
+  CHOP
+  SELECTOR
+  SYSTEM
+  USER
+}
+
+struct CompactionJob {
+  1:trace.TInfo traceInfo
+  2:security.TCredentials credentials
+  3:i64 compactionId
+  5:data.TKeyExtent extent
+  6:list<InputFile> files
+  7:i32 priority
+  8:i32 readRate
+  9:i32 writeRate
+  10:tabletserver.IteratorConfig iteratorSettings
+  11:tabletserver.CompactionType type
+  12:tabletserver.CompactionReason reason
+  13:string outputFile
+  14:bool propagateDeletes
+  15:CompactionKind kind
+}
+
 struct Status {
   1:i64 timestamp
   2:i64 compactionId
@@ -104,4 +135,4 @@ service Compactor {
     1:tabletserver.CompactionJob compaction
   )
 
-}
\ No newline at end of file
+}
diff --git a/pom.xml b/pom.xml
index d2bd521..1d74ca2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
     <module>iterator-test-harness</module>
     <module>minicluster</module>
     <module>server/base</module>
+    <module>server/compactor</module>
     <module>server/compaction-coordinator</module>
     <module>server/gc</module>
     <module>server/manager</module>
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
similarity index 99%
rename from server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
rename to server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index e0d70fc..1335e44 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.tserver;
+package org.apache.accumulo.server.fs;
 
 import java.io.IOException;
 import java.util.ArrayList;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java b/server/base/src/main/java/org/apache/accumulo/server/fs/TooManyFilesException.java
similarity index 96%
copy from server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
copy to server/base/src/main/java/org/apache/accumulo/server/fs/TooManyFilesException.java
index 243d36c..7adae11 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/TooManyFilesException.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.tserver;
+package org.apache.accumulo.server.fs;
 
 import java.io.IOException;
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java
similarity index 97%
rename from server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
rename to server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java
index 9d4467d..6311200 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/TabletIteratorEnvironment.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.tserver;
+package org.apache.accumulo.server.iterators;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,8 +41,7 @@ import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServiceEnvironmentImpl;
-import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
-import org.apache.accumulo.tserver.FileManager.ScanFileManager;
+import org.apache.accumulo.server.fs.FileManager.ScanFileManager;
 import org.apache.hadoop.fs.Path;
 
 public class TabletIteratorEnvironment implements SystemIteratorEnvironment {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index b318835..4f8e894 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -77,7 +77,7 @@ public class LiveTServerSet implements Watcher {
     public TServerConnection(HostAndPort addr) {
       address = addr;
     }
-    
+
     public HostAndPort getAddress() {
       return address;
     }
diff --git a/server/compactor/.gitignore b/server/compactor/.gitignore
new file mode 100644
index 0000000..e77a822
--- /dev/null
+++ b/server/compactor/.gitignore
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Maven ignores
+/target/
+
+# IDE ignores
+/.settings/
+/.project
+/.classpath
+/.pydevproject
+/.idea
+/*.iml
+/nbproject/
+/nbactions.xml
+/nb-configuration.xml
diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml
new file mode 100644
index 0000000..1ed866f
--- /dev/null
+++ b/server/compactor/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.accumulo</groupId>
+    <artifactId>accumulo-project</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <artifactId>accumulo-compactor</artifactId>
+  <name>Apache Accumulo Compactor</name>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-server-base</artifactId>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
new file mode 100644
index 0000000..75d81cf
--- /dev/null
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.compactor;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator;
+import org.apache.accumulo.core.compaction.thrift.CompactionJob;
+import org.apache.accumulo.core.compaction.thrift.CompactionState;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.GarbageCollectionLogger;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.compaction.CompactionInfo;
+import org.apache.accumulo.server.compaction.CompactionStats;
+import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
+import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.hadoop.fs.Path;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+public class Compactor extends AbstractServer
+    implements org.apache.accumulo.core.compaction.thrift.Compactor.Iface {
+
+  /**
+   * Command-line options for the Compactor process; adds the required compaction queue name to
+   * the standard server options.
+   */
+  private static class CompactorServerOpts extends ServerOpts {
+    // Name of the compaction queue this Compactor pulls jobs from; required at startup
+    @Parameter(required = true, names = {"-q", "--queue"}, description = "compaction queue name")
+    private String queueName = null;
+
+    public String getQueueName() {
+      return queueName;
+    }
+  }
+
+  /**
+   * Object used to hold information about the current compaction
+   */
+  private static class CompactionJobHolder {
+    // Job currently assigned to this Compactor; null when idle
+    private CompactionJob job;
+    // Thread executing the compaction, retained so cancel() can interrupt it
+    private Thread compactionThread;
+    // volatile so the compaction thread observes a cancel() made from the RPC thread
+    private volatile Boolean cancelled = Boolean.FALSE;
+    // Populated by the compaction thread when the compaction completes
+    private CompactionStats stats = null;
+
+    public CompactionJobHolder() {}
+
+    // Clears all state so the holder can be reused for the next job
+    public void reset() {
+      job = null;
+      compactionThread = null;
+      cancelled = Boolean.FALSE;
+      stats = null;
+    }
+
+    public CompactionJob getJob() {
+      return job;
+    }
+
+    public Thread getThread() {
+      return compactionThread;
+    }
+
+    public CompactionStats getStats() {
+      return stats;
+    }
+
+    public void setStats(CompactionStats stats) {
+      this.stats = stats;
+    }
+
+    // Marks the current job cancelled; the compaction environment polls this flag
+    public void cancel() {
+      cancelled = Boolean.TRUE;
+    }
+
+    public boolean isCancelled() {
+      return cancelled;
+    }
+
+    // Returns true when a job is currently assigned
+    public boolean isSet() {
+      return (null != this.job);
+    }
+
+    public void set(CompactionJob job, Thread compactionThread) {
+      Objects.requireNonNull(job, "CompactionJob is null");
+      Objects.requireNonNull(compactionThread, "Compaction thread is null");
+      this.job = job;
+      this.compactionThread = compactionThread;
+    }
+
+  }
+
+  /**
+   * Formats the given address as {@code host:port}.
+   *
+   * @param address
+   *          address to format, may be null
+   * @return host and port for Compactor client connections, or null when no address was given
+   */
+  private static String getHostPortString(HostAndPort address) {
+    return (address == null) ? null : address.getHost() + ":" + address.getPort();
+  }
+
+  public static final String COMPACTOR_SERVICE = "COMPACTOR_SVC";
+
+  private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
+  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
+  private static final long MAX_WAIT_TIME = 60000;
+
+  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
+  private final UUID compactorId = UUID.randomUUID();
+  private final AccumuloConfiguration aconf;
+  private final String queueName;
+  private final CompactionJobHolder jobHolder;
+  private ZooLock compactorLock;
+
+  // Parses options, initializes crypto for the server context, and schedules periodic
+  // JVM GC logging for this process
+  Compactor(CompactorServerOpts opts, String[] args) {
+    super("compactor", opts, args);
+    queueName = opts.getQueueName();
+    ServerContext context = super.getContext();
+    context.setupCrypto();
+
+    this.jobHolder = new CompactionJobHolder();
+    aconf = getConfiguration();
+    // Log GC activity every TIME_BETWEEN_GC_CHECKS ms for the life of the process
+    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
+        () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_GC_CHECKS,
+        TimeUnit.MILLISECONDS);
+    LOG.info("Version " + Constants.VERSION);
+    LOG.info("Instance " + context.getInstanceID());
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this Compactor
+   *
+   * @param clientAddress
+   *          address of this Compactor
+   *
+   * @throws KeeperException
+   *           if the ZooKeeper nodes cannot be created
+   * @throws InterruptedException
+   *           if interrupted while talking to ZooKeeper
+   */
+  private void announceExistence(HostAndPort clientAddress)
+      throws KeeperException, InterruptedException {
+
+    String hostPort = getHostPortString(clientAddress);
+
+    ZooReaderWriter zoo = getContext().getZooReaderWriter();
+    String compactorQueuePath =
+        getContext().getZooKeeperRoot() + Constants.ZCOMPACTORS + "/" + this.queueName;
+    String zPath = compactorQueuePath + "/" + hostPort;
+
+    try {
+      // Create the queue node and this compactor's node if they do not already exist
+      zoo.mkdirs(compactorQueuePath);
+      zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
+    } catch (KeeperException e) {
+      if (e.code() == KeeperException.Code.NOAUTH) {
+        LOG.error("Failed to write to ZooKeeper. Ensure that"
+            + " accumulo.properties, specifically instance.secret, is consistent.");
+      }
+      throw e;
+    }
+
+    compactorLock = new ZooLock(getContext().getSiteConfiguration(), zPath, compactorId);
+    LockWatcher lw = new LockWatcher() {
+      @Override
+      public void lostLock(final LockLossReason reason) {
+        // Losing the lock is fatal; halt rather than continue without it
+        Halt.halt(1, () -> {
+          LOG.error("Compactor lost lock (reason = {}), exiting.", reason);
+          gcLogger.logGCInfo(getConfiguration());
+        });
+      }
+
+      @Override
+      public void unableToMonitorLockNode(final Exception e) {
+        Halt.halt(1, () -> LOG.error("Lost ability to monitor Compactor lock, exiting.", e));
+      }
+    };
+
+    try {
+      byte[] lockContent = (hostPort + "=" + COMPACTOR_SERVICE).getBytes(UTF_8);
+      for (int i = 0; i < 25; i++) {
+        // Re-create the node in case it was removed between attempts (SKIP makes this a
+        // no-op when it still exists)
+        zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP);
+
+        if (compactorLock.tryLock(lw, lockContent)) {
+          LOG.debug("Obtained Compactor lock {}", compactorLock.getLockPath());
+          return;
+        }
+        LOG.info("Waiting for Compactor lock");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+      String msg = "Too many retries, exiting.";
+      LOG.info(msg);
+      throw new RuntimeException(msg);
+    } catch (Exception e) {
+      // This is the Compactor lock (message previously referred to the tablet server lock)
+      LOG.info("Could not obtain Compactor lock, exiting.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Get the address of the CompactionCoordinator
+   *
+   * @return address of Coordinator, or null if it could not be determined
+   */
+  private HostAndPort getCoordinatorAddress() {
+    try {
+      // TODO: Get the coordinator location from ZooKeeper
+      List<String> locations = null;
+      // Guard against null until the ZooKeeper lookup is implemented; the previous
+      // code dereferenced a guaranteed-null list and always threw NPE into the catch
+      if (locations == null || locations.isEmpty()) {
+        return null;
+      }
+      return HostAndPort.fromString(locations.get(0));
+    } catch (Exception e) {
+      // Pass the exception as the throwable argument so the stack trace is logged
+      LOG.warn("Failed to obtain CompactionCoordinator address", e);
+    }
+
+    return null;
+  }
+
+  /**
+   * Start this Compactors thrift service to handle incoming client requests
+   *
+   * @return address of this compactor client service
+   * @throws UnknownHostException
+   *           if the configured hostname cannot be resolved
+   */
+  private ServerAddress startCompactorClientService() throws UnknownHostException {
+    Compactor rpcProxy = TraceUtil.wrapService(this);
+    final org.apache.accumulo.core.compaction.thrift.Compactor.Processor<Compactor> processor;
+    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
+      // SASL requires a wrapper that keeps thrift credentials up to date
+      Compactor tcredProxy =
+          TCredentialsUpdatingWrapper.service(rpcProxy, Compactor.class, getConfiguration());
+      processor = new org.apache.accumulo.core.compaction.thrift.Compactor.Processor<>(tcredProxy);
+    } else {
+      processor = new org.apache.accumulo.core.compaction.thrift.Compactor.Processor<>(rpcProxy);
+    }
+    // Prefer the compactor-specific max message size when set, else the general one
+    Property maxMessageSizeProperty = (aconf.get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null
+        ? Property.COMPACTOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), getHostname(),
+        Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS,
+        Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK,
+        maxMessageSizeProperty);
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  /**
+   * Called by a thrift client to cancel the currently running compaction if it matches the
+   * supplied job
+   *
+   * @param compactionJob
+   *          job to cancel
+   */
+  @Override
+  public void cancel(CompactionJob compactionJob) {
+    synchronized (jobHolder) {
+      // Only cancel when the request matches the job we are actually running
+      if (!jobHolder.isSet() || !jobHolder.getJob().equals(compactionJob)) {
+        return;
+      }
+      LOG.info("Cancel requested for compaction job {}", compactionJob);
+      jobHolder.cancel();
+      jobHolder.getThread().interrupt();
+    }
+  }
+
+  /**
+   * Send an update to the coordinator for this job, retrying up to 25 times
+   *
+   * @param coordinatorClient
+   *          client connected to the CompactionCoordinator
+   * @param job
+   *          compactionJob
+   * @param state
+   *          updated state
+   * @param message
+   *          updated message
+   */
+  private void updateCompactionState(CompactionCoordinator.Client coordinatorClient,
+      CompactionJob job, CompactionState state, String message) {
+    // RetryableThriftFunction is a @FunctionalInterface, so a lambda is clearer than
+    // the anonymous class form
+    RetryableThriftCall<Void> thriftCall =
+        new RetryableThriftCall<>(1000, MAX_WAIT_TIME, 25, () -> {
+          coordinatorClient.updateCompactionState(job, state, message,
+              System.currentTimeMillis());
+          return null;
+        });
+    thriftCall.run();
+  }
+
+  /**
+   * Get the next job to run, retrying forever until the coordinator answers
+   *
+   * @param coordinatorClient
+   *          client connected to the CompactionCoordinator
+   * @param compactorAddress
+   *          address of this Compactor
+   * @return CompactionJob
+   */
+  private CompactionJob getNextJob(CompactionCoordinator.Client coordinatorClient,
+      String compactorAddress) {
+    // maxNumRetries == 0 means retry forever; RetryableThriftFunction is a
+    // @FunctionalInterface, so a lambda is clearer than the anonymous class form
+    RetryableThriftCall<CompactionJob> nextJobThriftCall = new RetryableThriftCall<>(1000,
+        MAX_WAIT_TIME, 0, () -> coordinatorClient.getCompactionJob(queueName, compactorAddress));
+    return nextJobThriftCall.run();
+  }
+
+  @Override
+  public void run() {
+
+    ServerAddress compactorAddress = null;
+    try {
+      compactorAddress = startCompactorClientService();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the compactor client service", e1);
+    }
+    final HostAndPort clientAddress = compactorAddress.address;
+
+    try {
+      announceExistence(clientAddress);
+    } catch (KeeperException | InterruptedException e) {
+      throw new RuntimeException("Erroring registering in ZooKeeper", e);
+    }
+
+    HostAndPort coordinatorHost = getCoordinatorAddress();
+    if (null == coordinatorHost) {
+      throw new RuntimeException("Unable to get CompactionCoordinator address from ZooKeeper");
+    }
+    LOG.info("CompactionCoordinator address is: {}", coordinatorHost);
+    CompactionCoordinator.Client coordinatorClient;
+    try {
+      coordinatorClient = ThriftUtil.getClient(new CompactionCoordinator.Client.Factory(),
+          coordinatorHost, getContext());
+    } catch (TTransportException e2) {
+      throw new RuntimeException("Erroring connecting to CompactionCoordinator", e2);
+    }
+
+    LOG.info("Compactor started, waiting for work");
+    try {
+
+      final AtomicReference<Throwable> err = new AtomicReference<>();
+
+      while (true) {
+        err.set(null);
+        jobHolder.reset();
+        final CompactionJob job = getNextJob(coordinatorClient, getHostPortString(clientAddress));
+
+        LOG.info("Received next compaction job: {}", job);
+
+        final LongAdder totalInputSize = new LongAdder();
+        final LongAdder totalInputEntries = new LongAdder();
+        final CountDownLatch started = new CountDownLatch(1);
+        final CountDownLatch stopped = new CountDownLatch(1);
+
+        Thread compactionThread = Threads.createThread(
+            "Compaction job for tablet " + job.getExtent().toString(), new Runnable() {
+              @Override
+              public void run() {
+                try {
+                  LOG.info("Setting up to run compactor");
+                  updateCompactionState(coordinatorClient, job, CompactionState.STARTED,
+                      "Compaction started");
+
+                  final TableId tableId = TableId.of(new String(job.getExtent().getTable(), UTF_8));
+                  final TableConfiguration tConfig = getContext().getTableConfiguration(tableId);
+
+                  final Map<StoredTabletFile,DataFileValue> files = new TreeMap<>();
+                  job.getFiles().forEach(f -> {
+                    files.put(new StoredTabletFile(f.getMetadataFileEntry()),
+                        new DataFileValue(f.getSize(), f.getEntries(), f.getTimestamp()));
+                    totalInputSize.add(f.getSize());
+                    totalInputEntries.add(f.getEntries());
+                  });
+
+                  final TabletFile outputFile = new TabletFile(new Path(job.getOutputFile()));
+
+                  final CompactionEnv cenv = new CompactionEnv() {
+                    @Override
+                    public boolean isCompactionEnabled() {
+                      return !jobHolder.isCancelled();
+                    }
+
+                    @Override
+                    public IteratorScope getIteratorScope() {
+                      return IteratorScope.majc;
+                    }
+
+                    @Override
+                    public RateLimiter getReadLimiter() {
+                      return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
+                          .create("read_rate_limiter", () -> job.getReadRate());
+                    }
+
+                    @Override
+                    public RateLimiter getWriteLimiter() {
+                      return SharedRateLimiterFactory.getInstance(getContext().getConfiguration())
+                          .create("write_rate_limiter", () -> job.getWriteRate());
+                    }
+
+                    @Override
+                    public SystemIteratorEnvironment createIteratorEnv(ServerContext context,
+                        AccumuloConfiguration acuTableConf, TableId tableId) {
+                      return new TabletIteratorEnvironment(getContext(), IteratorScope.majc,
+                          !job.isPropagateDeletes(), acuTableConf, tableId,
+                          CompactionKind.valueOf(job.getKind().name()));
+                    }
+
+                    @Override
+                    public SortedKeyValueIterator<Key,Value> getMinCIterator() {
+                      throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public CompactionReason getReason() {
+                      switch (job.getKind()) {
+                        case USER:
+                          return CompactionReason.USER;
+                        case CHOP:
+                          return CompactionReason.CHOP;
+                        case SELECTOR:
+                        case SYSTEM:
+                        default:
+                          return CompactionReason.SYSTEM;
+                      }
+                    }
+                  };
+
+                  final List<IteratorSetting> iters = new ArrayList<>();
+                  job.getIteratorSettings().getIterators()
+                      .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis)));
+
+                  org.apache.accumulo.server.compaction.Compactor compactor =
+                      new org.apache.accumulo.server.compaction.Compactor(getContext(),
+                          KeyExtent.fromThrift(job.getExtent()), files, outputFile,
+                          job.isPropagateDeletes(), cenv, iters, tConfig);
+
+                  LOG.info("Starting compactor");
+                  started.countDown();
+                  jobHolder.setStats(compactor.call());
+
+                  LOG.info("Compaction completed successfully");
+                  // Update state when completed
+                  updateCompactionState(coordinatorClient, job, CompactionState.SUCCEEDED,
+                      "Compaction completed successfully");
+                } catch (Exception e) {
+                  LOG.error("Compaction failed", e);
+                  err.set(e);
+                  throw new RuntimeException("Compaction failed", e);
+                } finally {
+                  stopped.countDown();
+                  // TODO: Any cleanup
+                }
+              }
+            });
+
+        synchronized (jobHolder) {
+          jobHolder.set(job, compactionThread);
+        }
+
+        compactionThread.start(); // start the compactionThread
+        try {
+          started.await(); // wait until the compactor is started
+          long inputEntries = totalInputEntries.sum();
+          while (stopped.getCount() > 0) {
+            List<CompactionInfo> running =
+                org.apache.accumulo.server.compaction.Compactor.getRunningCompactions();
+            if (!running.isEmpty()) {
+              // Compaction has started. There should only be one in the list
+              CompactionInfo info = running.get(0);
+              if (info != null) {
+                String message = String.format(
+                    "Compaction in progress, read %d of %d input entries (%f %), written %d entries",
+                    info.getEntriesRead(), inputEntries,
+                    (info.getEntriesRead() / inputEntries) * 100, info.getEntriesWritten());
+                LOG.info(message);
+                updateCompactionState(coordinatorClient, job, CompactionState.IN_PROGRESS, message);
+              }
+            }
+            UtilWaitThread.sleep(MAX_WAIT_TIME);
+          }
+          try {
+            compactionThread.join();
+            CompactionStats stats = jobHolder.getStats();
+            // TODO: Tell coordinator that we are finished, send stats.
+
+          } catch (InterruptedException e) {
+            LOG.error(
+                "Compactor thread was interrupted waiting for compaction to finish, cancelling job",
+                e);
+            cancel(job);
+          }
+
+        } catch (InterruptedException e1) {
+          LOG.error(
+              "Compactor thread was interrupted waiting for compaction to start, cancelling job",
+              e1);
+          cancel(job);
+        }
+
+        if (compactionThread.isInterrupted()) {
+          LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
+          updateCompactionState(coordinatorClient, job, CompactionState.CANCELLED,
+              "Compaction cancelled");
+        }
+
+        Throwable thrown = err.get();
+        if (thrown != null) {
+          updateCompactionState(coordinatorClient, job, CompactionState.FAILED,
+              "Compaction failed due to: " + thrown.getMessage());
+        }
+      }
+
+    } finally {
+      // close connection to coordinator
+      ThriftUtil.returnClient(coordinatorClient);
+
+      // Shutdown local thrift server
+      LOG.debug("Stopping Thrift Servers");
+      TServerUtils.stopTServer(compactorAddress.server);
+
+      try {
+        LOG.debug("Closing filesystems");
+        getContext().getVolumeManager().close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
+      }
+
+      gcLogger.logGCInfo(getConfiguration());
+      LOG.info("stop requested. exiting ... ");
+      try {
+        compactorLock.unlock();
+      } catch (Exception e) {
+        LOG.warn("Failed to release compactor lock", e);
+      }
+    }
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    // try-with-resources ensures the server's resources are released on exit
+    try (Compactor compactor = new Compactor(new CompactorServerOpts(), args)) {
+      compactor.runServer();
+    }
+  }
+
+}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftCall.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftCall.java
new file mode 100644
index 0000000..5ce599e
--- /dev/null
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftCall.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.compactor;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RetryableThriftCall<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RetryableThriftCall.class);
+
+  // Initial wait time between retries, in ms
+  private final long start;
+  // Upper bound on the wait time between retries, in ms
+  private final long maxWaitTime;
+  // Number of retries allowed; 0 means retry forever
+  private final int maxNumRetries;
+  private final RetryableThriftFunction<T> function;
+  private final boolean retryForever;
+
+  /**
+   * RetryableThriftCall constructor
+   *
+   * @param start
+   *          initial wait time
+   * @param maxWaitTime
+   *          max wait time
+   * @param maxNumRetries
+   *          number of times to retry, 0 to retry forever
+   * @param function
+   *          function to execute
+   */
+  public RetryableThriftCall(long start, long maxWaitTime, int maxNumRetries,
+      RetryableThriftFunction<T> function) {
+    this.start = start;
+    this.maxWaitTime = maxWaitTime;
+    this.maxNumRetries = maxNumRetries;
+    this.function = function;
+    this.retryForever = (maxNumRetries == 0);
+  }
+
+  /**
+   * Attempts to call the function, waiting and retrying when TException is thrown. Wait time is
+   * initially set to the start time and doubled each time, up to the maximum wait time. If
+   * maxNumRetries is 0, then this will retry forever. If maxNumRetries is non-zero, then a
+   * RuntimeException is thrown when it has exceeded the maxNumRetries parameter.
+   *
+   * @return result of the function, which may be null when the function returns null
+   * @throws RuntimeException
+   *           when maximum number of retries has been exceeded
+   */
+  public T run() {
+    long waitTime = start;
+    int numRetries = 0;
+    while (true) {
+      try {
+        // A successful call (even one returning null, e.g. a Void function) ends the
+        // loop immediately; the previous null-result loop never terminated for Void
+        // functions and slept once more even after success
+        return function.execute();
+      } catch (TException e) {
+        // Include the exception so the failure cause is visible in the logs
+        LOG.error("Error in Thrift function talking to Coordinator, retrying in {}ms", waitTime,
+            e);
+        if (!retryForever) {
+          numRetries++;
+          if (numRetries > maxNumRetries) {
+            throw new RuntimeException(
+                "Maximum number of retries (" + this.maxNumRetries + ") attempted.", e);
+          }
+        }
+      }
+      UtilWaitThread.sleep(waitTime);
+      // Exponential backoff capped at maxWaitTime; the original Math.max jumped the
+      // wait straight to the cap on the first retry
+      waitTime = Math.min(waitTime * 2, maxWaitTime);
+    }
+  }
+
+}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java
similarity index 77%
rename from server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
rename to server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java
index 243d36c..bfdbb4c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/RetryableThriftFunction.java
@@ -16,15 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.tserver;
+package org.apache.accumulo.compactor;
 
-import java.io.IOException;
+import org.apache.thrift.TException;
 
-public class TooManyFilesException extends IOException {
-
-  private static final long serialVersionUID = 1L;
-
-  public TooManyFilesException(String msg) {
-    super(msg);
-  }
+/**
+ * A supplier-style function that invokes a Thrift RPC and may throw {@link TException};
+ * intended to be wrapped so the call can be retried on failure.
+ */
+@FunctionalInterface
+public interface RetryableThriftFunction<T> {
+  T execute() throws TException;
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
index 7a8e180..9ccfcab 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig;
+import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
 import org.apache.accumulo.tserver.data.ServerConditionalMutation;
 import org.apache.hadoop.io.Text;
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 300275e..77ae15b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -67,7 +67,8 @@ import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServiceEnvironmentImpl;
-import org.apache.accumulo.tserver.FileManager.ScanFileManager;
+import org.apache.accumulo.server.fs.FileManager;
+import org.apache.accumulo.server.fs.FileManager.ScanFileManager;
 import org.apache.accumulo.tserver.memory.LargestFirstMemoryManager;
 import org.apache.accumulo.tserver.memory.TabletMemoryReport;
 import org.apache.accumulo.tserver.session.ScanSession;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
index fb108e9..7aff8d3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -122,6 +122,7 @@ import org.apache.accumulo.server.compaction.CompactionInfo;
 import org.apache.accumulo.server.compaction.Compactor;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.fs.TooManyFilesException;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index a950290..6ff2a4c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -23,8 +23,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.client.SampleNotPresentException;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.server.fs.TooManyFilesException;
 import org.apache.accumulo.tserver.TabletServer;
-import org.apache.accumulo.tserver.TooManyFilesException;
 import org.apache.accumulo.tserver.session.SingleScanSession;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Tablet;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index aae18e0..d6da2f8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -83,8 +83,8 @@ import org.apache.accumulo.server.compaction.Compactor.CompactionCanceledExcepti
 import org.apache.accumulo.server.compaction.Compactor.CompactionEnv;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
+import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
 import org.apache.accumulo.server.util.MetadataTableUtil;
-import org.apache.accumulo.tserver.TabletIteratorEnvironment;
 import org.apache.accumulo.tserver.compaction.CompactionPlan;
 import org.apache.accumulo.tserver.compaction.CompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index 75a424b..886cc68 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -46,12 +46,12 @@ import org.apache.accumulo.server.compaction.CompactionStats;
 import org.apache.accumulo.server.compaction.Compactor;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
+import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.tserver.InMemoryMap;
 import org.apache.accumulo.tserver.MinorCompactionReason;
-import org.apache.accumulo.tserver.TabletIteratorEnvironment;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 1a9a98c..25d6c22 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -45,9 +45,9 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.conf.TableConfiguration.ParsedIteratorConfig;
-import org.apache.accumulo.tserver.FileManager.ScanFileManager;
+import org.apache.accumulo.server.fs.FileManager.ScanFileManager;
+import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
 import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
-import org.apache.accumulo.tserver.TabletIteratorEnvironment;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.scan.ScanParameters;
 import org.slf4j.Logger;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 6a83ea5..b6867f0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -93,6 +93,7 @@ import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionStats;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.TooManyFilesException;
 import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles;
@@ -114,7 +115,6 @@ import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.tserver.TabletStatsKeeper;
 import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
-import org.apache.accumulo.tserver.TooManyFilesException;
 import org.apache.accumulo.tserver.TservConstraintEnv;
 import org.apache.accumulo.tserver.compactions.Compactable;
 import org.apache.accumulo.tserver.constraints.ConstraintChecker;