You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/07/16 19:01:12 UTC

[accumulo] branch master updated: Fix #483 Refactor Compression.java removing duplicate code (#1271)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 2610d55  Fix #483 Refactor Compression.java removing duplicate code (#1271)
2610d55 is described below

commit 2610d553bd1402a2e2414fb9f8c0482958425dd1
Author: Laura Schanno <lb...@gmail.com>
AuthorDate: Tue Jul 16 15:01:07 2019 -0400

    Fix #483 Refactor Compression.java removing duplicate code (#1271)
    
    Refactor Compression.java in order to remove duplicate code and
    restructure the class to make it easier to read overall.
---
 .../core/file/rfile/bcfile/Compression.java        | 616 ++++++++++-----------
 1 file changed, 294 insertions(+), 322 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 6c13240..9b0b580 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -48,17 +48,19 @@ import com.google.common.collect.Maps;
  * Compression related stuff.
  */
 public final class Compression {
-  static final Logger log = LoggerFactory.getLogger(Compression.class);
+
+  private static final Logger log = LoggerFactory.getLogger(Compression.class);
 
   /**
-   * Prevent the instantiation of class.
+   * Prevent the instantiation of this class.
    */
   private Compression() {
-    // nothing
+    throw new UnsupportedOperationException();
   }
 
   static class FinishOnFlushCompressionStream extends FilterOutputStream {
-    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
+
+    FinishOnFlushCompressionStream(CompressionOutputStream cout) {
       super(cout);
     }
 
@@ -76,73 +78,97 @@ public final class Compression {
     }
   }
 
-  /** compression: zStandard */
+  /**
+   * Compression: zStandard
+   */
   public static final String COMPRESSION_ZSTD = "zstd";
-  /** snappy codec **/
+
+  /**
+   * Compression: snappy
+   **/
   public static final String COMPRESSION_SNAPPY = "snappy";
-  /** compression: gzip */
+
+  /**
+   * Compression: gzip
+   */
   public static final String COMPRESSION_GZ = "gz";
-  /** compression: lzo */
+
+  /**
+   * Compression: lzo
+   */
   public static final String COMPRESSION_LZO = "lzo";
-  /** compression: none */
+
+  /**
+   * compression: none
+   */
   public static final String COMPRESSION_NONE = "none";
 
   /**
    * Compression algorithms. There is a static initializer, below the values defined in the
-   * enumeration, that calls the initializer of all defined codecs within the Algorithm enum. This
+   * enumeration, that calls the initializer of all defined codecs within {@link Algorithm}. This
    * promotes a model of the following call graph of initialization by the static initializer,
-   * followed by calls to getCodec() and createCompressionStream/DecompressionStream. In some cases,
-   * the compression and decompression call methods will include a different buffer size for the
-   * stream. Note that if the compressed buffer size requested in these calls is zero, we will not
-   * set the buffer size for that algorithm. Instead, we will use the default within the codec.
-   *
-   * The buffer size is configured in the Codec by way of a Hadoop Configuration reference. One
-   * approach may be to use the same Configuration object, but when calls are made to
-   * createCompressionStream and DecompressionStream, with non default buffer sizes, the
-   * configuration object must be changed. In this case, concurrent calls to createCompressionStream
-   * and DecompressionStream would mutate the configuration object beneath each other, requiring
-   * synchronization to avoid undesirable activity via co-modification. To avoid synchronization
-   * entirely, we will create Codecs with their own Configuration object and cache them for re-use.
-   * A default codec will be statically created, as mentioned above to ensure we always have a codec
-   * available at loader initialization.
-   *
+   * followed by calls to {@link #getCodec()},
+   * {@link #createCompressionStream(OutputStream, Compressor, int)}, and
+   * {@link #createDecompressionStream(InputStream, Decompressor, int)}. In some cases, the
+   * compression and decompression call methods will include a different buffer size for the stream.
+   * Note that if the compressed buffer size requested in these calls is zero, we will not set the
+   * buffer size for that algorithm. Instead, we will use the default within the codec.
+   * <p>
+   * The buffer size is configured in the Codec by way of a Hadoop {@link Configuration} reference.
+   * One approach may be to use the same Configuration object, but when calls are made to
+   * {@code createCompressionStream} and {@code createDecompressionStream} with non default buffer
+   * sizes, the configuration object must be changed. In this case, concurrent calls to
+   * {@code createCompressionStream} and {@code createDecompressionStream} would mutate the
+   * configuration object beneath each other, requiring synchronization to avoid undesirable
+   * activity via co-modification. To avoid synchronization entirely, we will create Codecs with
+   * their own Configuration object and cache them for re-use. A default codec will be statically
+   * created, as mentioned above to ensure we always have a codec available at loader
+   * initialization.
+   * <p>
    * There is a Guava cache defined within Algorithm that allows us to cache Codecs for re-use.
    * Since they will have their own configuration object and thus do not need to be mutable, there
    * is no concern for using them concurrently; however, the Guava cache exists to ensure a maximal
    * size of the cache and efficient and concurrent read/write access to the cache itself.
-   *
+   * <p>
    * To provide Algorithm specific details and to describe what is in code:
-   *
+   * <p>
    * LZO will always have the default LZO codec because the buffer size is never overridden within
    * it.
-   *
+   * <p>
    * GZ will use the default GZ codec for the compression stream, but can potentially use a
    * different codec instance for the decompression stream if the requested buffer size does not
    * match the default GZ buffer size of 32k.
-   *
+   * <p>
    * Snappy will use the default Snappy codec with the default buffer size of 64k for the
    * compression stream, but will use a cached codec if the buffer size differs from the default.
    */
-  public static enum Algorithm {
+  public enum Algorithm {
 
     LZO(COMPRESSION_LZO) {
+
       /**
-       * determines if we've checked the codec status. ensures we don't recreate the default codec
+       * The default codec class.
        */
-      private final AtomicBoolean checked = new AtomicBoolean(false);
-      private static final String defaultClazz = "org.apache.hadoop.io.compress.LzoCodec";
-      private transient CompressionCodec codec = null;
+      private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.LzoCodec";
 
       /**
-       * Configuration option for LZO buffer size
+       * Configuration option for LZO buffer size.
        */
       private static final String BUFFER_SIZE_OPT = "io.compression.codec.lzo.buffersize";
 
       /**
-       * Default buffer size
+       * Default buffer size.
        */
       private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
 
+      /**
+       * Whether or not the codec status has been checked. Ensures the default codec is not
+       * recreated.
+       */
+      private final AtomicBoolean checked = new AtomicBoolean(false);
+
+      private transient CompressionCodec codec = null;
+
       @Override
       public boolean isSupported() {
         return codec != null;
@@ -150,29 +176,12 @@ public final class Compression {
 
       @Override
       public void initializeDefaultCodec() {
-        if (!checked.get()) {
-          checked.set(true);
-          codec = createNewCodec(DEFAULT_BUFFER_SIZE);
-        }
+        codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
       }
 
       @Override
       CompressionCodec createNewCodec(int bufferSize) {
-        String extClazz =
-            (conf.get(CONF_LZO_CLASS) == null ? System.getProperty(CONF_LZO_CLASS) : null);
-        String clazz = (extClazz != null) ? extClazz : defaultClazz;
-        try {
-          log.info("Trying to load Lzo codec class: {}", clazz);
-          Configuration myConf = new Configuration(conf);
-          // only use the buffersize if > 0, otherwise we'll use
-          // the default defined within the codec
-          if (bufferSize > 0)
-            myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
-          return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
-        } catch (ClassNotFoundException e) {
-          // that is okay
-        }
-        return null;
+        return createNewCodec(CONF_LZO_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
       }
 
       @Override
@@ -187,13 +196,8 @@ public final class Compression {
           throw new IOException("LZO codec class not specified. Did you forget to set property "
               + CONF_LZO_CLASS + "?");
         }
-        InputStream bis1 = null;
-        if (downStreamBufferSize > 0) {
-          bis1 = new BufferedInputStream(downStream, downStreamBufferSize);
-        } else {
-          bis1 = downStream;
-        }
-        CompressionInputStream cis = codec.createInputStream(bis1, decompressor);
+        InputStream bis = bufferStream(downStream, downStreamBufferSize);
+        CompressionInputStream cis = codec.createInputStream(bis, decompressor);
         return new BufferedInputStream(cis, DATA_IBUF_SIZE);
       }
 
@@ -204,14 +208,7 @@ public final class Compression {
           throw new IOException("LZO codec class not specified. Did you forget to set property "
               + CONF_LZO_CLASS + "?");
         }
-        OutputStream bos1 = null;
-        if (downStreamBufferSize > 0) {
-          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
-        } else {
-          bos1 = downStream;
-        }
-        CompressionOutputStream cos = codec.createOutputStream(bos1, compressor);
-        return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
       }
 
     },
@@ -241,54 +238,28 @@ public final class Compression {
       }
 
       /**
-       * Create a new GZ codec
-       *
-       * @param bufferSize
-       *          buffer size to for GZ
-       * @return created codec
+       * Creates a new GZ codec
        */
       @Override
       protected CompressionCodec createNewCodec(final int bufferSize) {
-        DefaultCodec myCodec = new DefaultCodec();
-        Configuration myConf = new Configuration(conf);
-        // only use the buffersize if > 0, otherwise we'll use
-        // the default defined within the codec
-        if (bufferSize > 0)
-          myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
-        myCodec.setConf(myConf);
-        return myCodec;
+        Configuration newConfig = new Configuration(conf);
+        updateBuffer(conf, BUFFER_SIZE_OPT, bufferSize);
+        DefaultCodec newCodec = new DefaultCodec();
+        newCodec.setConf(newConfig);
+        return newCodec;
       }
 
       @Override
       public InputStream createDecompressionStream(InputStream downStream,
           Decompressor decompressor, int downStreamBufferSize) throws IOException {
-        // Set the internal buffer size to read from down stream.
-        CompressionCodec decomCodec = codec;
-        // if we're not using the default, let's pull from the loading cache
-        if (downStreamBufferSize != DEFAULT_BUFFER_SIZE) {
-          Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(GZ, downStreamBufferSize);
-          try {
-            decomCodec = codecCache.get(sizeOpt);
-          } catch (ExecutionException e) {
-            throw new IOException(e);
-          }
-        }
-        CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor);
-        return new BufferedInputStream(cis, DATA_IBUF_SIZE);
+        return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
+            DEFAULT_BUFFER_SIZE, GZ, codec);
       }
 
       @Override
       public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
           int downStreamBufferSize) throws IOException {
-        OutputStream bos1 = null;
-        if (downStreamBufferSize > 0) {
-          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
-        } else {
-          bos1 = downStream;
-        }
-        // always uses the default buffer size
-        CompressionOutputStream cos = codec.createOutputStream(bos1, compressor);
-        return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
       }
 
       @Override
@@ -306,16 +277,11 @@ public final class Compression {
       @Override
       public InputStream createDecompressionStream(InputStream downStream,
           Decompressor decompressor, int downStreamBufferSize) {
-        if (downStreamBufferSize > 0) {
-          return new BufferedInputStream(downStream, downStreamBufferSize);
-        }
-        return downStream;
+        return bufferStream(downStream, downStreamBufferSize);
       }
 
       @Override
-      public void initializeDefaultCodec() {
-
-      }
+      public void initializeDefaultCodec() {}
 
       @Override
       protected CompressionCodec createNewCodec(final int bufferSize) {
@@ -325,11 +291,7 @@ public final class Compression {
       @Override
       public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
           int downStreamBufferSize) {
-        if (downStreamBufferSize > 0) {
-          return new BufferedOutputStream(downStream, downStreamBufferSize);
-        }
-
-        return downStream;
+        return bufferStream(downStream, downStreamBufferSize);
       }
 
       @Override
@@ -339,85 +301,56 @@ public final class Compression {
     },
 
     SNAPPY(COMPRESSION_SNAPPY) {
-      // Use base type to avoid compile-time dependencies.
-      private transient CompressionCodec snappyCodec = null;
+
       /**
-       * determines if we've checked the codec status. ensures we don't recreate the default codec
+       * The default codec class.
        */
-      private final AtomicBoolean checked = new AtomicBoolean(false);
-      private static final String defaultClazz = "org.apache.hadoop.io.compress.SnappyCodec";
+      private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.SnappyCodec";
 
       /**
-       * Buffer size option
+       * Configuration option for LZO buffer size.
        */
       private static final String BUFFER_SIZE_OPT = "io.compression.codec.snappy.buffersize";
 
       /**
-       * Default buffer size value
+       * Default buffer size.
        */
       private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
 
+      /**
+       * Whether or not the codec status has been checked. Ensures the default codec is not
+       * recreated.
+       */
+      private final AtomicBoolean checked = new AtomicBoolean(false);
+
+      private transient CompressionCodec codec = null;
+
       @Override
       public CompressionCodec getCodec() {
-        return snappyCodec;
+        return codec;
       }
 
       @Override
       public void initializeDefaultCodec() {
-        if (!checked.get()) {
-          checked.set(true);
-          snappyCodec = createNewCodec(DEFAULT_BUFFER_SIZE);
-        }
+        codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
       }
 
       /**
        * Creates a new snappy codec.
-       *
-       * @param bufferSize
-       *          incoming buffer size
-       * @return new codec or null, depending on if installed
        */
       @Override
       protected CompressionCodec createNewCodec(final int bufferSize) {
-
-        String extClazz =
-            (conf.get(CONF_SNAPPY_CLASS) == null ? System.getProperty(CONF_SNAPPY_CLASS) : null);
-        String clazz = (extClazz != null) ? extClazz : defaultClazz;
-        try {
-          log.info("Trying to load snappy codec class: {}", clazz);
-
-          Configuration myConf = new Configuration(conf);
-          // only use the buffersize if > 0, otherwise we'll use
-          // the default defined within the codec
-          if (bufferSize > 0)
-            myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
-
-          return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
-
-        } catch (ClassNotFoundException e) {
-          // that is okay
-        }
-
-        return null;
+        return createNewCodec(CONF_SNAPPY_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
       }
 
       @Override
       public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
           int downStreamBufferSize) throws IOException {
-
         if (!isSupported()) {
           throw new IOException("SNAPPY codec class not specified. Did you forget to set property "
               + CONF_SNAPPY_CLASS + "?");
         }
-        OutputStream bos1 = null;
-        if (downStreamBufferSize > 0) {
-          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
-        } else {
-          bos1 = downStream;
-        }
-        // use the default codec
-        CompressionOutputStream cos = snappyCodec.createOutputStream(bos1, compressor);
-        return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
       }
 
       @Override
@@ -427,112 +360,68 @@ public final class Compression {
           throw new IOException("SNAPPY codec class not specified. Did you forget to set property "
               + CONF_SNAPPY_CLASS + "?");
         }
-
-        CompressionCodec decomCodec = snappyCodec;
-        // if we're not using the same buffer size, we'll pull the codec from the loading cache
-        if (downStreamBufferSize != DEFAULT_BUFFER_SIZE) {
-          Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(SNAPPY, downStreamBufferSize);
-          try {
-            decomCodec = codecCache.get(sizeOpt);
-          } catch (ExecutionException e) {
-            throw new IOException(e);
-          }
-        }
-
-        CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor);
-        return new BufferedInputStream(cis, DATA_IBUF_SIZE);
+        return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
+            DEFAULT_BUFFER_SIZE, SNAPPY, codec);
       }
 
       @Override
       public boolean isSupported() {
-
-        return snappyCodec != null;
+        return codec != null;
       }
     },
 
     ZSTANDARD(COMPRESSION_ZSTD) {
-      // Use base type to avoid compile-time dependencies.
-      private transient CompressionCodec zstdCodec = null;
+
       /**
-       * determines if we've checked the codec status. ensures we don't recreate the default codec
+       * The default codec class.
        */
-      private final AtomicBoolean checked = new AtomicBoolean(false);
-      private static final String defaultClazz = "org.apache.hadoop.io.compress.ZStandardCodec";
+      private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.ZStandardCodec";
 
       /**
-       * Buffer size option
+       * Configuration option for LZO buffer size.
        */
       private static final String BUFFER_SIZE_OPT = "io.compression.codec.zstd.buffersize";
 
       /**
-       * Default buffer size value
+       * Default buffer size.
        */
       private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
 
+      /**
+       * Whether or not the codec status has been checked. Ensures the default codec is not
+       * recreated.
+       */
+      private final AtomicBoolean checked = new AtomicBoolean(false);
+
+      private transient CompressionCodec codec = null;
+
       @Override
       public CompressionCodec getCodec() {
-        return zstdCodec;
+        return codec;
       }
 
       @Override
       public void initializeDefaultCodec() {
-        if (!checked.get()) {
-          checked.set(true);
-          zstdCodec = createNewCodec(DEFAULT_BUFFER_SIZE);
-        }
+        codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
       }
 
       /**
        * Creates a new ZStandard codec.
-       *
-       * @param bufferSize
-       *          incoming buffer size
-       * @return new codec or null, depending on if installed
        */
       @Override
       protected CompressionCodec createNewCodec(final int bufferSize) {
-
-        String extClazz =
-            (conf.get(CONF_ZSTD_CLASS) == null ? System.getProperty(CONF_ZSTD_CLASS) : null);
-        String clazz = (extClazz != null) ? extClazz : defaultClazz;
-        try {
-          log.info("Trying to load ZStandard codec class: {}", clazz);
-
-          Configuration myConf = new Configuration(conf);
-          // only use the buffersize if > 0, otherwise we'll use
-          // the default defined within the codec
-          if (bufferSize > 0)
-            myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
-
-          return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
-
-        } catch (ClassNotFoundException e) {
-          // that is okay
-        }
-
-        return null;
+        return createNewCodec(CONF_ZSTD_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
       }
 
       @Override
       public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
           int downStreamBufferSize) throws IOException {
-
         if (!isSupported()) {
           throw new IOException(
               "ZStandard codec class not specified. Did you forget to set property "
                   + CONF_ZSTD_CLASS + "?");
         }
-        OutputStream bos1;
-        if (downStreamBufferSize > 0) {
-          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
-        } else {
-          bos1 = downStream;
-        }
-        // use the default codec
-        CompressionOutputStream cos = zstdCodec.createOutputStream(bos1, compressor);
-        BufferedOutputStream bos2 =
-            new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
-        return bos2;
+        return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
       }
 
       @Override
@@ -543,44 +432,48 @@ public final class Compression {
               "ZStandard codec class not specified. Did you forget to set property "
                   + CONF_ZSTD_CLASS + "?");
         }
-
-        CompressionCodec decomCodec = zstdCodec;
-        // if we're not using the same buffer size, we'll pull the codec from the loading cache
-        if (downStreamBufferSize != DEFAULT_BUFFER_SIZE) {
-          Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(ZSTANDARD, downStreamBufferSize);
-          try {
-            decomCodec = codecCache.get(sizeOpt);
-          } catch (ExecutionException e) {
-            throw new IOException(e);
-          }
-        }
-
-        CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor);
-        BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
-        return bis2;
+        return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
+            DEFAULT_BUFFER_SIZE, ZSTANDARD, codec);
       }
 
       @Override
       public boolean isSupported() {
-        return zstdCodec != null;
+        return codec != null;
       }
     };
 
     /**
-     * The model defined by the static block, below, creates a singleton for each defined codec in
-     * the Algorithm enumeration. By creating the codecs, each call to isSupported shall return
-     * true/false depending on if the codec singleton is defined. The static initializer, below,
-     * will ensure this occurs when the Enumeration is loaded. Furthermore, calls to getCodec will
-     * return the singleton, whether it is null or not.
-     *
-     * Calls to createCompressionStream and createDecompressionStream may return a different codec
-     * than getCodec, if the incoming downStreamBufferSize is different than the default. In such a
-     * case, we will place the resulting codec into the codecCache, defined below, to ensure we have
-     * cache codecs.
-     *
-     * Since codecs are immutable, there is no concern about concurrent access to the
-     * CompressionCodec objects within the guava cache.
+     * Guava cache to have a limited factory pattern defined in the Algorithm enum.
      */
+    private static LoadingCache<Entry<Algorithm,Integer>,CompressionCodec> codecCache =
+        CacheBuilder.newBuilder().maximumSize(25).build(new CacheLoader<>() {
+          @Override
+          public CompressionCodec load(Entry<Algorithm,Integer> key) {
+            return key.getKey().createNewCodec(key.getValue());
+          }
+        });
+
+    public static final String CONF_LZO_CLASS = "io.compression.codec.lzo.class";
+    public static final String CONF_SNAPPY_CLASS = "io.compression.codec.snappy.class";
+    public static final String CONF_ZSTD_CLASS = "io.compression.codec.zstd.class";
+
+    // All compression-related settings are required to be configured statically in the
+    // Configuration object.
+    protected static final Configuration conf;
+
+    // The model defined by the static block below creates a singleton for each defined codec in the
+    // Algorithm enumeration. By creating the codecs, each call to isSupported shall return
+    // true/false depending on if the codec singleton is defined. The static initializer, below,
+    // will ensure this occurs when the Enumeration is loaded. Furthermore, calls to getCodec will
+    // return the singleton, whether it is null or not.
+    //
+    // Calls to createCompressionStream and createDecompressionStream may return a different codec
+    // than getCodec, if the incoming downStreamBufferSize is different than the default. In such a
+    // case, we will place the resulting codec into the codecCache, defined below, to ensure we have
+    // cache codecs.
+    //
+    // Since codecs are immutable, there is no concern about concurrent access to the
+    // CompressionCodec objects within the guava cache.
     static {
       conf = new Configuration();
       for (final Algorithm al : Algorithm.values()) {
@@ -588,75 +481,54 @@ public final class Compression {
       }
     }
 
-    /**
-     * Guava cache to have a limited factory pattern defined in the Algorithm enum.
-     */
-    private static LoadingCache<Entry<Algorithm,Integer>,CompressionCodec> codecCache =
-        CacheBuilder.newBuilder().maximumSize(25)
-            .build(new CacheLoader<Entry<Algorithm,Integer>,CompressionCodec>() {
-              @Override
-              public CompressionCodec load(Entry<Algorithm,Integer> key) {
-                return key.getKey().createNewCodec(key.getValue());
-              }
-            });
-
-    // We require that all compression related settings are configured
-    // statically in the Configuration object.
-    protected static final Configuration conf;
-    private final String compressName;
-    // data input buffer size to absorb small reads from application.
-    private static final int DATA_IBUF_SIZE = 1 * 1024;
-    // data output buffer size to absorb small writes from application.
+    // Data input buffer size to absorb small reads from application.
+    private static final int DATA_IBUF_SIZE = 1024;
+
+    // Data output buffer size to absorb small writes from application.
     private static final int DATA_OBUF_SIZE = 4 * 1024;
-    public static final String CONF_LZO_CLASS = "io.compression.codec.lzo.class";
-    public static final String CONF_SNAPPY_CLASS = "io.compression.codec.snappy.class";
-    public static final String CONF_ZSTD_CLASS = "io.compression.codec.zstd.class";
+
+    // The name of the compression algorithm.
+    private final String name;
 
     Algorithm(String name) {
-      this.compressName = name;
+      this.name = name;
     }
 
+    public abstract InputStream createDecompressionStream(InputStream downStream,
+        Decompressor decompressor, int downStreamBufferSize) throws IOException;
+
+    public abstract OutputStream createCompressionStream(OutputStream downStream,
+        Compressor compressor, int downStreamBufferSize) throws IOException;
+
+    public abstract boolean isSupported();
+
     abstract CompressionCodec getCodec();
 
     /**
-     * function to create the default codec object.
+     * Create the default codec object.
      */
     abstract void initializeDefaultCodec();
 
     /**
      * Shared function to create new codec objects. It is expected that if buffersize is invalid, a
-     * codec will be created with the default buffer size
-     *
-     * @param bufferSize
-     *          configured buffer size.
-     * @return new codec
+     * codec will be created with the default buffer size.
      */
     abstract CompressionCodec createNewCodec(int bufferSize);
 
-    public abstract InputStream createDecompressionStream(InputStream downStream,
-        Decompressor decompressor, int downStreamBufferSize) throws IOException;
-
-    public abstract OutputStream createCompressionStream(OutputStream downStream,
-        Compressor compressor, int downStreamBufferSize) throws IOException;
-
-    public abstract boolean isSupported();
-
     public Compressor getCompressor() {
       CompressionCodec codec = getCodec();
       if (codec != null) {
         Compressor compressor = CodecPool.getCompressor(codec);
         if (compressor != null) {
           if (compressor.finished()) {
-            // Somebody returns the compressor to CodecPool but is still using
-            // it.
+            // Somebody returns the compressor to CodecPool but is still using it.
             log.warn("Compressor obtained from CodecPool already finished()");
           } else {
             log.debug("Got a compressor: {}", compressor.hashCode());
           }
-          /**
-           * Following statement is necessary to get around bugs in 0.18 where a compressor is
-           * referenced after returned back to the codec pool.
-           */
+          // The following statement is necessary to get around bugs in 0.18 where a compressor is
+          // referenced after it's
+          // returned back to the codec pool.
           compressor.reset();
         }
         return compressor;
@@ -664,7 +536,7 @@ public final class Compression {
       return null;
     }
 
-    public void returnCompressor(Compressor compressor) {
+    public void returnCompressor(final Compressor compressor) {
       if (compressor != null) {
         log.debug("Return a compressor: {}", compressor.hashCode());
         CodecPool.returnCompressor(compressor);
@@ -677,57 +549,157 @@ public final class Compression {
         Decompressor decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {
           if (decompressor.finished()) {
-            // Somebody returns the decompressor to CodecPool but is still using
-            // it.
+            // Somebody returns the decompressor to CodecPool but is still using it.
             log.warn("Decompressor obtained from CodecPool already finished()");
           } else {
             log.debug("Got a decompressor: {}", decompressor.hashCode());
           }
-          /**
-           * Following statement is necessary to get around bugs in 0.18 where a decompressor is
-           * referenced after returned back to the codec pool.
-           */
+          // The following statement is necessary to get around bugs in 0.18 where a decompressor is
+          // referenced after
+          // it's returned back to the codec pool.
           decompressor.reset();
         }
         return decompressor;
       }
-
       return null;
     }
 
-    public void returnDecompressor(Decompressor decompressor) {
+    /**
+     * Returns the specified {@link Decompressor} to the codec cache if it is not null.
+     */
+    public void returnDecompressor(final Decompressor decompressor) {
       if (decompressor != null) {
         log.debug("Returned a decompressor: {}", decompressor.hashCode());
         CodecPool.returnDecompressor(decompressor);
       }
     }
 
+    /**
+     * Returns the name of the compression algorithm.
+     *
+     * @return the name
+     */
     public String getName() {
-      return compressName;
+      return name;
+    }
+
+    /**
+     * Initializes and returns a new codec with the specified buffer size if and only if the
+     * specified {@link AtomicBoolean} has a value of false, or returns the specified original coded
+     * otherwise.
+     */
+    CompressionCodec initCodec(final AtomicBoolean checked, final int bufferSize,
+        final CompressionCodec originalCodec) {
+      if (!checked.get()) {
+        checked.set(true);
+        return createNewCodec(bufferSize);
+      }
+      return originalCodec;
     }
-  }
 
-  static Algorithm getCompressionAlgorithmByName(String compressName) {
-    Algorithm[] algos = Algorithm.class.getEnumConstants();
+    /**
+     * Returns a new {@link CompressionCodec} of the specified type, or the default type if no
+     * primary type is specified. If the specified buffer size is greater than 0, the specified
+     * buffer size configuration option will be updated in the codec's configuration with the buffer
+     * size. If the neither the specified codec type or the default codec type can be found, null
+     * will be returned.
+     */
+    CompressionCodec createNewCodec(final String codecClazzProp, final String defaultClazz,
+        final int bufferSize, final String bufferSizeConfigOpt) {
+      String extClazz = (conf.get(codecClazzProp) == null ? System.getProperty(codecClazzProp) : null);
+      String clazz = (extClazz != null) ? extClazz : defaultClazz;
+      try {
+        log.info("Trying to load codec class {} for {}", clazz, codecClazzProp);
+        Configuration config = new Configuration(conf);
+        updateBuffer(config, bufferSizeConfigOpt, bufferSize);
+        return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), config);
+      } catch (ClassNotFoundException e) {
+        // This is okay.
+      }
+      return null;
+    }
 
-    for (Algorithm a : algos) {
-      if (a.getName().equals(compressName)) {
-        return a;
+    InputStream createDecompressionStream(final InputStream stream, final Decompressor decompressor,
+        final int bufferSize, final int defaultBufferSize, final Algorithm algorithm,
+        CompressionCodec codec) throws IOException {
+      // If the default buffer size is not being used, pull from the loading cache.
+      if (bufferSize != defaultBufferSize) {
+        Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(algorithm, bufferSize);
+        try {
+          codec = codecCache.get(sizeOpt);
+        } catch (ExecutionException e) {
+          throw new IOException(e);
+        }
       }
+      CompressionInputStream cis = codec.createInputStream(stream, decompressor);
+      return new BufferedInputStream(cis, DATA_IBUF_SIZE);
     }
 
-    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
+    /**
+     * Returns a new {@link FinishOnFlushCompressionStream} initialized for the specified output
+     * stream and compressor.
+     */
+    OutputStream createFinishedOnFlushCompressionStream(final OutputStream downStream,
+        final Compressor compressor, final int downStreamBufferSize) throws IOException {
+      OutputStream out = bufferStream(downStream, downStreamBufferSize);
+      CompressionOutputStream cos = getCodec().createOutputStream(out, compressor);
+      return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+    }
+
+    /**
+     * Return the given stream wrapped as a {@link BufferedOutputStream} with the given buffer size
+     * if the buffer size is greater than 0, or return the original stream otherwise.
+     */
+    OutputStream bufferStream(final OutputStream stream, final int bufferSize) {
+      if (bufferSize > 0) {
+        return new BufferedOutputStream(stream, bufferSize);
+      }
+      return stream;
+    }
+
+    /**
+     * Return the given stream wrapped as a {@link BufferedInputStream} with the given buffer size
+     * if the buffer size is greater than 0, or return the original stream otherwise.
+     */
+    InputStream bufferStream(final InputStream stream, final int bufferSize) {
+      if (bufferSize > 0) {
+        return new BufferedInputStream(stream, bufferSize);
+      }
+      return stream;
+    }
+
+    /**
+     * Updates the value of the specified buffer size opt in the given {@link Configuration} if the
+     * new buffer size is greater than 0.
+     */
+    void updateBuffer(final Configuration config, final String bufferSizeOpt,
+        final int bufferSize) {
+      // Use the buffersize only if it is greater than 0, otherwise use the default defined within
+      // the codec.
+      if (bufferSize > 0) {
+        config.setInt(bufferSizeOpt, bufferSize);
+      }
+    }
   }
 
   public static String[] getSupportedAlgorithms() {
-    Algorithm[] algos = Algorithm.class.getEnumConstants();
+    Algorithm[] algorithms = Algorithm.class.getEnumConstants();
+    ArrayList<String> supportedAlgorithms = new ArrayList<>();
+    for (Algorithm algorithm : algorithms) {
+      if (algorithm.isSupported()) {
+        supportedAlgorithms.add(algorithm.getName());
+      }
+    }
+    return supportedAlgorithms.toArray(new String[0]);
+  }
 
-    ArrayList<String> ret = new ArrayList<>();
-    for (Algorithm a : algos) {
-      if (a.isSupported()) {
-        ret.add(a.getName());
+  static Algorithm getCompressionAlgorithmByName(final String name) {
+    Algorithm[] algorithms = Algorithm.class.getEnumConstants();
+    for (Algorithm algorithm : algorithms) {
+      if (algorithm.getName().equals(name)) {
+        return algorithm;
       }
     }
-    return ret.toArray(new String[ret.size()]);
+    throw new IllegalArgumentException("Unsupported compression algorithm name: " + name);
   }
 }