Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/03/01 15:37:21 UTC

[accumulo] branch main updated: Modified Compression to allow for user-defined algorithms (#2518)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new b24fb8d  Modified Compression to allow for user-defined algorithms (#2518)
b24fb8d is described below

commit b24fb8d7cfaf13bc60794867425ebb2ba39e8558
Author: Dave Marion <dl...@apache.org>
AuthorDate: Tue Mar 1 10:37:14 2022 -0500

    Modified Compression to allow for user-defined algorithms (#2518)
    
    Refactored Compression.java into three pieces. The configuration properties of the
    various codecs were moved into an SPI package so that users can define their own
    compression algorithms by specifying the codec class and some other defaults. The
    logic for creating the codecs, compression streams, decompression streams, etc.
    was moved to a class called CompressionAlgorithm. Compression.java now uses the
    ServiceLoader to find compression algorithm configurations and initializes a new
    CompressionAlgorithm for each configuration. Compression.java maintains a static
    map of these algorithms and provides methods for retrieving a CompressionAlgorithm by name.
---
 .../accumulo/core/file/rfile/CreateEmpty.java      |   3 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java    |  34 +-
 .../core/file/rfile/bcfile/Compression.java        | 834 +--------------------
 .../file/rfile/bcfile/CompressionAlgorithm.java    | 352 +++++++++
 .../core/file/rfile/bcfile/IdentityCodec.java      | 303 ++++++++
 .../core/spi/file/rfile/compression/Bzip2.java     |  51 ++
 .../CompressionAlgorithmConfiguration.java         |  57 ++
 .../core/spi/file/rfile/compression/Gz.java        |  55 ++
 .../core/spi/file/rfile/compression/Lz4.java       |  51 ++
 .../core/spi/file/rfile/compression/Lzo.java       |  51 ++
 .../spi/file/rfile/compression/NoCompression.java  |  53 ++
 .../core/spi/file/rfile/compression/Snappy.java    |  56 ++
 .../core/spi/file/rfile/compression/ZStandard.java |  56 ++
 .../core/file/rfile/bcfile/CompressionTest.java    |  88 ++-
 .../tserver/log/SortedLogRecoveryTest.java         |   3 +-
 15 files changed, 1180 insertions(+), 867 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index a787bfa..2f82260 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -69,7 +70,7 @@ public class CreateEmpty implements KeywordExecutable {
   static class Opts extends Help {
     @Parameter(names = {"-c", "--codec"}, description = "the compression codec to use.",
         validateWith = IsSupportedCompressionAlgorithm.class)
-    String codec = Compression.COMPRESSION_NONE;
+    String codec = new NoCompression().getName();
     @Parameter(
         description = " <path> { <path> ... } Each path given is a URL."
             + " Relative paths are resolved according to the default filesystem defined in"
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index c1e3b23..c82713a 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -36,7 +36,6 @@ import java.util.TreeMap;
 
 import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
 import org.apache.accumulo.core.crypto.CryptoUtils;
-import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
 import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
 import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
@@ -129,7 +128,7 @@ public final class BCFile {
      * Intermediate class that maintain the state of a Writable Compression Block.
      */
     private static final class WBlockState {
-      private final Algorithm compressAlgo;
+      private final CompressionAlgorithm compressAlgo;
       private Compressor compressor; // !null only if using native
       // Hadoop compression
       private final RateLimitedOutputStream fsOut;
@@ -138,7 +137,7 @@ public final class BCFile {
       private final SimpleBufferedOutputStream fsBufferedOutput;
       private OutputStream out;
 
-      public WBlockState(Algorithm compressionAlgo, RateLimitedOutputStream fsOut,
+      public WBlockState(CompressionAlgorithm compressionAlgo, RateLimitedOutputStream fsOut,
           BytesWritable fsOutputBuffer, Configuration conf, FileEncrypter encrypter)
           throws IOException {
         this.compressAlgo = compressionAlgo;
@@ -373,11 +372,11 @@ public final class BCFile {
       }
     }
 
-    private Algorithm getDefaultCompressionAlgorithm() {
+    private CompressionAlgorithm getDefaultCompressionAlgorithm() {
       return dataIndex.getDefaultCompressionAlgorithm();
     }
 
-    private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo)
+    private BlockAppender prepareMetaBlock(String name, CompressionAlgorithm compressAlgo)
         throws IOException, MetaBlockAlreadyExists {
       if (blkInProgress) {
         throw new IllegalStateException("Cannot create Meta Block until previous block is closed.");
@@ -439,9 +438,9 @@ public final class BCFile {
      */
     private class MetaBlockRegister {
       private final String name;
-      private final Algorithm compressAlgo;
+      private final CompressionAlgorithm compressAlgo;
 
-      MetaBlockRegister(String name, Algorithm compressAlgo) {
+      MetaBlockRegister(String name, CompressionAlgorithm compressAlgo) {
         this.name = name;
         this.compressAlgo = compressAlgo;
       }
@@ -470,15 +469,15 @@ public final class BCFile {
      * Intermediate class that maintain the state of a Readable Compression Block.
      */
     private static final class RBlockState {
-      private final Algorithm compressAlgo;
+      private final CompressionAlgorithm compressAlgo;
       private Decompressor decompressor;
       private final BlockRegion region;
       private final InputStream in;
       private volatile boolean closed;
 
-      public <InputStreamType extends InputStream & Seekable> RBlockState(Algorithm compressionAlgo,
-          InputStreamType fsin, BlockRegion region, Configuration conf, FileDecrypter decrypter)
-          throws IOException {
+      public <InputStreamType extends InputStream & Seekable> RBlockState(
+          CompressionAlgorithm compressionAlgo, InputStreamType fsin, BlockRegion region,
+          Configuration conf, FileDecrypter decrypter) throws IOException {
         this.compressAlgo = compressionAlgo;
         this.region = region;
         this.decompressor = compressionAlgo.getDecompressor();
@@ -747,7 +746,7 @@ public final class BCFile {
       return dataIndex.getBlockRegionList().get(blockIndex).getRawSize();
     }
 
-    private BlockReader createReader(Algorithm compressAlgo, BlockRegion region)
+    private BlockReader createReader(CompressionAlgorithm compressAlgo, BlockRegion region)
         throws IOException {
       RBlockState rbs = new RBlockState(compressAlgo, in, region, conf, decrypter);
       return new BlockReader(rbs);
@@ -799,7 +798,7 @@ public final class BCFile {
    */
   static final class MetaIndexEntry {
     private final String metaName;
-    private final Algorithm compressionAlgorithm;
+    private final CompressionAlgorithm compressionAlgorithm;
     private static final String defaultPrefix = "data:";
 
     private final BlockRegion region;
@@ -816,7 +815,8 @@ public final class BCFile {
       region = new BlockRegion(in);
     }
 
-    public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm, BlockRegion region) {
+    public MetaIndexEntry(String metaName, CompressionAlgorithm compressionAlgorithm,
+        BlockRegion region) {
       this.metaName = metaName;
       this.compressionAlgorithm = compressionAlgorithm;
       this.region = region;
@@ -826,7 +826,7 @@ public final class BCFile {
       return metaName;
     }
 
-    public Algorithm getCompressionAlgorithm() {
+    public CompressionAlgorithm getCompressionAlgorithm() {
       return compressionAlgorithm;
     }
 
@@ -848,7 +848,7 @@ public final class BCFile {
   static class DataIndex {
     static final String BLOCK_NAME = "BCFile.index";
 
-    private final Algorithm defaultCompressionAlgorithm;
+    private final CompressionAlgorithm defaultCompressionAlgorithm;
 
     // for data blocks, each entry specifies a block's offset, compressed size
     // and raw size
@@ -874,7 +874,7 @@ public final class BCFile {
       listRegions = new ArrayList<>();
     }
 
-    public Algorithm getDefaultCompressionAlgorithm() {
+    public CompressionAlgorithm getDefaultCompressionAlgorithm() {
       return defaultCompressionAlgorithm;
     }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 6679610..8b73e6b 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -18,41 +18,20 @@
  */
 package org.apache.accumulo.core.file.rfile.bcfile;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
+import org.apache.accumulo.core.spi.file.rfile.compression.CompressionAlgorithmConfiguration;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Maps;
 
 /**
  * Compression related stuff.
  */
 public final class Compression {
 
-  private static final Logger log = LoggerFactory.getLogger(Compression.class);
-
   /**
    * Prevent the instantiation of this class.
    */
@@ -60,801 +39,32 @@ public final class Compression {
     throw new UnsupportedOperationException();
   }
 
-  static class FinishOnFlushCompressionStream extends FilterOutputStream {
-
-    FinishOnFlushCompressionStream(CompressionOutputStream cout) {
-      super(cout);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-      out.write(b, off, len);
-    }
-
-    @Override
-    public void flush() throws IOException {
-      CompressionOutputStream cout = (CompressionOutputStream) out;
-      cout.finish();
-      cout.flush();
-      cout.resetState();
-    }
-  }
-
-  /**
-   * Compression: bzip2
-   */
-  public static final String COMPRESSION_BZIP2 = "bzip2";
-
-  /**
-   * Compression: zStandard
-   */
-  public static final String COMPRESSION_ZSTD = "zstd";
-
-  /**
-   * Compression: snappy
-   **/
-  public static final String COMPRESSION_SNAPPY = "snappy";
-
-  /**
-   * Compression: gzip
-   */
-  public static final String COMPRESSION_GZ = "gz";
-
-  /**
-   * Compression: lzo
-   */
-  public static final String COMPRESSION_LZO = "lzo";
-
-  /**
-   * Compression: lz4
-   */
-  public static final String COMPRESSION_LZ4 = "lz4";
-
-  /**
-   * compression: none
-   */
-  public static final String COMPRESSION_NONE = "none";
-
-  /**
-   * Compression algorithms. There is a static initializer, below the values defined in the
-   * enumeration, that calls the initializer of all defined codecs within {@link Algorithm}. This
-   * promotes a model of the following call graph of initialization by the static initializer,
-   * followed by calls to {@link #getCodec()},
-   * {@link #createCompressionStream(OutputStream, Compressor, int)}, and
-   * {@link #createDecompressionStream(InputStream, Decompressor, int)}. In some cases, the
-   * compression and decompression call methods will include a different buffer size for the stream.
-   * Note that if the compressed buffer size requested in these calls is zero, we will not set the
-   * buffer size for that algorithm. Instead, we will use the default within the codec.
-   * <p>
-   * The buffer size is configured in the Codec by way of a Hadoop {@link Configuration} reference.
-   * One approach may be to use the same Configuration object, but when calls are made to
-   * {@code createCompressionStream} and {@code createDecompressionStream} with non default buffer
-   * sizes, the configuration object must be changed. In this case, concurrent calls to
-   * {@code createCompressionStream} and {@code createDecompressionStream} would mutate the
-   * configuration object beneath each other, requiring synchronization to avoid undesirable
-   * activity via co-modification. To avoid synchronization entirely, we will create Codecs with
-   * their own Configuration object and cache them for re-use. A default codec will be statically
-   * created, as mentioned above to ensure we always have a codec available at loader
-   * initialization.
-   * <p>
-   * There is a Guava cache defined within Algorithm that allows us to cache Codecs for re-use.
-   * Since they will have their own configuration object and thus do not need to be mutable, there
-   * is no concern for using them concurrently; however, the Guava cache exists to ensure a maximal
-   * size of the cache and efficient and concurrent read/write access to the cache itself.
-   * <p>
-   * To provide Algorithm specific details and to describe what is in code:
-   * <p>
-   * LZO will always have the default LZO codec because the buffer size is never overridden within
-   * it.
-   * <p>
-   * LZ4 will always have the default LZ4 codec because the buffer size is never overridden within
-   * it.
-   * <p>
-   * GZ will use the default GZ codec for the compression stream, but can potentially use a
-   * different codec instance for the decompression stream if the requested buffer size does not
-   * match the default GZ buffer size of 32k.
-   * <p>
-   * Snappy will use the default Snappy codec with the default buffer size of 64k for the
-   * compression stream, but will use a cached codec if the buffer size differs from the default.
-   */
-  public enum Algorithm {
-
-    BZIP2(COMPRESSION_BZIP2) {
-
-      /**
-       * The default codec class.
-       */
-      private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.BZip2Codec";
-
-      /**
-       * Configuration option for BZip2 buffer size. Uses the default FS buffer size.
-       */
-      private static final String BUFFER_SIZE_OPT = "io.file.buffer.size";
-
-      /**
-       * Default buffer size. Changed from default of 4096.
-       */
-      private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-
-      /**
-       * Whether or not the codec status has been checked. Ensures the default codec is not
-       * recreated.
-       */
-      private final AtomicBoolean checked = new AtomicBoolean(false);
-
-      private transient CompressionCodec codec = null;
-
-      @Override
-      public boolean isSupported() {
-        return codec != null;
-      }
-
-      @Override
-      public void initializeDefaultCodec() {
-        codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
-      }
-
-      @Override
-      CompressionCodec createNewCodec(int bufferSize) {
-        return createNewCodec(CONF_BZIP2_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
-      }
-
-      @Override
-      CompressionCodec getCodec() {
-        return codec;
-      }
-
-      @Override
-      public InputStream createDecompressionStream(InputStream downStream,
-          Decompressor decompressor, int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException("BZip2 codec class not specified. Did you forget to set property "
-              + CONF_BZIP2_CLASS + "?");
-        }
-        InputStream bis = bufferStream(downStream, downStreamBufferSize);
-        CompressionInputStream cis = codec.createInputStream(bis, decompressor);
-        return new BufferedInputStream(cis, DATA_IBUF_SIZE);
-      }
-
-      @Override
-      public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
-          int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException("BZip2 codec class not specified. Did you forget to set property "
-              + CONF_BZIP2_CLASS + "?");
-        }
-        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
-      }
-
-    },
-
-    LZO(COMPRESSION_LZO) {
-
-      /**
-       * The default codec class.
-       */
-      private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.LzoCodec";
-
-      /**
-       * Configuration option for LZO buffer size.
-       */
-      private static final String BUFFER_SIZE_OPT = "io.compression.codec.lzo.buffersize";
-
-      /**
-       * Default buffer size.
-       */
-      private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-
-      /**
-       * Whether or not the codec status has been checked. Ensures the default codec is not
-       * recreated.
-       */
-      private final AtomicBoolean checked = new AtomicBoolean(false);
-
-      private transient CompressionCodec codec = null;
-
-      @Override
-      public boolean isSupported() {
-        return codec != null;
-      }
-
-      @Override
-      public void initializeDefaultCodec() {
-        codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
-      }
-
-      @Override
-      CompressionCodec createNewCodec(int bufferSize) {
-        return createNewCodec(CONF_LZO_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
-      }
-
-      @Override
-      CompressionCodec getCodec() {
-        return codec;
-      }
-
-      @Override
-      public InputStream createDecompressionStream(InputStream downStream,
-          Decompressor decompressor, int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException("LZO codec class not specified. Did you forget to set property "
-              + CONF_LZO_CLASS + "?");
-        }
-        InputStream bis = bufferStream(downStream, downStreamBufferSize);
-        CompressionInputStream cis = codec.createInputStream(bis, decompressor);
-        return new BufferedInputStream(cis, DATA_IBUF_SIZE);
-      }
-
-      @Override
-      public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
-          int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException("LZO codec class not specified. Did you forget to set property "
-              + CONF_LZO_CLASS + "?");
-        }
-        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
-      }
-
-    },
-
-    LZ4(COMPRESSION_LZ4) {
-
-      /**
-       * The default codec class.
-       */
-      private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.Lz4Codec";
-
-      /**
-       * Configuration option for LZ4 buffer size.
-       */
-      private static final String BUFFER_SIZE_OPT = "io.compression.codec.lz4.buffersize";
-
-      /**
-       * Default buffer size.
-       */
-      private static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
-
-      /**
-       * Whether or not the codec status has been checked. Ensures the default codec is not
-       * recreated.
-       */
-      private final AtomicBoolean checked = new AtomicBoolean(false);
-
-      private transient CompressionCodec codec = null;
-
-      @Override
-      public boolean isSupported() {
-        return codec != null;
-      }
-
-      @Override
-      public void initializeDefaultCodec() {
-        codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
-      }
-
-      @Override
-      CompressionCodec createNewCodec(int bufferSize) {
-        return createNewCodec(CONF_LZ4_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
-      }
-
-      @Override
-      CompressionCodec getCodec() {
-        return codec;
-      }
-
-      @Override
-      public InputStream createDecompressionStream(InputStream downStream,
-          Decompressor decompressor, int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException("LZ4 codec class not specified. Did you forget to set property "
-              + CONF_LZ4_CLASS + "?");
-        }
-        InputStream bis = bufferStream(downStream, downStreamBufferSize);
-        CompressionInputStream cis = codec.createInputStream(bis, decompressor);
-        return new BufferedInputStream(cis, DATA_IBUF_SIZE);
-      }
-
-      @Override
-      public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
-          int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException("LZ4 codec class not specified. Did you forget to set property "
-              + CONF_LZ4_CLASS + "?");
-        }
-        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
-      }
-
-    },
-
-    GZ(COMPRESSION_GZ) {
-
-      private transient DefaultCodec codec = null;
-
-      /**
-       * Configuration option for gz buffer size
-       */
-      private static final String BUFFER_SIZE_OPT = "io.file.buffer.size";
-
-      /**
-       * Default buffer size
-       */
-      private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
-
-      @Override
-      CompressionCodec getCodec() {
-        return codec;
-      }
-
-      @Override
-      public void initializeDefaultCodec() {
-        codec = (DefaultCodec) createNewCodec(DEFAULT_BUFFER_SIZE);
-      }
-
-      /**
-       * Creates a new GZ codec
-       */
-      @Override
-      protected CompressionCodec createNewCodec(final int bufferSize) {
-        Configuration newConfig = new Configuration(conf);
-        updateBuffer(conf, BUFFER_SIZE_OPT, bufferSize);
-        DefaultCodec newCodec = new DefaultCodec();
-        newCodec.setConf(newConfig);
-        return newCodec;
-      }
-
-      @Override
-      public InputStream createDecompressionStream(InputStream downStream,
-          Decompressor decompressor, int downStreamBufferSize) throws IOException {
-        return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
-            DEFAULT_BUFFER_SIZE, GZ, codec);
-      }
-
-      @Override
-      public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
-          int downStreamBufferSize) throws IOException {
-        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
-      }
-
-      @Override
-      public boolean isSupported() {
-        return true;
-      }
-    },
-
-    NONE(COMPRESSION_NONE) {
-      @Override
-      CompressionCodec getCodec() {
-        return null;
-      }
-
-      @Override
-      public InputStream createDecompressionStream(InputStream downStream,
-          Decompressor decompressor, int downStreamBufferSize) {
-        return bufferStream(downStream, downStreamBufferSize);
-      }
-
-      @Override
-      public void initializeDefaultCodec() {}
-
-      @Override
-      protected CompressionCodec createNewCodec(final int bufferSize) {
-        return null;
-      }
-
-      @Override
-      public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
-          int downStreamBufferSize) {
-        return bufferStream(downStream, downStreamBufferSize);
-      }
-
-      @Override
-      public boolean isSupported() {
-        return true;
-      }
-    },
-
-    SNAPPY(COMPRESSION_SNAPPY) {
-
-      /**
-       * The default codec class.
-       */
-      private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.SnappyCodec";
-
-      /**
-       * Configuration option for LZO buffer size.
-       */
-      private static final String BUFFER_SIZE_OPT = "io.compression.codec.snappy.buffersize";
-
-      /**
-       * Default buffer size.
-       */
-      private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-
-      /**
-       * Whether or not the codec status has been checked. Ensures the default codec is not
-       * recreated.
-       */
-      private final AtomicBoolean checked = new AtomicBoolean(false);
-
-      private transient CompressionCodec codec = null;
-
-      @Override
-      public CompressionCodec getCodec() {
-        return codec;
-      }
-
-      @Override
-      public void initializeDefaultCodec() {
-        codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
-      }
-
-      /**
-       * Creates a new snappy codec.
-       */
-      @Override
-      protected CompressionCodec createNewCodec(final int bufferSize) {
-        return createNewCodec(CONF_SNAPPY_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
-      }
-
-      @Override
-      public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
-          int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException("SNAPPY codec class not specified. Did you forget to set property "
-              + CONF_SNAPPY_CLASS + "?");
-        }
-        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
-      }
-
-      @Override
-      public InputStream createDecompressionStream(InputStream downStream,
-          Decompressor decompressor, int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException("SNAPPY codec class not specified. Did you forget to set property "
-              + CONF_SNAPPY_CLASS + "?");
-        }
-        return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
-            DEFAULT_BUFFER_SIZE, SNAPPY, codec);
-      }
-
-      @Override
-      public boolean isSupported() {
-        return codec != null;
-      }
-    },
-
-    ZSTANDARD(COMPRESSION_ZSTD) {
-
-      /**
-       * The default codec class.
-       */
-      private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.ZStandardCodec";
-
-      /**
-       * Configuration option for LZO buffer size.
-       */
-      private static final String BUFFER_SIZE_OPT = "io.compression.codec.zstd.buffersize";
-
-      /**
-       * Default buffer size.
-       */
-      private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-
-      /**
-       * Whether or not the codec status has been checked. Ensures the default codec is not
-       * recreated.
-       */
-      private final AtomicBoolean checked = new AtomicBoolean(false);
-
-      private transient CompressionCodec codec = null;
-
-      @Override
-      public CompressionCodec getCodec() {
-        return codec;
-      }
-
-      @Override
-      public void initializeDefaultCodec() {
-        codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
-      }
-
-      /**
-       * Creates a new ZStandard codec.
-       */
-      @Override
-      protected CompressionCodec createNewCodec(final int bufferSize) {
-        return createNewCodec(CONF_ZSTD_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
-      }
-
-      @Override
-      public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
-          int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException(
-              "ZStandard codec class not specified. Did you forget to set property "
-                  + CONF_ZSTD_CLASS + "?");
-        }
-        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
-      }
-
-      @Override
-      public InputStream createDecompressionStream(InputStream downStream,
-          Decompressor decompressor, int downStreamBufferSize) throws IOException {
-        if (!isSupported()) {
-          throw new IOException(
-              "ZStandard codec class not specified. Did you forget to set property "
-                  + CONF_ZSTD_CLASS + "?");
-        }
-        return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
-            DEFAULT_BUFFER_SIZE, ZSTANDARD, codec);
-      }
-
-      @Override
-      public boolean isSupported() {
-        return codec != null;
-      }
-    };
-
-    /**
-     * Guava cache to have a limited factory pattern defined in the Algorithm enum.
-     */
-    private static LoadingCache<Entry<Algorithm,Integer>,CompressionCodec> codecCache =
-        CacheBuilder.newBuilder().maximumSize(25).build(new CacheLoader<>() {
-          @Override
-          public CompressionCodec load(Entry<Algorithm,Integer> key) {
-            return key.getKey().createNewCodec(key.getValue());
-          }
-        });
+  // All compression-related settings are required to be configured statically in the
+  // Configuration object.
+  protected static final Configuration conf = new Configuration();
 
-    public static final String CONF_BZIP2_CLASS = "io.compression.codec.bzip2.class";
-    public static final String CONF_LZO_CLASS = "io.compression.codec.lzo.class";
-    public static final String CONF_LZ4_CLASS = "io.compression.codec.lz4.class";
-    public static final String CONF_SNAPPY_CLASS = "io.compression.codec.snappy.class";
-    public static final String CONF_ZSTD_CLASS = "io.compression.codec.zstd.class";
+  private static final ServiceLoader<CompressionAlgorithmConfiguration> COMPRESSION_ALGORITHMS =
+      ServiceLoader.load(CompressionAlgorithmConfiguration.class);
 
-    // All compression-related settings are required to be configured statically in the
-    // Configuration object.
-    protected static final Configuration conf;
-
-    // The model defined by the static block below creates a singleton for each defined codec in the
-    // Algorithm enumeration. By creating the codecs, each call to isSupported shall return
-    // true/false depending on if the codec singleton is defined. The static initializer, below,
-    // will ensure this occurs when the Enumeration is loaded. Furthermore, calls to getCodec will
-    // return the singleton, whether it is null or not.
-    //
-    // Calls to createCompressionStream and createDecompressionStream may return a different codec
-    // than getCodec, if the incoming downStreamBufferSize is different than the default. In such a
-    // case, we will place the resulting codec into the codecCache, defined below, to ensure we have
-    // cache codecs.
-    //
-    // Since codecs are immutable, there is no concern about concurrent access to the
-    // CompressionCodec objects within the guava cache.
-    static {
-      conf = new Configuration();
-      for (final Algorithm al : Algorithm.values()) {
-        al.initializeDefaultCodec();
-      }
-    }
-
-    // Data input buffer size to absorb small reads from application.
-    private static final int DATA_IBUF_SIZE = 1024;
-
-    // Data output buffer size to absorb small writes from application.
-    private static final int DATA_OBUF_SIZE = 4 * 1024;
-
-    // The name of the compression algorithm.
-    private final String name;
-
-    Algorithm(String name) {
-      this.name = name;
-    }
-
-    public abstract InputStream createDecompressionStream(InputStream downStream,
-        Decompressor decompressor, int downStreamBufferSize) throws IOException;
-
-    public abstract OutputStream createCompressionStream(OutputStream downStream,
-        Compressor compressor, int downStreamBufferSize) throws IOException;
-
-    public abstract boolean isSupported();
-
-    abstract CompressionCodec getCodec();
-
-    /**
-     * Create the default codec object.
-     */
-    abstract void initializeDefaultCodec();
-
-    /**
-     * Shared function to create new codec objects. It is expected that if buffersize is invalid, a
-     * codec will be created with the default buffer size.
-     */
-    abstract CompressionCodec createNewCodec(int bufferSize);
-
-    public Compressor getCompressor() {
-      CompressionCodec codec = getCodec();
-      if (codec != null) {
-        Compressor compressor = CodecPool.getCompressor(codec);
-        if (compressor != null) {
-          if (compressor.finished()) {
-            // Somebody returns the compressor to CodecPool but is still using it.
-            log.warn("Compressor obtained from CodecPool already finished()");
-          } else {
-            log.trace("Got a compressor: {}", compressor.hashCode());
-          }
-          // The following statement is necessary to get around bugs in 0.18 where a compressor is
-          // referenced after it's
-          // returned back to the codec pool.
-          compressor.reset();
-        }
-        return compressor;
-      }
-      return null;
-    }
-
-    public void returnCompressor(final Compressor compressor) {
-      if (compressor != null) {
-        log.trace("Return a compressor: {}", compressor.hashCode());
-        CodecPool.returnCompressor(compressor);
-      }
-    }
-
-    public Decompressor getDecompressor() {
-      CompressionCodec codec = getCodec();
-      if (codec != null) {
-        Decompressor decompressor = CodecPool.getDecompressor(codec);
-        if (decompressor != null) {
-          if (decompressor.finished()) {
-            // Somebody returns the decompressor to CodecPool but is still using it.
-            log.warn("Decompressor obtained from CodecPool already finished()");
-          } else {
-            log.trace("Got a decompressor: {}", decompressor.hashCode());
-          }
-          // The following statement is necessary to get around bugs in 0.18 where a decompressor is
-          // referenced after
-          // it's returned back to the codec pool.
-          decompressor.reset();
-        }
-        return decompressor;
-      }
-      return null;
-    }
-
-    /**
-     * Returns the specified {@link Decompressor} to the codec cache if it is not null.
-     */
-    public void returnDecompressor(final Decompressor decompressor) {
-      if (decompressor != null) {
-        log.trace("Returned a decompressor: {}", decompressor.hashCode());
-        CodecPool.returnDecompressor(decompressor);
-      }
-    }
-
-    /**
-     * Returns the name of the compression algorithm.
-     *
-     * @return the name
-     */
-    public String getName() {
-      return name;
-    }
-
-    /**
-     * Initializes and returns a new codec with the specified buffer size if and only if the
-     * specified {@link AtomicBoolean} has a value of false, or returns the specified original coded
-     * otherwise.
-     */
-    CompressionCodec initCodec(final AtomicBoolean checked, final int bufferSize,
-        final CompressionCodec originalCodec) {
-      if (!checked.get()) {
-        checked.set(true);
-        return createNewCodec(bufferSize);
-      }
-      return originalCodec;
-    }
-
-    /**
-     * Returns a new {@link CompressionCodec} of the specified type, or the default type if no
-     * primary type is specified. If the specified buffer size is greater than 0, the specified
-     * buffer size configuration option will be updated in the codec's configuration with the buffer
-     * size. If the neither the specified codec type or the default codec type can be found, null
-     * will be returned.
-     */
-    CompressionCodec createNewCodec(final String codecClazzProp, final String defaultClazz,
-        final int bufferSize, final String bufferSizeConfigOpt) {
-      String extClazz =
-          (conf.get(codecClazzProp) == null ? System.getProperty(codecClazzProp) : null);
-      String clazz = (extClazz != null) ? extClazz : defaultClazz;
-      try {
-        log.info("Trying to load codec class {} for {}", clazz, codecClazzProp);
-        Configuration config = new Configuration(conf);
-        updateBuffer(config, bufferSizeConfigOpt, bufferSize);
-        return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), config);
-      } catch (ClassNotFoundException e) {
-        // This is okay.
-      }
-      return null;
-    }
-
-    InputStream createDecompressionStream(final InputStream stream, final Decompressor decompressor,
-        final int bufferSize, final int defaultBufferSize, final Algorithm algorithm,
-        CompressionCodec codec) throws IOException {
-      // If the default buffer size is not being used, pull from the loading cache.
-      if (bufferSize != defaultBufferSize) {
-        Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(algorithm, bufferSize);
-        try {
-          codec = codecCache.get(sizeOpt);
-        } catch (ExecutionException e) {
-          throw new IOException(e);
-        }
-      }
-      CompressionInputStream cis = codec.createInputStream(stream, decompressor);
-      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
-    }
-
-    /**
-     * Returns a new {@link FinishOnFlushCompressionStream} initialized for the specified output
-     * stream and compressor.
-     */
-    OutputStream createFinishedOnFlushCompressionStream(final OutputStream downStream,
-        final Compressor compressor, final int downStreamBufferSize) throws IOException {
-      OutputStream out = bufferStream(downStream, downStreamBufferSize);
-      CompressionOutputStream cos = getCodec().createOutputStream(out, compressor);
-      return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
-    }
-
-    /**
-     * Return the given stream wrapped as a {@link BufferedOutputStream} with the given buffer size
-     * if the buffer size is greater than 0, or return the original stream otherwise.
-     */
-    OutputStream bufferStream(final OutputStream stream, final int bufferSize) {
-      if (bufferSize > 0) {
-        return new BufferedOutputStream(stream, bufferSize);
-      }
-      return stream;
-    }
-
-    /**
-     * Return the given stream wrapped as a {@link BufferedInputStream} with the given buffer size
-     * if the buffer size is greater than 0, or return the original stream otherwise.
-     */
-    InputStream bufferStream(final InputStream stream, final int bufferSize) {
-      if (bufferSize > 0) {
-        return new BufferedInputStream(stream, bufferSize);
-      }
-      return stream;
-    }
-
-    /**
-     * Updates the value of the specified buffer size opt in the given {@link Configuration} if the
-     * new buffer size is greater than 0.
-     */
-    void updateBuffer(final Configuration config, final String bufferSizeOpt,
-        final int bufferSize) {
-      // Use the buffersize only if it is greater than 0, otherwise use the default defined within
-      // the codec.
-      if (bufferSize > 0) {
-        config.setInt(bufferSizeOpt, bufferSize);
-      }
-    }
-  }
+  private static final Map<String,CompressionAlgorithm> CONFIGURED_ALGORITHMS =
+      StreamSupport.stream(COMPRESSION_ALGORITHMS.spliterator(), false)
+          .map(a -> new CompressionAlgorithm(a, conf))
+          .collect(Collectors.toMap(algo -> algo.getName(), algo -> algo));
 
   public static String[] getSupportedAlgorithms() {
-    Algorithm[] algorithms = Algorithm.class.getEnumConstants();
     ArrayList<String> supportedAlgorithms = new ArrayList<>();
-    for (Algorithm algorithm : algorithms) {
-      if (algorithm.isSupported()) {
-        supportedAlgorithms.add(algorithm.getName());
+    CONFIGURED_ALGORITHMS.forEach((k, v) -> {
+      if (v.isSupported()) {
+        supportedAlgorithms.add(k);
       }
-    }
+    });
     return supportedAlgorithms.toArray(new String[0]);
   }
 
-  public static Algorithm getCompressionAlgorithmByName(final String name) {
-    Algorithm[] algorithms = Algorithm.class.getEnumConstants();
-    for (Algorithm algorithm : algorithms) {
-      if (algorithm.getName().equals(name)) {
-        return algorithm;
-      }
+  public static CompressionAlgorithm getCompressionAlgorithmByName(final String name) {
+    CompressionAlgorithm algorithm = CONFIGURED_ALGORITHMS.get(name);
+    if (algorithm != null) {
+      return algorithm;
     }
     throw new IllegalArgumentException("Unsupported compression algorithm name: " + name);
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java
new file mode 100644
index 0000000..9443348
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionAlgorithm.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.spi.file.rfile.compression.CompressionAlgorithmConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Maps;
+
+/**
+ * There is a static initializer in {@link Compression} that finds all implementations of
+ * {@link CompressionAlgorithmConfiguration} and initializes a {@link CompressionAlgorithm}
+ * instance. This promotes a model of the following call graph of initialization by the static
+ * initializer, followed by calls to {@link #getCodec()},
+ * {@link #createCompressionStream(OutputStream, Compressor, int)}, and
+ * {@link #createDecompressionStream(InputStream, Decompressor, int)}. In some cases, the
+ * compression and decompression call methods will include a different buffer size for the stream.
+ * Note that if the compressed buffer size requested in these calls is zero, we will not set the
+ * buffer size for that algorithm. Instead, we will use the default within the codec.
+ * <p>
+ * The buffer size is configured in the Codec by way of a Hadoop {@link Configuration} reference.
+ * One approach may be to use the same Configuration object, but when calls are made to
+ * {@code createCompressionStream} and {@code createDecompressionStream} with non default buffer
+ * sizes, the configuration object must be changed. In this case, concurrent calls to
+ * {@code createCompressionStream} and {@code createDecompressionStream} would mutate the
+ * configuration object beneath each other, requiring synchronization to avoid undesirable activity
+ * via co-modification. To avoid synchronization entirely, we will create Codecs with their own
+ * Configuration object and cache them for re-use. A default codec will be statically created, as
+ * mentioned above to ensure we always have a codec available at loader initialization.
+ * <p>
+ * There is a Guava cache defined within Algorithm that allows us to cache Codecs for re-use. Since
+ * they will have their own configuration object and thus do not need to be mutable, there is no
+ * concern for using them concurrently; however, the Guava cache exists to ensure a maximal size of
+ * the cache and efficient and concurrent read/write access to the cache itself.
+ * <p>
+ * To provide Algorithm specific details and to describe what is in code:
+ * <p>
+ * LZO will always have the default LZO codec because the buffer size is never overridden within it.
+ * <p>
+ * LZ4 will always have the default LZ4 codec because the buffer size is never overridden within it.
+ * <p>
+ * GZ will use the default GZ codec for the compression stream, but can potentially use a different
+ * codec instance for the decompression stream if the requested buffer size does not match the
+ * default GZ buffer size of 32k.
+ * <p>
+ * Snappy will use the default Snappy codec with the default buffer size of 64k for the compression
+ * stream, but will use a cached codec if the buffer size differs from the default.
+ */
+public class CompressionAlgorithm extends Configured {
+
+  public static class FinishOnFlushCompressionStream extends FilterOutputStream {
+
+    FinishOnFlushCompressionStream(CompressionOutputStream cout) {
+      super(cout);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      CompressionOutputStream cout = (CompressionOutputStream) out;
+      cout.finish();
+      cout.flush();
+      cout.resetState();
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompressionAlgorithm.class);
+
+  /**
+   * Guava cache to have a limited factory pattern defined in the Algorithm enum.
+   */
+  private static LoadingCache<Entry<CompressionAlgorithm,Integer>,CompressionCodec> codecCache =
+      CacheBuilder.newBuilder().maximumSize(25).build(new CacheLoader<>() {
+        @Override
+        public CompressionCodec load(Entry<CompressionAlgorithm,Integer> key) {
+          return key.getKey().createNewCodec(key.getValue());
+        }
+      });
+
+  // Data input buffer size to absorb small reads from application.
+  protected static final int DATA_IBUF_SIZE = 1024;
+
+  // Data output buffer size to absorb small writes from application.
+  protected static final int DATA_OBUF_SIZE = 4 * 1024;
+
+  // The name of the compression algorithm.
+  private final CompressionAlgorithmConfiguration algorithm;
+
+  private final AtomicBoolean checked = new AtomicBoolean(false);
+
+  private transient CompressionCodec codec = null;
+
+  public CompressionAlgorithm(CompressionAlgorithmConfiguration algorithm, Configuration conf) {
+    this.algorithm = algorithm;
+    setConf(conf);
+    codec = initCodec(checked, algorithm.getDefaultBufferSize(), codec);
+  }
+
+  /**
+   * Shared function to create new codec objects. It is expected that if the buffer size is invalid, a
+   * codec will be created with the default buffer size.
+   */
+  CompressionCodec createNewCodec(int bufferSize) {
+    return createNewCodec(algorithm.getCodecClassNameProperty(), algorithm.getCodecClassName(),
+        bufferSize, algorithm.getBufferSizeProperty());
+  }
+
+  public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor,
+      int downStreamBufferSize) throws IOException {
+    if (!isSupported()) {
+      throw new IOException("codec class not specified. Did you forget to set property "
+          + algorithm.getCodecClassNameProperty() + "?");
+    }
+    if (algorithm.cacheCodecsWithNonDefaultSizes()) {
+      return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
+          algorithm.getDefaultBufferSize(), this, codec);
+    } else {
+      InputStream bis = bufferStream(downStream, downStreamBufferSize);
+      CompressionInputStream cis = codec.createInputStream(bis, decompressor);
+      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
+    }
+  }
+
+  private InputStream createDecompressionStream(final InputStream stream,
+      final Decompressor decompressor, final int bufferSize, final int defaultBufferSize,
+      final CompressionAlgorithm algorithm, CompressionCodec codec) throws IOException {
+    // If the default buffer size is not being used, pull from the loading cache.
+    if (bufferSize != defaultBufferSize) {
+      Entry<CompressionAlgorithm,Integer> sizeOpt = Maps.immutableEntry(algorithm, bufferSize);
+      try {
+        codec = codecCache.get(sizeOpt);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+    CompressionInputStream cis = codec.createInputStream(stream, decompressor);
+    return new BufferedInputStream(cis, DATA_IBUF_SIZE);
+  }
+
+  public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
+      int downStreamBufferSize) throws IOException {
+    if (!isSupported()) {
+      throw new IOException("codec class not specified. Did you forget to set property "
+          + algorithm.getCodecClassNameProperty() + "?");
+    }
+    return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
+
+  }
+
+  boolean isSupported() {
+    return codec != null;
+  }
+
+  CompressionCodec getCodec() {
+    return codec;
+  }
+
+  public Compressor getCompressor() {
+    CompressionCodec codec = getCodec();
+    if (codec != null) {
+      Compressor compressor = CodecPool.getCompressor(codec);
+      if (compressor != null) {
+        if (compressor.finished()) {
+          // Somebody returns the compressor to CodecPool but is still using it.
+          LOG.warn("Compressor obtained from CodecPool already finished()");
+        } else {
+          LOG.trace("Got a compressor: {}", compressor.hashCode());
+        }
+        // The following statement is necessary to get around bugs in 0.18 where a compressor is
+        // referenced after it's
+        // returned back to the codec pool.
+        compressor.reset();
+      }
+      return compressor;
+    }
+    return null;
+  }
+
+  public void returnCompressor(final Compressor compressor) {
+    if (compressor != null) {
+      LOG.trace("Return a compressor: {}", compressor.hashCode());
+      CodecPool.returnCompressor(compressor);
+    }
+  }
+
+  public Decompressor getDecompressor() {
+    CompressionCodec codec = getCodec();
+    if (codec != null) {
+      Decompressor decompressor = CodecPool.getDecompressor(codec);
+      if (decompressor != null) {
+        if (decompressor.finished()) {
+          // Somebody returns the decompressor to CodecPool but is still using it.
+          LOG.warn("Decompressor obtained from CodecPool already finished()");
+        } else {
+          LOG.trace("Got a decompressor: {}", decompressor.hashCode());
+        }
+        // The following statement is necessary to get around bugs in 0.18 where a decompressor is
+        // referenced after
+        // it's returned back to the codec pool.
+        decompressor.reset();
+      }
+      return decompressor;
+    }
+    return null;
+  }
+
+  /**
+   * Returns the specified {@link Decompressor} to the codec cache if it is not null.
+   */
+  public void returnDecompressor(final Decompressor decompressor) {
+    if (decompressor != null) {
+      LOG.trace("Returned a decompressor: {}", decompressor.hashCode());
+      CodecPool.returnDecompressor(decompressor);
+    }
+  }
+
+  /**
+   * Returns the name of the compression algorithm.
+   *
+   * @return the name
+   */
+  public String getName() {
+    return algorithm.getName();
+  }
+
+  /**
+   * Initializes and returns a new codec with the specified buffer size if and only if the specified
+   * {@link AtomicBoolean} has a value of false, or returns the specified original codec otherwise.
+   */
+  private CompressionCodec initCodec(final AtomicBoolean checked, final int bufferSize,
+      final CompressionCodec originalCodec) {
+    if (!checked.get()) {
+      checked.set(true);
+      return createNewCodec(bufferSize);
+    }
+    return originalCodec;
+  }
+
+  /**
+   * Returns a new {@link CompressionCodec} of the specified type, or the default type if no primary
+   * type is specified. If the specified buffer size is greater than 0, the specified buffer size
+   * configuration option will be updated in the codec's configuration with the buffer size. If
+   * neither the specified codec type nor the default codec type can be found, null will be returned.
+   */
+  private CompressionCodec createNewCodec(final String codecClazzProp, final String defaultClazz,
+      final int bufferSize, final String bufferSizeConfigOpt) {
+    String extClazz = null;
+    if (codecClazzProp != null) {
+      extClazz =
+          (getConf().get(codecClazzProp) == null ? System.getProperty(codecClazzProp) : null);
+    }
+    String clazz = (extClazz != null) ? extClazz : defaultClazz;
+    try {
+      LOG.info("Trying to load codec class {}", clazz);
+      Configuration config = new Configuration(getConf());
+      updateBuffer(config, bufferSizeConfigOpt, bufferSize);
+      return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), config);
+    } catch (ClassNotFoundException e) {
+      // This is okay.
+    }
+    return null;
+  }
+
+  /**
+   * Returns a new {@link FinishOnFlushCompressionStream} initialized for the specified output
+   * stream and compressor.
+   */
+  private OutputStream createFinishedOnFlushCompressionStream(final OutputStream downStream,
+      final Compressor compressor, final int downStreamBufferSize) throws IOException {
+    OutputStream out = bufferStream(downStream, downStreamBufferSize);
+    CompressionOutputStream cos = getCodec().createOutputStream(out, compressor);
+    return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+  }
+
+  /**
+   * Return the given stream wrapped as a {@link BufferedOutputStream} with the given buffer size if
+   * the buffer size is greater than 0, or return the original stream otherwise.
+   */
+  private OutputStream bufferStream(final OutputStream stream, final int bufferSize) {
+    if (bufferSize > 0) {
+      return new BufferedOutputStream(stream, bufferSize);
+    }
+    return stream;
+  }
+
+  /**
+   * Return the given stream wrapped as a {@link BufferedInputStream} with the given buffer size if
+   * the buffer size is greater than 0, or return the original stream otherwise.
+   */
+  private InputStream bufferStream(final InputStream stream, final int bufferSize) {
+    if (bufferSize > 0) {
+      return new BufferedInputStream(stream, bufferSize);
+    }
+    return stream;
+  }
+
+  /**
+   * Updates the value of the specified buffer size option in the given {@link Configuration} if
+   * the new buffer size is greater than 0.
+   */
+  private void updateBuffer(final Configuration config, final String bufferSizeOpt,
+      final int bufferSize) {
+    // Use the buffersize only if it is greater than 0, otherwise use the default defined within
+    // the codec.
+    if (bufferSize > 0) {
+      config.setInt(bufferSizeOpt, bufferSize);
+    }
+  }
+}
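
The compressor and decompressor borrow/return methods above are thin wrappers
around Hadoop's CodecPool. Below is a minimal sketch of that lifecycle, written
against the stock Hadoop API rather than this class (GzipCodec is only a
stand-in codec chosen for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CodecPoolSketch {
      public static void main(String[] args) {
        // Instantiate a codec reflectively, much like createNewCodec() does.
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, new Configuration());
        // Borrow a decompressor from the pool; reset it, since pooled instances are recycled.
        Decompressor d = CodecPool.getDecompressor(codec);
        if (d != null) {
          d.reset();
          // ... wrap a stream with codec.createInputStream(in, d) and read from it ...
          // Return the decompressor so it can be reused by other callers.
          CodecPool.returnDecompressor(d);
        }
      }
    }
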
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/IdentityCodec.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/IdentityCodec.java
new file mode 100644
index 0000000..ad9f58c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/IdentityCodec.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+public class IdentityCodec implements CompressionCodec {
+
+  /*
+   * Copied from org/apache/hadoop/io/compress/FakeCompressor.java
+   */
+  public static class IdentityCompressor implements Compressor {
+
+    private boolean finish;
+    private boolean finished;
+    private int nread;
+    private int nwrite;
+
+    private byte[] userBuf;
+    private int userBufOff;
+    private int userBufLen;
+
+    @Override
+    public int compress(byte[] b, int off, int len) throws IOException {
+      int n = Math.min(len, userBufLen);
+      if (userBuf != null && b != null)
+        System.arraycopy(userBuf, userBufOff, b, off, n);
+      userBufOff += n;
+      userBufLen -= n;
+      nwrite += n;
+
+      if (finish && userBufLen <= 0)
+        finished = true;
+
+      return n;
+    }
+
+    @Override
+    public void end() {
+      // nop
+    }
+
+    @Override
+    public void finish() {
+      finish = true;
+    }
+
+    @Override
+    public boolean finished() {
+      return finished;
+    }
+
+    @Override
+    public long getBytesRead() {
+      return nread;
+    }
+
+    @Override
+    public long getBytesWritten() {
+      return nwrite;
+    }
+
+    @Override
+    public boolean needsInput() {
+      return userBufLen <= 0;
+    }
+
+    @Override
+    public void reset() {
+      finish = false;
+      finished = false;
+      nread = 0;
+      nwrite = 0;
+      userBuf = null;
+      userBufOff = 0;
+      userBufLen = 0;
+    }
+
+    @Override
+    public void setDictionary(byte[] b, int off, int len) {
+      // nop
+    }
+
+    @Override
+    public void setInput(byte[] b, int off, int len) {
+      nread += len;
+      userBuf = b;
+      userBufOff = off;
+      userBufLen = len;
+    }
+
+    @Override
+    public void reinit(Configuration conf) {
+      // nop
+    }
+
+  }
+
+  /*
+   * Copied from org/apache/hadoop/io/compress/FakeDecompressor.java
+   */
+  public static class IdentityDecompressor implements Decompressor {
+
+    private boolean finish;
+    private boolean finished;
+    private int nread;
+    private int nwrite;
+
+    private byte[] userBuf;
+    private int userBufOff;
+    private int userBufLen;
+
+    @Override
+    public int decompress(byte[] b, int off, int len) throws IOException {
+      int n = Math.min(len, userBufLen);
+      if (userBuf != null && b != null)
+        System.arraycopy(userBuf, userBufOff, b, off, n);
+      userBufOff += n;
+      userBufLen -= n;
+      nwrite += n;
+
+      if (finish && userBufLen <= 0)
+        finished = true;
+
+      return n;
+    }
+
+    @Override
+    public void end() {
+      // nop
+    }
+
+    @Override
+    public boolean finished() {
+      return finished;
+    }
+
+    public long getBytesRead() {
+      return nread;
+    }
+
+    public long getBytesWritten() {
+      return nwrite;
+    }
+
+    @Override
+    public boolean needsDictionary() {
+      return false;
+    }
+
+    @Override
+    public boolean needsInput() {
+      return userBufLen <= 0;
+    }
+
+    @Override
+    public void reset() {
+      finish = false;
+      finished = false;
+      nread = 0;
+      nwrite = 0;
+      userBuf = null;
+      userBufOff = 0;
+      userBufLen = 0;
+    }
+
+    @Override
+    public void setDictionary(byte[] b, int off, int len) {
+      // nop
+    }
+
+    @Override
+    public void setInput(byte[] b, int off, int len) {
+      nread += len;
+      userBuf = b;
+      userBufOff = off;
+      userBufLen = len;
+    }
+
+    @Override
+    public int getRemaining() {
+      return 0;
+    }
+
+  }
+
+  public static class IdentityCompressionInputStream extends CompressionInputStream {
+
+    protected IdentityCompressionInputStream(InputStream in) throws IOException {
+      super(in);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return in.read(b, off, len);
+    }
+
+    @Override
+    public void resetState() throws IOException {}
+
+    @Override
+    public int read() throws IOException {
+      return in.read();
+    }
+
+  }
+
+  public static class IdentityCompressionOutputStream extends CompressionOutputStream {
+
+    public IdentityCompressionOutputStream(OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public void finish() throws IOException {}
+
+    @Override
+    public void resetState() throws IOException {}
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new IdentityCompressor();
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new IdentityDecompressor();
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return new IdentityCompressionInputStream(in);
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new IdentityCompressionInputStream(in);
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream os) throws IOException {
+    return new IdentityCompressionOutputStream(os);
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream os, Compressor c)
+      throws IOException {
+    return new IdentityCompressionOutputStream(os);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return IdentityCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return IdentityDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".identity";
+  }
+
+}
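
Since IdentityCodec backs the "none" compression option, bytes written through
it must come back out unchanged. A small round-trip sketch (illustrative only,
not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    import org.apache.accumulo.core.file.rfile.bcfile.IdentityCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;

    public class IdentityCodecSketch {
      public static void main(String[] args) throws Exception {
        IdentityCodec codec = new IdentityCodec();
        byte[] data = "hello, rfile".getBytes(StandardCharsets.UTF_8);

        // "Compress": the identity codec simply forwards the bytes.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (CompressionOutputStream out = codec.createOutputStream(sink)) {
          out.write(data);
          out.finish();
        }

        // "Decompress": reading back yields the original bytes.
        byte[] roundTripped =
            codec.createInputStream(new ByteArrayInputStream(sink.toByteArray())).readAllBytes();
        System.out.println(Arrays.equals(data, roundTripped)); // true
      }
    }
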
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Bzip2.java b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Bzip2.java
new file mode 100644
index 0000000..8f53f97
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Bzip2.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.file.rfile.compression;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(CompressionAlgorithmConfiguration.class)
+public class Bzip2 implements CompressionAlgorithmConfiguration {
+
+  @Override
+  public String getName() {
+    return "bzip2";
+  }
+
+  @Override
+  public String getCodecClassName() {
+    return "org.apache.hadoop.io.compress.BZip2Codec";
+  }
+
+  @Override
+  public String getCodecClassNameProperty() {
+    return "io.compression.codec.bzip2.class";
+  }
+
+  @Override
+  public int getDefaultBufferSize() {
+    return 64 * 1024;
+  }
+
+  @Override
+  public String getBufferSizeProperty() {
+    return "io.file.buffer.size";
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/CompressionAlgorithmConfiguration.java b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/CompressionAlgorithmConfiguration.java
new file mode 100644
index 0000000..669a034
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/CompressionAlgorithmConfiguration.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.file.rfile.compression;
+
+public interface CompressionAlgorithmConfiguration {
+
+  /**
+   * @return algorithm alias
+   */
+  String getName();
+
+  /**
+   * @return fully qualified class name of codec
+   */
+  String getCodecClassName();
+
+  /**
+   * @return name of property that can be specified in configuration or in system properties to
+   *         override class name of codec
+   */
+  String getCodecClassNameProperty();
+
+  /**
+   * @return default buffer size for compression algorithm
+   */
+  int getDefaultBufferSize();
+
+  /**
+   * @return name of property that can be specified in configuration or in system properties to
+   *         override default buffer size
+   */
+  String getBufferSizeProperty();
+
+  /**
+   * @return true if codecs with non-default buffer sizes should be cached
+   */
+  default boolean cacheCodecsWithNonDefaultSizes() {
+    return false;
+  }
+
+}
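
This interface is the new extension point: to add a user-defined algorithm,
implement it, register the implementation for the ServiceLoader (the built-ins
in this commit use the @AutoService annotation for that), and put the jar on
the classpath. A hypothetical example, where the "myzip" name, the package, and
the MyZipCodec class are all made up for illustration:

    package com.example.compression;

    import org.apache.accumulo.core.spi.file.rfile.compression.CompressionAlgorithmConfiguration;

    import com.google.auto.service.AutoService;

    @AutoService(CompressionAlgorithmConfiguration.class)
    public class MyZip implements CompressionAlgorithmConfiguration {

      @Override
      public String getName() {
        return "myzip"; // the alias used to select this algorithm
      }

      @Override
      public String getCodecClassName() {
        return "com.example.compression.MyZipCodec"; // hypothetical codec class
      }

      @Override
      public String getCodecClassNameProperty() {
        return "io.compression.codec.myzip.class";
      }

      @Override
      public int getDefaultBufferSize() {
        return 64 * 1024;
      }

      @Override
      public String getBufferSizeProperty() {
        return "io.compression.codec.myzip.buffersize";
      }
    }
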
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Gz.java b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Gz.java
new file mode 100644
index 0000000..220eb00
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Gz.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.file.rfile.compression;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(CompressionAlgorithmConfiguration.class)
+public class Gz implements CompressionAlgorithmConfiguration {
+
+  @Override
+  public String getName() {
+    return "gz";
+  }
+
+  @Override
+  public String getCodecClassName() {
+    return "org.apache.hadoop.io.compress.DefaultCodec";
+  }
+
+  @Override
+  public String getCodecClassNameProperty() {
+    return null;
+  }
+
+  @Override
+  public int getDefaultBufferSize() {
+    return 32 * 1024;
+  }
+
+  @Override
+  public String getBufferSizeProperty() {
+    return "io.file.buffer.size";
+  }
+
+  @Override
+  public boolean cacheCodecsWithNonDefaultSizes() {
+    return true;
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Lz4.java b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Lz4.java
new file mode 100644
index 0000000..7ee111c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Lz4.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.file.rfile.compression;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(CompressionAlgorithmConfiguration.class)
+public class Lz4 implements CompressionAlgorithmConfiguration {
+
+  @Override
+  public String getName() {
+    return "lz4";
+  }
+
+  @Override
+  public String getCodecClassName() {
+    return "org.apache.hadoop.io.compress.Lz4Codec";
+  }
+
+  @Override
+  public String getCodecClassNameProperty() {
+    return "io.compression.codec.lz4.class";
+  }
+
+  @Override
+  public int getDefaultBufferSize() {
+    return 256 * 1024;
+  }
+
+  @Override
+  public String getBufferSizeProperty() {
+    return "io.compression.codec.lz4.buffersize";
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Lzo.java b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Lzo.java
new file mode 100644
index 0000000..ebacaa9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Lzo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.file.rfile.compression;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(CompressionAlgorithmConfiguration.class)
+public class Lzo implements CompressionAlgorithmConfiguration {
+
+  @Override
+  public String getName() {
+    return "lzo";
+  }
+
+  @Override
+  public String getCodecClassName() {
+    return "org.apache.hadoop.io.compress.LzoCodec";
+  }
+
+  @Override
+  public String getCodecClassNameProperty() {
+    return "io.compression.codec.lzo.class";
+  }
+
+  @Override
+  public int getDefaultBufferSize() {
+    return 64 * 1024;
+  }
+
+  @Override
+  public String getBufferSizeProperty() {
+    return "io.compression.codec.lzo.buffersize";
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/NoCompression.java b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/NoCompression.java
new file mode 100644
index 0000000..fcbae48
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/NoCompression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.file.rfile.compression;
+
+import org.apache.accumulo.core.file.rfile.bcfile.IdentityCodec;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(CompressionAlgorithmConfiguration.class)
+public class NoCompression implements CompressionAlgorithmConfiguration {
+
+  @Override
+  public String getName() {
+    return "none";
+  }
+
+  @Override
+  public String getCodecClassName() {
+    return IdentityCodec.class.getName();
+  }
+
+  @Override
+  public String getCodecClassNameProperty() {
+    return null;
+  }
+
+  @Override
+  public int getDefaultBufferSize() {
+    return 32 * 1024;
+  }
+
+  @Override
+  public String getBufferSizeProperty() {
+    return "io.file.buffer.size";
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Snappy.java b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Snappy.java
new file mode 100644
index 0000000..78e3738
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/Snappy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.file.rfile.compression;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(CompressionAlgorithmConfiguration.class)
+public class Snappy implements CompressionAlgorithmConfiguration {
+
+  @Override
+  public String getName() {
+    return "snappy";
+  }
+
+  @Override
+  public String getCodecClassName() {
+    return "org.apache.hadoop.io.compress.SnappyCodec";
+  }
+
+  @Override
+  public String getCodecClassNameProperty() {
+    return "io.compression.codec.snappy.class";
+  }
+
+  @Override
+  public int getDefaultBufferSize() {
+    return 64 * 1024;
+  }
+
+  @Override
+  public String getBufferSizeProperty() {
+    return "io.compression.codec.snappy.buffersize";
+  }
+
+  @Override
+  public boolean cacheCodecsWithNonDefaultSizes() {
+    return true;
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/ZStandard.java b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/ZStandard.java
new file mode 100644
index 0000000..2ef1542
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/spi/file/rfile/compression/ZStandard.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.file.rfile.compression;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(CompressionAlgorithmConfiguration.class)
+public class ZStandard implements CompressionAlgorithmConfiguration {
+
+  @Override
+  public String getName() {
+    return "zstd";
+  }
+
+  @Override
+  public String getCodecClassName() {
+    return "org.apache.hadoop.io.compress.ZStandardCodec";
+  }
+
+  @Override
+  public String getCodecClassNameProperty() {
+    return "io.compression.codec.zstd.class";
+  }
+
+  @Override
+  public int getDefaultBufferSize() {
+    return 64 * 1024;
+  }
+
+  @Override
+  public String getBufferSizeProperty() {
+    return "io.compression.codec.zstd.buffersize";
+  }
+
+  @Override
+  public boolean cacheCodecsWithNonDefaultSizes() {
+    return true;
+  }
+
+}
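
Once these configurations are discovered through the ServiceLoader, callers
look algorithms up by name via the static methods on Compression, exactly as
the updated test below does. A minimal lookup sketch using only those methods:

    import org.apache.accumulo.core.file.rfile.bcfile.Compression;
    import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;

    public class LookupSketch {
      public static void main(String[] args) {
        // Names come from the ServiceLoader-discovered configurations.
        for (String name : Compression.getSupportedAlgorithms()) {
          CompressionAlgorithm alg = Compression.getCompressionAlgorithmByName(name);
          System.out.println(alg.getName() + " supported=" + alg.isSupported());
        }
      }
    }
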
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
index 518e878..98f1201 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/bcfile/CompressionTest.java
@@ -33,7 +33,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.accumulo.core.spi.file.rfile.compression.Bzip2;
+import org.apache.accumulo.core.spi.file.rfile.compression.Gz;
+import org.apache.accumulo.core.spi.file.rfile.compression.Lz4;
+import org.apache.accumulo.core.spi.file.rfile.compression.Lzo;
+import org.apache.accumulo.core.spi.file.rfile.compression.Snappy;
+import org.apache.accumulo.core.spi.file.rfile.compression.ZStandard;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -43,79 +48,85 @@ import org.junit.jupiter.api.Timeout;
 
 public class CompressionTest {
 
-  HashMap<Compression.Algorithm,Boolean> isSupported = new HashMap<>();
+  HashMap<CompressionAlgorithm,Boolean> isSupported = new HashMap<>();
 
   @BeforeEach
-  public void testSupport() {
-    // we can safely assert that GZ exists by virtue of it being the DefaultCodec
-    isSupported.put(Compression.Algorithm.GZ, true);
-
+  public void testSupport() throws ClassNotFoundException {
     Configuration myConf = new Configuration();
 
-    String extClazz = System.getProperty(Compression.Algorithm.CONF_LZO_CLASS);
-    String clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.LzoCodec";
+    Gz gz = new Gz();
+    String extClazz = null; // gz defines no codec class name override property
+    String clazz = gz.getCodecClassName();
+    CompressionCodec codec =
+        (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+
+    assertNotNull(codec);
+    isSupported.put(new CompressionAlgorithm(gz, myConf), true);
+
+    Lzo lzo = new Lzo();
+    extClazz = System.getProperty(lzo.getCodecClassNameProperty());
+    clazz = (extClazz != null) ? extClazz : lzo.getCodecClassName();
     try {
-      CompressionCodec codec =
-          (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
 
       assertNotNull(codec);
-      isSupported.put(Compression.Algorithm.LZO, true);
+      isSupported.put(new CompressionAlgorithm(lzo, myConf), true);
 
     } catch (ClassNotFoundException e) {
       // that is okay
     }
 
-    extClazz = System.getProperty(Compression.Algorithm.CONF_LZ4_CLASS);
-    clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.Lz4Codec";
+    Lz4 lz4 = new Lz4();
+    extClazz = System.getProperty(lz4.getCodecClassNameProperty());
+    clazz = (extClazz != null) ? extClazz : lz4.getCodecClassName();
     try {
-      CompressionCodec codec =
-          (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
 
       assertNotNull(codec);
 
-      isSupported.put(Compression.Algorithm.LZ4, true);
+      isSupported.put(new CompressionAlgorithm(lz4, myConf), true);
 
     } catch (ClassNotFoundException e) {
       // that is okay
     }
 
-    extClazz = System.getProperty(Compression.Algorithm.CONF_BZIP2_CLASS);
-    clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.BZip2Codec";
+    Bzip2 bzip = new Bzip2();
+    extClazz = System.getProperty(bzip.getCodecClassNameProperty());
+    clazz = (extClazz != null) ? extClazz : bzip.getCodecClassName();
     try {
-      CompressionCodec codec =
-          (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
 
       assertNotNull(codec);
 
-      isSupported.put(Compression.Algorithm.BZIP2, true);
+      isSupported.put(new CompressionAlgorithm(bzip, myConf), true);
 
     } catch (ClassNotFoundException e) {
       // that is okay
     }
 
-    extClazz = System.getProperty(Compression.Algorithm.CONF_SNAPPY_CLASS);
-    clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.SnappyCodec";
+    Snappy snappy = new Snappy();
+    extClazz = System.getProperty(snappy.getCodecClassNameProperty());
+    clazz = (extClazz != null) ? extClazz : snappy.getCodecClassName();
     try {
-      CompressionCodec codec =
-          (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
 
       assertNotNull(codec);
 
-      isSupported.put(Compression.Algorithm.SNAPPY, true);
+      isSupported.put(new CompressionAlgorithm(snappy, myConf), true);
 
     } catch (ClassNotFoundException e) {
       // that is okay
     }
 
-    extClazz = System.getProperty(Compression.Algorithm.CONF_ZSTD_CLASS);
-    clazz = (extClazz != null) ? extClazz : "org.apache.hadoop.io.compress.ZStandardCodec";
+    ZStandard zstd = new ZStandard();
+    extClazz = System.getProperty(zstd.getCodecClassNameProperty());
+    clazz = (extClazz != null) ? extClazz : zstd.getCodecClassName();
     try {
-      CompressionCodec codec =
-          (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
 
       assertNotNull(codec);
 
-      isSupported.put(Compression.Algorithm.ZSTANDARD, true);
+      isSupported.put(new CompressionAlgorithm(zstd, myConf), true);
 
     } catch (ClassNotFoundException e) {
       // that is okay
@@ -126,7 +137,8 @@ public class CompressionTest {
   @Test
   public void testSingle() {
 
-    for (final Algorithm al : Algorithm.values()) {
+    for (final String name : Compression.getSupportedAlgorithms()) {
+      CompressionAlgorithm al = Compression.getCompressionAlgorithmByName(name);
       if (isSupported.get(al) != null && isSupported.get(al)) {
 
         // first call to isSupported should be true
@@ -142,7 +154,8 @@ public class CompressionTest {
   @Test
   public void testSingleNoSideEffect() {
 
-    for (final Algorithm al : Algorithm.values()) {
+    for (final String name : Compression.getSupportedAlgorithms()) {
+      CompressionAlgorithm al = Compression.getCompressionAlgorithmByName(name);
       if (isSupported.get(al) != null && isSupported.get(al)) {
 
         assertTrue(al.isSupported(), al + " is not supported, but should be");
@@ -162,7 +175,8 @@ public class CompressionTest {
   @Timeout(60)
   public void testManyStartNotNull() throws InterruptedException, ExecutionException {
 
-    for (final Algorithm al : Algorithm.values()) {
+    for (final String name : Compression.getSupportedAlgorithms()) {
+      CompressionAlgorithm al = Compression.getCompressionAlgorithmByName(name);
       if (isSupported.get(al) != null && isSupported.get(al)) {
 
         // first call to isSupported should be true
@@ -205,7 +219,8 @@ public class CompressionTest {
   @Timeout(60)
   public void testManyDontStartUntilThread() throws InterruptedException, ExecutionException {
 
-    for (final Algorithm al : Algorithm.values()) {
+    for (final String name : Compression.getSupportedAlgorithms()) {
+      CompressionAlgorithm al = Compression.getCompressionAlgorithmByName(name);
       if (isSupported.get(al) != null && isSupported.get(al)) {
 
         // first call to isSupported should be true
@@ -242,7 +257,8 @@ public class CompressionTest {
   @Timeout(60)
   public void testThereCanBeOnlyOne() throws InterruptedException, ExecutionException {
 
-    for (final Algorithm al : Algorithm.values()) {
+    for (final String name : Compression.getSupportedAlgorithms()) {
+      CompressionAlgorithm al = Compression.getCompressionAlgorithmByName(name);
       if (isSupported.get(al) != null && isSupported.get(al)) {
 
         // first call to isSupported should be true
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index 0cd265d..415b7ed 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -53,6 +53,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.file.rfile.bcfile.CompressionAlgorithm;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils;
 import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
 import org.apache.accumulo.core.util.Pair;
@@ -1141,7 +1142,7 @@ public class SortedLogRecoveryTest {
    */
   private final Utils.Version API_VERSION_3 = new Utils.Version((short) 3, (short) 0);
 
-  private Compression.Algorithm getCompressionFromRFile(FSDataInputStream fsin, long fileLength)
+  private CompressionAlgorithm getCompressionFromRFile(FSDataInputStream fsin, long fileLength)
       throws IOException {
     try (var in = new SeekableDataInputStream(fsin)) {
       int magicNumberSize = 16; // BCFile.Magic.size();