You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/10/06 20:55:00 UTC

[hbase] branch branch-2 updated: HBASE-26259 Fallback support to pure Java compression (#3691)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 18b9fa8a HBASE-26259 Fallback support to pure Java compression (#3691)
18b9fa8a is described below

commit 18b9fa8a3caa7f2a0544c8b33e98e930ddcc0104
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Wed Oct 6 13:17:18 2021 -0700

    HBASE-26259 Fallback support to pure Java compression (#3691)
    
    This change introduces provided compression codecs to HBase as
    new Maven modules. Each module provides compression codec support
    that formerly required Hadoop native codecs, which in turn rely
    on native code integration, which may or may not be available on
    a given hardware platform or in an operational environment. We
    now provide codecs in the HBase distribution for users who, for
    whatever reason, cannot or do not wish to deploy the Hadoop native
    codecs.
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    
    Conflicts:
    	hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
    	hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
    	hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java
---
 hbase-assembly/pom.xml                             |  20 ++
 .../hadoop/hbase/io/compress/Compression.java      | 282 +++++++++++++++------
 .../hadoop/hbase/io/compress/CompressionUtil.java  |  36 +++
 .../org/apache/hadoop/hbase/util/UnsafeAccess.java |  31 +--
 .../hbase/io/compress/CompressionTestBase.java     | 141 +++++++++++
 .../hadoop/hbase/util}/RandomDistribution.java     |   2 +-
 .../hbase-compression-aircompressor/pom.xml        | 172 +++++++++++++
 .../compress/aircompressor/HadoopCompressor.java   | 214 ++++++++++++++++
 .../compress/aircompressor/HadoopDecompressor.java | 150 +++++++++++
 .../hbase/io/compress/aircompressor/Lz4Codec.java  | 158 ++++++++++++
 .../hbase/io/compress/aircompressor/LzoCodec.java  | 158 ++++++++++++
 .../io/compress/aircompressor/SnappyCodec.java     | 158 ++++++++++++
 .../hbase/io/compress/aircompressor/ZstdCodec.java | 166 ++++++++++++
 .../aircompressor/TestHFileCompressionLz4.java     |  51 ++++
 .../aircompressor/TestHFileCompressionLzo.java     |  51 ++++
 .../aircompressor/TestHFileCompressionSnappy.java  |  51 ++++
 .../aircompressor/TestHFileCompressionZstd.java    |  51 ++++
 .../io/compress/aircompressor/TestLz4Codec.java    |  49 ++++
 .../io/compress/aircompressor/TestLzoCodec.java    |  50 ++++
 .../io/compress/aircompressor/TestSnappyCodec.java |  50 ++++
 .../aircompressor/TestWALCompressionLz4.java       |  69 +++++
 .../aircompressor/TestWALCompressionLzo.java       |  69 +++++
 .../aircompressor/TestWALCompressionSnappy.java    |  69 +++++
 .../aircompressor/TestWALCompressionZstd.java      |  69 +++++
 .../io/compress/aircompressor/TestZstdCodec.java   |  50 ++++
 hbase-compression/hbase-compression-lz4/pom.xml    | 161 ++++++++++++
 .../hadoop/hbase/io/compress/lz4/Lz4Codec.java     | 119 +++++++++
 .../hbase/io/compress/lz4/Lz4Compressor.java       | 205 +++++++++++++++
 .../hbase/io/compress/lz4/Lz4Decompressor.java     | 144 +++++++++++
 .../io/compress/lz4/TestHFileCompressionLz4.java   |  51 ++++
 .../hadoop/hbase/io/compress/lz4/TestLz4Codec.java |  50 ++++
 .../io/compress/lz4/TestWALCompressionLz4.java     |  69 +++++
 hbase-compression/hbase-compression-snappy/pom.xml | 161 ++++++++++++
 .../hbase/io/compress/xerial/SnappyCodec.java      | 119 +++++++++
 .../hbase/io/compress/xerial/SnappyCompressor.java | 184 ++++++++++++++
 .../io/compress/xerial/SnappyDecompressor.java     | 137 ++++++++++
 .../xerial/TestHFileCompressionSnappy.java         |  51 ++++
 .../hbase/io/compress/xerial/TestSnappyCodec.java  |  50 ++++
 .../compress/xerial/TestWALCompressionSnappy.java  |  69 +++++
 hbase-compression/hbase-compression-xz/pom.xml     | 145 +++++++++++
 .../hadoop/hbase/io/compress/xz/LzmaCodec.java     | 120 +++++++++
 .../hbase/io/compress/xz/LzmaCompressor.java       | 242 ++++++++++++++++++
 .../hbase/io/compress/xz/LzmaDecompressor.java     | 167 ++++++++++++
 .../io/compress/xz/TestHFileCompressionLzma.java   |  51 ++++
 .../hadoop/hbase/io/compress/xz/TestLzmaCodec.java |  57 +++++
 .../io/compress/xz/TestWALCompressionLzma.java     |  69 +++++
 hbase-compression/hbase-compression-zstd/pom.xml   | 161 ++++++++++++
 .../hadoop/hbase/io/compress/zstd/ZstdCodec.java   | 127 ++++++++++
 .../hbase/io/compress/zstd/ZstdCompressor.java     | 188 ++++++++++++++
 .../hbase/io/compress/zstd/ZstdDecompressor.java   | 138 ++++++++++
 .../io/compress/zstd/TestHFileCompressionZstd.java |  51 ++++
 .../io/compress/zstd/TestWALCompressionZstd.java   |  69 +++++
 .../hbase/io/compress/zstd/TestZstdCodec.java      |  58 +++++
 hbase-compression/pom.xml                          | 110 ++++++++
 .../apache/hadoop/hbase/PerformanceEvaluation.java |   2 +-
 .../src/main/resources/supplemental-models.xml     |  15 ++
 .../hadoop/hbase/io/compress/HFileTestBase.java    | 131 ++++++++++
 .../apache/hadoop/hbase/io/hfile/KVGenerator.java  |   1 +
 .../apache/hadoop/hbase/io/hfile/KeySampler.java   |   2 +-
 .../hadoop/hbase/io/hfile/TestHFileEncryption.java |   9 +-
 .../hadoop/hbase/io/hfile/TestHFileSeek.java       |   1 +
 .../wal/TestWALCellCodecWithCompression.java       |  19 +-
 ...mpressedWAL.java => CompressedWALTestBase.java} | 109 +++-----
 .../apache/hadoop/hbase/wal/TestCompressedWAL.java | 104 +-------
 .../wal/TestCompressedWALValueCompression.java     |  85 +++++++
 pom.xml                                            |  59 +++++
 66 files changed, 5953 insertions(+), 274 deletions(-)

diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml
index 4ea650d..7e9389a 100644
--- a/hbase-assembly/pom.xml
+++ b/hbase-assembly/pom.xml
@@ -310,6 +310,26 @@
       <artifactId>hbase-hbtop</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-compression-aircompressor</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-compression-lz4</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-compression-snappy</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-compression-xz</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-compression-zstd</artifactId>
+    </dependency>
+    <dependency>
       <groupId>jline</groupId>
       <artifactId>jline</artifactId>
     </dependency>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
index fb7a893..d411fd7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership. The ASF
@@ -25,6 +25,7 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -33,9 +34,7 @@ import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.DoNotPool;
-import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -49,6 +48,58 @@ import org.slf4j.LoggerFactory;
 public final class Compression {
   private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
 
+
+  // LZO
+
+  public static final String LZO_CODEC_CLASS_KEY =
+      "hbase.io.compress.lzo.codec";
+  public static final String LZO_CODEC_CLASS_DEFAULT =
+      "com.hadoop.compression.lzo.LzoCodec";
+
+  // GZ
+
+  public static final String GZ_CODEC_CLASS_KEY =
+      "hbase.io.compress.gz.codec";
+  // Our ReusableStreamGzipCodec fixes an inefficiency in Hadoop's Gzip codec, allowing us to
+  // reuse compression streams, but still requires the Hadoop native codec.
+  public static final String GZ_CODEC_CLASS_DEFAULT =
+      "org.apache.hadoop.hbase.io.compress.ReusableStreamGzipCodec";
+
+  // SNAPPY
+
+  public static final String SNAPPY_CODEC_CLASS_KEY =
+      "hbase.io.compress.snappy.codec";
+  public static final String SNAPPY_CODEC_CLASS_DEFAULT =
+      "org.apache.hadoop.io.compress.SnappyCodec";
+
+  // LZ4
+
+  public static final String LZ4_CODEC_CLASS_KEY =
+      "hbase.io.compress.lz4.codec";
+  public static final String LZ4_CODEC_CLASS_DEFAULT =
+      "org.apache.hadoop.io.compress.Lz4Codec";
+
+  // ZSTD
+
+  public static final String ZSTD_CODEC_CLASS_KEY =
+      "hbase.io.compress.zstd.codec";
+  public static final String ZSTD_CODEC_CLASS_DEFAULT =
+      "org.apache.hadoop.io.compress.ZStandardCodec";
+
+  // BZIP2
+
+  public static final String BZIP2_CODEC_CLASS_KEY =
+      "hbase.io.compress.bzip2.codec";
+  public static final String BZIP2_CODEC_CLASS_DEFAULT =
+      "org.apache.hadoop.io.compress.BZip2Codec";
+
+  // LZMA
+
+  public static final String LZMA_CODEC_CLASS_KEY =
+      "hbase.io.compress.lzma.codec";
+  public static final String LZMA_CODEC_CLASS_DEFAULT =
+      "org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";
+
   /**
    * Prevent the instantiation of class.
    */
@@ -104,61 +155,63 @@ public final class Compression {
   public static enum Algorithm {
     // LZO is GPL and requires extra install to setup. See
     // https://stackoverflow.com/questions/23441142/class-com-hadoop-compression-lzo-lzocodec-not-found-for-spark-on-cdh-5
-    LZO("lzo") {
+    LZO("lzo", LZO_CODEC_CLASS_KEY, LZO_CODEC_CLASS_DEFAULT) {
       // Use base type to avoid compile-time dependencies.
       private volatile transient CompressionCodec lzoCodec;
       private final transient Object lock = new Object();
-
       @Override
       CompressionCodec getCodec(Configuration conf) {
         if (lzoCodec == null) {
           synchronized (lock) {
             if (lzoCodec == null) {
-              lzoCodec = buildCodec(conf);
+              lzoCodec = buildCodec(conf, this);
             }
           }
         }
         return lzoCodec;
       }
-
-      private CompressionCodec buildCodec(Configuration conf) {
-        try {
-          Class<?> externalCodec =
-              getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
-          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
-              new Configuration(conf));
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException(e);
+      @Override
+      public CompressionCodec reload(Configuration conf) {
+        synchronized (lock) {
+          lzoCodec = buildCodec(conf, this);
+          LOG.warn("Reloaded configuration for {}", name());
+          return lzoCodec;
         }
       }
     },
-    GZ("gz") {
-      private volatile transient GzipCodec codec;
-      private final transient Object lock = new Object();
 
+    GZ("gz", GZ_CODEC_CLASS_KEY, GZ_CODEC_CLASS_DEFAULT) {
+      private volatile transient CompressionCodec gzCodec;
+      private final transient Object lock = new Object();
       @Override
-      DefaultCodec getCodec(Configuration conf) {
-        if (codec == null) {
+      CompressionCodec getCodec(Configuration conf) {
+        if (gzCodec == null) {
           synchronized (lock) {
-            if (codec == null) {
-              codec = buildCodec(conf);
+            if (gzCodec == null) {
+              gzCodec = buildCodec(conf, this);
             }
           }
         }
-
-        return codec;
+        return gzCodec;
       }
-
-      private GzipCodec buildCodec(Configuration conf) {
-        GzipCodec gzcodec = new ReusableStreamGzipCodec();
-        gzcodec.setConf(new Configuration(conf));
-        return gzcodec;
+      @Override
+      public CompressionCodec reload(Configuration conf) {
+        synchronized (lock) {
+          gzCodec = buildCodec(conf, this);
+          LOG.warn("Reloaded configuration for {}", name());
+          return gzCodec;
+        }
       }
     },
 
-    NONE("none") {
+    NONE("none", "", "") {
+      @Override
+      CompressionCodec getCodec(Configuration conf) {
+        return null;
+      }
+
       @Override
-      DefaultCodec getCodec(Configuration conf) {
+      public CompressionCodec reload(Configuration conf) {
         return null;
       }
 
@@ -183,130 +236,158 @@ public final class Compression {
         return downStream;
       }
     },
-    SNAPPY("snappy") {
+    SNAPPY("snappy", SNAPPY_CODEC_CLASS_KEY, SNAPPY_CODEC_CLASS_DEFAULT) {
       // Use base type to avoid compile-time dependencies.
       private volatile transient CompressionCodec snappyCodec;
       private final transient Object lock = new Object();
-
       @Override
       CompressionCodec getCodec(Configuration conf) {
         if (snappyCodec == null) {
           synchronized (lock) {
             if (snappyCodec == null) {
-              snappyCodec = buildCodec(conf);
+              snappyCodec = buildCodec(conf, this);
             }
           }
         }
         return snappyCodec;
       }
-
-      private CompressionCodec buildCodec(Configuration conf) {
-        try {
-          Class<?> externalCodec =
-              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.SnappyCodec");
-          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException(e);
+      @Override
+      public CompressionCodec reload(Configuration conf) {
+        synchronized (lock) {
+          snappyCodec = buildCodec(conf, this);
+          LOG.warn("Reloaded configuration for {}", name());
+          return snappyCodec;
         }
       }
     },
-    LZ4("lz4") {
+    LZ4("lz4", LZ4_CODEC_CLASS_KEY, LZ4_CODEC_CLASS_DEFAULT) {
       // Use base type to avoid compile-time dependencies.
       private volatile transient CompressionCodec lz4Codec;
       private final transient Object lock = new Object();
-
       @Override
       CompressionCodec getCodec(Configuration conf) {
         if (lz4Codec == null) {
           synchronized (lock) {
             if (lz4Codec == null) {
-              lz4Codec = buildCodec(conf);
+              lz4Codec = buildCodec(conf, this);
             }
           }
         }
         return lz4Codec;
       }
-
-      private CompressionCodec buildCodec(Configuration conf) {
-        try {
-          Class<?> externalCodec =
-              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.Lz4Codec");
-          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException(e);
+      @Override
+      public CompressionCodec reload(Configuration conf) {
+        synchronized (lock) {
+          lz4Codec = buildCodec(conf, this);
+          LOG.warn("Reloaded configuration for {}", name());
+          return lz4Codec;
         }
       }
     },
-    BZIP2("bzip2") {
+    BZIP2("bzip2", BZIP2_CODEC_CLASS_KEY, BZIP2_CODEC_CLASS_DEFAULT) {
       // Use base type to avoid compile-time dependencies.
       private volatile transient CompressionCodec bzipCodec;
       private final transient Object lock = new Object();
-
       @Override
       CompressionCodec getCodec(Configuration conf) {
         if (bzipCodec == null) {
           synchronized (lock) {
             if (bzipCodec == null) {
-              bzipCodec = buildCodec(conf);
+              bzipCodec = buildCodec(conf, this);
             }
           }
         }
         return bzipCodec;
       }
-
-      private CompressionCodec buildCodec(Configuration conf) {
-        try {
-          Class<?> externalCodec =
-              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.BZip2Codec");
-          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException(e);
+      @Override
+      public CompressionCodec reload(Configuration conf) {
+        synchronized (lock) {
+          bzipCodec = buildCodec(conf, this);
+          LOG.warn("Reloaded configuration for {}", name());
+          return bzipCodec;
         }
       }
     },
-    ZSTD("zstd") {
+    ZSTD("zstd", ZSTD_CODEC_CLASS_KEY, ZSTD_CODEC_CLASS_DEFAULT) {
       // Use base type to avoid compile-time dependencies.
       private volatile transient CompressionCodec zStandardCodec;
       private final transient Object lock = new Object();
-
       @Override
       CompressionCodec getCodec(Configuration conf) {
         if (zStandardCodec == null) {
           synchronized (lock) {
             if (zStandardCodec == null) {
-              zStandardCodec = buildCodec(conf);
+              zStandardCodec = buildCodec(conf, this);
             }
           }
         }
         return zStandardCodec;
       }
-
-      private CompressionCodec buildCodec(Configuration conf) {
-        try {
-          Class<?> externalCodec =
-              getClassLoaderForCodec().loadClass("org.apache.hadoop.io.compress.ZStandardCodec");
-          return (CompressionCodec) ReflectionUtils.newInstance(externalCodec, conf);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException(e);
+      @Override
+      public CompressionCodec reload(Configuration conf) {
+        synchronized (lock) {
+          zStandardCodec = buildCodec(conf, this);
+          LOG.warn("Reloaded configuration for {}", name());
+          return zStandardCodec;
+        }
+      }
+    },
+    LZMA("lzma", LZMA_CODEC_CLASS_KEY, LZMA_CODEC_CLASS_DEFAULT) {
+      // Use base type to avoid compile-time dependencies.
+      private volatile transient CompressionCodec lzmaCodec;
+      private final transient Object lock = new Object();
+      @Override
+      CompressionCodec getCodec(Configuration conf) {
+        if (lzmaCodec == null) {
+          synchronized (lock) {
+            if (lzmaCodec == null) {
+              lzmaCodec = buildCodec(conf, this);
+            }
+          }
+        }
+        return lzmaCodec;
+      }
+      @Override
+      public CompressionCodec reload(Configuration conf) {
+        synchronized (lock) {
+          lzmaCodec = buildCodec(conf, this);
+          LOG.warn("Reloaded configuration for {}", name());
+          return lzmaCodec;
         }
       }
     };
 
     private final Configuration conf;
     private final String compressName;
+    private final String confKey;
+    private final String confDefault;
     /** data input buffer size to absorb small reads from application. */
     private static final int DATA_IBUF_SIZE = 1 * 1024;
     /** data output buffer size to absorb small writes from application. */
     private static final int DATA_OBUF_SIZE = 4 * 1024;
 
-    Algorithm(String name) {
-      this.conf = new Configuration();
+    Algorithm(String name, String confKey, String confDefault) {
+      this.conf = HBaseConfiguration.create();
       this.conf.setBoolean("io.native.lib.available", true);
       this.compressName = name;
+      this.confKey = confKey;
+      this.confDefault = confDefault;
     }
 
     abstract CompressionCodec getCodec(Configuration conf);
 
+    /**
+     * Reload configuration for the given algorithm.
+     * <p>
+     * NOTE: Experts only. This can only be done safely during process startup, before
+     * the algorithm's codecs are in use. If the codec implementation is changed, the
+     * new implementation may not be fully compatible with what was loaded at static
+     * initialization time, leading to potential data corruption.
+     * Mostly used by unit tests.
+     * @param conf configuration
+     */
+    public abstract CompressionCodec reload(Configuration conf);
+
     public InputStream createDecompressionStream(
         InputStream downStream, Decompressor decompressor,
         int downStreamBufferSize) throws IOException {
@@ -384,7 +465,7 @@ public final class Compression {
         if (decompressor != null) {
           if (decompressor.finished()) {
             // Somebody returns the decompressor to CodecPool but is still using it.
-            LOG.warn("Deompressor obtained from CodecPool is already finished()");
+            LOG.warn("Decompressor {} obtained from CodecPool is already finished", decompressor);
           }
           decompressor.reset();
         }
@@ -470,4 +551,49 @@ public final class Compression {
       }
     }
   }
+
+  /**
+   * Load a codec implementation for an algorithm using the supplied configuration.
+   * @param conf the configuration to use
+   * @param algo the algorithm to implement
+   */
+  private static CompressionCodec buildCodec(final Configuration conf, final Algorithm algo) {
+    try {
+      String codecClassName = conf.get(algo.confKey, algo.confDefault);
+      if (codecClassName == null) {
+        throw new RuntimeException("No codec configured for " + algo.confKey);
+      }
+      Class<?> codecClass = getClassLoaderForCodec().loadClass(codecClassName);
+      CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
+          new Configuration(conf));
+      LOG.info("Loaded codec {} for compression algorithm {}",
+        codec.getClass().getCanonicalName(), algo.name());
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    java.util.Map<String, CompressionCodec> implMap = new java.util.HashMap<>();
+    for (Algorithm algo: Algorithm.class.getEnumConstants()) {
+      try {
+        implMap.put(algo.name(), algo.getCodec(conf));
+      } catch (Exception e) {
+        // Ignore failures to load codec native implementations while building the report.
+        // We are to report what is configured.
+      }
+    }
+    for (Algorithm algo: Algorithm.class.getEnumConstants()) {
+      System.out.println(algo.name() + ":");
+      System.out.println("    name: " + algo.getName());
+      System.out.println("    confKey: " + algo.confKey);
+      System.out.println("    confDefault: " + algo.confDefault);
+      CompressionCodec codec = implMap.get(algo.name());
+      System.out.println("    implClass: " +
+        (codec != null ? codec.getClass().getCanonicalName() : "<none>"));
+    }
+  }
+
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
new file mode 100644
index 0000000..becff76
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.io.compress;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class CompressionUtil {
+
+  /**
+   * Returns the smallest power of two strictly greater than a positive {@code v} (a power of
+   * two is doubled); returns Integer.MAX_VALUE if that overflows, and 0 when {@code v <= 0}.
+   */
+  public static int roundInt2(int v) {
+    v = Integer.highestOneBit(v) << 1;
+    if (v < 0) {
+      return Integer.MAX_VALUE;
+    }
+    return v;
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
index 15d3cd5..9c4b1ec 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
@@ -311,11 +311,11 @@ public final class UnsafeAccess {
   // APIs to copy data. This will be direct memory location copy and will be much faster
   /**
    * Copies the bytes from given array's offset to length part into the given buffer.
-   * @param src
-   * @param srcOffset
-   * @param dest
-   * @param destOffset
-   * @param length
+   * @param src source array
+   * @param srcOffset offset into source array
+   * @param dest destination buffer
+   * @param destOffset offset into destination buffer
+   * @param length length of data to copy
    */
   public static void copy(byte[] src, int srcOffset, ByteBuffer dest, int destOffset, int length) {
     long destAddress = destOffset;
@@ -344,11 +344,11 @@ public final class UnsafeAccess {
    * Copies specified number of bytes from given offset of {@code src} ByteBuffer to the
    * {@code dest} array.
    *
-   * @param src
-   * @param srcOffset
-   * @param dest
-   * @param destOffset
-   * @param length
+   * @param src source buffer
+   * @param srcOffset offset into source buffer
+   * @param dest destination array
+   * @param destOffset offset into destination array
+   * @param length length of data to copy
    */
   public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int destOffset,
       int length) {
@@ -368,11 +368,11 @@ public final class UnsafeAccess {
    * Copies specified number of bytes from given offset of {@code src} buffer into the {@code dest}
    * buffer.
    *
-   * @param src
-   * @param srcOffset
-   * @param dest
-   * @param destOffset
-   * @param length
+   * @param src source buffer
+   * @param srcOffset offset into source buffer
+   * @param dest destination buffer
+   * @param destOffset offset into destination buffer
+   * @param length length of data to copy
    */
   public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset,
       int length) {
@@ -471,4 +471,5 @@ public final class UnsafeAccess {
   public static byte toByte(Object ref, long offset) {
     return theUnsafe.getByte(ref, offset);
   }
+
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java
new file mode 100644
index 0000000..616bf0b
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.RandomDistribution;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("checkstyle:innerassignment")
+public class CompressionTestBase {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(CompressionTestBase.class);
+
+  static final int LARGE_SIZE = 10 * 1024 * 1024;
+  static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
+  static final int BLOCK_SIZE = 4096;
+
+  static final byte[] SMALL_INPUT;
+  static {
+    // 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597
+    SMALL_INPUT = new byte[1+1+2+3+5+8+13+21+34+55+89+144+233+377+610+987+1597];
+    int off = 0;
+    Arrays.fill(SMALL_INPUT, off, (off+=1), (byte)'A');
+    Arrays.fill(SMALL_INPUT, off, (off+=1), (byte)'B');
+    Arrays.fill(SMALL_INPUT, off, (off+=2), (byte)'C');
+    Arrays.fill(SMALL_INPUT, off, (off+=3), (byte)'D');
+    Arrays.fill(SMALL_INPUT, off, (off+=5), (byte)'E');
+    Arrays.fill(SMALL_INPUT, off, (off+=8), (byte)'F');
+    Arrays.fill(SMALL_INPUT, off, (off+=13), (byte)'G');
+    Arrays.fill(SMALL_INPUT, off, (off+=21), (byte)'H');
+    Arrays.fill(SMALL_INPUT, off, (off+=34), (byte)'I');
+    Arrays.fill(SMALL_INPUT, off, (off+=55), (byte)'J');
+    Arrays.fill(SMALL_INPUT, off, (off+=89), (byte)'K');
+    Arrays.fill(SMALL_INPUT, off, (off+=144), (byte)'L');
+    Arrays.fill(SMALL_INPUT, off, (off+=233), (byte)'M');
+    Arrays.fill(SMALL_INPUT, off, (off+=377), (byte)'N');
+    Arrays.fill(SMALL_INPUT, off, (off+=610), (byte)'O');
+    Arrays.fill(SMALL_INPUT, off, (off+=987), (byte)'P');
+    Arrays.fill(SMALL_INPUT, off, (off+=1597), (byte)'Q');
+  }
+
+  protected void codecTest(final CompressionCodec codec, final byte[][] input)
+      throws Exception {
+    // We do this in Compression.java
+    ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
+    // Compress
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CompressionOutputStream out = codec.createOutputStream(baos);
+    int inLen = 0;
+    long start = EnvironmentEdgeManager.currentTime();
+    for (int i = 0; i < input.length; i++) {
+      out.write(input[i]);
+      inLen += input[i].length;
+    }
+    out.close();
+    long end = EnvironmentEdgeManager.currentTime();
+    final byte[] compressed = baos.toByteArray();
+    LOG.info("{} compressed {} bytes to {} bytes in {} ms", codec.getClass().getSimpleName(),
+      inLen, compressed.length, end - start);
+    // Decompress
+    final byte[] plain = new byte[inLen];
+    CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed));
+    start = EnvironmentEdgeManager.currentTime();
+    IOUtils.readFully(in, plain, 0, plain.length);
+    in.close();
+    end = EnvironmentEdgeManager.currentTime();
+    LOG.info("{} decompressed {} bytes to {} bytes in {} ms", codec.getClass().getSimpleName(),
+      compressed.length, plain.length, end - start);
+    // Decompressed bytes should equal the original
+    int offset = 0;
+    for (int i = 0; i < input.length; i++) {
+      assertTrue("Comparison failed at offset " + offset,
+        Bytes.compareTo(plain, offset, input[i].length, input[i], 0, input[i].length) == 0);
+      offset += input[i].length;
+    }
+  }
+
+  /**
+   * Test with one smallish input buffer
+   */
+  protected void codecSmallTest(final CompressionCodec codec) throws Exception {
+    codecTest(codec, new byte[][] { SMALL_INPUT });
+  }
+
+  /**
+   * Test with a large input (10MB) of Zipf-distributed bytes, divided into blocks of 4KB.
+   */
+  protected void codecLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
+    RandomDistribution.DiscreteRNG zipf =
+      new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
+    final byte[][] input = new byte[LARGE_SIZE/BLOCK_SIZE][BLOCK_SIZE];
+    for (int i = 0; i < input.length; i++) {
+      for (int j = 0; j < input[i].length; j++) {
+        input[i][j] = (byte)zipf.nextInt();
+      }
+    }
+    codecTest(codec, input);
+  }
+
+  /**
+   * Test with a very large input (100MB) as a single input buffer.
+   */
+  protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
+    RandomDistribution.DiscreteRNG zipf =
+        new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
+    final byte[][] input = new byte[1][VERY_LARGE_SIZE];
+    for (int i = 0; i < VERY_LARGE_SIZE; i++) {
+      input[0][i] = (byte)zipf.nextInt();
+    }
+    codecTest(codec, input);
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomDistribution.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/RandomDistribution.java
similarity index 99%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomDistribution.java
rename to hbase-common/src/test/java/org/apache/hadoop/hbase/util/RandomDistribution.java
index bbc612f..d02268c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/RandomDistribution.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/RandomDistribution.java
@@ -14,7 +14,7 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.hadoop.hbase.io.hfile;
+package org.apache.hadoop.hbase.util;
 
 import java.util.ArrayList;
 import java.util.Arrays;
diff --git a/hbase-compression/hbase-compression-aircompressor/pom.xml b/hbase-compression/hbase-compression-aircompressor/pom.xml
new file mode 100644
index 0000000..bb93d78
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/pom.xml
@@ -0,0 +1,172 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-compression</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>hbase-compression-aircompressor</artifactId>
+  <name>Apache HBase - Compression - Aircompressor</name>
+  <description>Pure Java compression support using Aircompressor codecs</description>
+  <!-- Build configuration for this module. No plugin versions are given here;
+       they are presumably managed by the parent POM (confirm there). -->
+  <build>
+    <plugins>
+      <!-- Testing plugins -->
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.revelc.code</groupId>
+        <artifactId>warbucks-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <!--Make it so assembly:single does nothing in here-->
+          <artifactId>maven-assembly-plugin</artifactId>
+          <configuration>
+            <skipAssembly>true</skipAssembly>
+          </configuration>
+        </plugin>
+        <!-- Checkstyle violations fail the build for this module -->
+	<plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-checkstyle-plugin</artifactId>
+          <configuration>
+            <failOnViolation>true</failOnViolation>
+          </configuration>
+	</plugin>
+	<plugin>
+          <groupId>net.revelc.code</groupId>
+          <artifactId>warbucks-maven-plugin</artifactId>
+	</plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <!-- No dependency versions are given in this module; they are presumably
+       supplied by dependencyManagement in the parent POM (confirm there). -->
+  <dependencies>
+    <!-- Intra-project dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-logging</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- NOTE(review): hbase-server is declared here without a test scope, in
+         addition to its test-jar below; confirm the main artifact is needed. -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-asyncfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <!-- Pure Java compression codecs (aircompressor) -->
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+    </dependency>
+    <!-- Test-only dependencies: logging bridges, log4j, hamcrest, mockito -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <profiles>
+    <!-- Activated by the build JDK version range below; when active it adds
+         javax.annotation-api to this module's dependencies (version presumably
+         managed by the parent POM). -->
+    <profile>
+      <id>build-with-jdk11</id>
+      <activation>
+        <jdk>[1.11,)</jdk>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>javax.annotation</groupId>
+          <artifactId>javax.annotation-api</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java
new file mode 100644
index 0000000..f5040d1
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.airlift.compress.Compressor;
+
+/**
+ * Hadoop compressor glue for aircompressor compressors.
+ */
+@InterfaceAudience.Private
+public abstract class HadoopCompressor<T extends Compressor>
+    implements org.apache.hadoop.io.compress.Compressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HadoopCompressor.class);
+  protected T compressor;
+  protected ByteBuffer inBuf, outBuf;
+  protected int bufferSize;
+  protected boolean finish, finished;
+  protected long bytesRead, bytesWritten;
+
+  HadoopCompressor(T compressor, int bufferSize) {
+    this.compressor = compressor;
+    this.bufferSize = bufferSize;
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf.position(bufferSize);
+  }
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    // If we have previously compressed our input and still have some buffered bytes
+    // remaining, provide them to the caller.
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("compress: {} bytes from outBuf", n);
+      return n;
+    }
+    // We don't actually begin compression until our caller calls finish().
+    // The aircompressor compressors operate over a range of input in one shot.
+    if (finish) {
+      if (inBuf.position() > 0) {
+        inBuf.flip();
+        int uncompressed = inBuf.remaining();
+        // If we don't have enough capacity in our currently allocated output buffer,
+        // allocate a new one which does.
+        int needed = maxCompressedLength(uncompressed);
+        // Can we decompress directly into the provided array?
+        ByteBuffer writeBuffer;
+        boolean direct = false;
+        if (len <= needed) {
+          writeBuffer = ByteBuffer.wrap(b, off, len);
+          direct = true;
+        } else {
+          if (outBuf.capacity() < needed) {
+            needed = CompressionUtil.roundInt2(needed);
+            LOG.trace("compress: resize outBuf {}", needed);
+            outBuf = ByteBuffer.allocate(needed);
+          } else {
+            outBuf.clear();
+          }
+          writeBuffer = outBuf;
+        }
+        final int oldPos = writeBuffer.position();
+        compressor.compress(inBuf, writeBuffer);
+        final int written = writeBuffer.position() - oldPos;
+        bytesWritten += written;
+        inBuf.clear();
+        LOG.trace("compress: compressed {} -> {}", uncompressed, written);
+        finished = true;
+        if (!direct) {
+          outBuf.flip();
+          int n = Math.min(written, len);
+          outBuf.get(b, off, n);
+          LOG.trace("compress: {} bytes", n);
+          return n;
+        } else {
+          LOG.trace("compress: {} bytes direct", written);
+          return written;
+        }
+      } else {
+        finished = true;
+      }
+    }
+    LOG.trace("No output");
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public void finish() {
+    LOG.trace("finish");
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    boolean b = finished && !outBuf.hasRemaining();
+    LOG.trace("finished: {}", b);
+    return b;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = !finished();
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    LOG.trace("reinit");
+    if (conf != null) {
+      // Buffer size might have changed
+      int newBufferSize = getBufferSize(conf);
+      if (bufferSize != newBufferSize) {
+        bufferSize = newBufferSize;
+        this.inBuf = ByteBuffer.allocate(bufferSize);
+        this.outBuf = ByteBuffer.allocate(bufferSize);
+      }
+    }
+    reset();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    try {
+      compressor = (T)(compressor.getClass().getDeclaredConstructor().newInstance());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    inBuf.clear();
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    bytesRead = 0;
+    bytesWritten = 0;
+    finish = false;
+    finished = false;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accomodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // This condition should be fortunately rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocate(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    bytesRead += len;
+    finished = false;
+  }
+
+  // Package private
+
+  int maxCompressedLength(int len) {
+    return compressor.maxCompressedLength(len);
+  }
+
+  abstract int getLevel(Configuration conf);
+
+  abstract int getBufferSize(Configuration conf);
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java
new file mode 100644
index 0000000..f5f5b83
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.airlift.compress.Decompressor;
+
+/**
+ * Hadoop decompressor glue for aircompressor decompressors.
+ */
+@InterfaceAudience.Private
+public class HadoopDecompressor<T extends Decompressor>
+    implements org.apache.hadoop.io.compress.Decompressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(HadoopDecompressor.class);
+  protected T decompressor;
+  protected ByteBuffer inBuf, outBuf;
+  protected int inLen;
+  protected boolean finished;
+
+  HadoopDecompressor(T decompressor, int bufferSize) {
+    this.decompressor = decompressor;
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf.position(bufferSize);
+  }
+
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: {} bytes from outBuf", n);
+      return n;
+    }
+    if (inBuf.position() > 0) {
+      inBuf.flip();
+      int remaining = inBuf.remaining();
+      inLen -= remaining;
+      outBuf.rewind();
+      outBuf.limit(outBuf.capacity());
+      decompressor.decompress(inBuf, outBuf);
+      inBuf.rewind();
+      inBuf.limit(inBuf.capacity());
+      final int written = outBuf.position();
+      LOG.trace("decompress: decompressed {} -> {}", remaining, written);
+      outBuf.flip();
+      int n = Math.min(written, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: {} bytes", n);
+      return n;
+    }
+    LOG.trace("decompress: No output, finished");
+    finished = true;
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public boolean finished() {
+    LOG.trace("finished");
+    return finished;
+  }
+
+  @Override
+  public int getRemaining() {
+    LOG.trace("getRemaining: {}", inLen);
+    return inLen;
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    LOG.trace("needsDictionary");
+    return false;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    try {
+      decompressor = (T)(decompressor.getClass().getDeclaredConstructor().newInstance());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    inBuf.rewind();
+    inBuf.limit(inBuf.capacity());
+    inLen = 0;
+    outBuf.rewind();
+    outBuf.limit(0);
+    finished = false;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = (inBuf.position() == 0);
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accomodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // This condition should be fortunately rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocate(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    inLen += len;
+    finished = false;
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java
new file mode 100644
index 0000000..c1766dc
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+
+/**
+ * Hadoop Lz4 codec implemented with aircompressor.
+ * <p>
+ * This is data format compatible with Hadoop's native lz4 codec.
+ */
+@InterfaceAudience.Private
+public class Lz4Codec implements Configurable, CompressionCodec {
+
+  public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";
+
+  private Configuration conf;
+
+  public Lz4Codec() {
+    conf = new Configuration();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new HadoopLz4Compressor();
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new HadoopLz4Decompressor();
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    int bufferSize = getBufferSize(conf);
+    int compressionOverhead = (bufferSize / 6) + 32;
+    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return HadoopLz4Compressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return HadoopLz4Decompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".lz4";
+  }
+
+  @InterfaceAudience.Private
+  public class HadoopLz4Compressor extends HadoopCompressor<Lz4Compressor> {
+
+    HadoopLz4Compressor(Lz4Compressor compressor) {
+      super(compressor, Lz4Codec.getBufferSize(conf));
+    }
+
+    HadoopLz4Compressor() {
+      this(new Lz4Compressor());
+    }
+
+    @Override
+    int getLevel(Configuration conf) {
+      return 0;
+    }
+
+    @Override
+    int getBufferSize(Configuration conf) {
+      return Lz4Codec.getBufferSize(conf);
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  public class HadoopLz4Decompressor extends HadoopDecompressor<Lz4Decompressor> {
+
+    HadoopLz4Decompressor(Lz4Decompressor decompressor) {
+      super(decompressor, getBufferSize(conf));
+    }
+
+    HadoopLz4Decompressor() {
+      this(new Lz4Decompressor());
+    }
+
+  }
+
+  // Package private
+
+  static int getBufferSize(Configuration conf) {
+    int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
+      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
+    return size > 0 ? size : 256 * 1024; // Don't change this default
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java
new file mode 100644
index 0000000..3e5ab04
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+
+/**
+ * Hadoop Lzo codec implemented with aircompressor.
+ * <p>
+ * This is data format compatible with Hadoop's native lzo codec.
+ */
+@InterfaceAudience.Private
+public class LzoCodec implements Configurable, CompressionCodec {
+
+  public static final String LZO_BUFFER_SIZE_KEY = "hbase.io.compress.lzo.buffersize";
+
+  private Configuration conf;
+
+  public LzoCodec() {
+    conf = new Configuration();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new HadoopLzoCompressor();
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new HadoopLzoDecompressor();
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    int bufferSize = getBufferSize(conf);
+    int compressionOverhead = (bufferSize / 6) + 32;
+    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return HadoopLzoCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return HadoopLzoDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".lzo";
+  }
+
+  @InterfaceAudience.Private
+  public class HadoopLzoCompressor extends HadoopCompressor<LzoCompressor> {
+
+    HadoopLzoCompressor(LzoCompressor compressor) {
+      super(compressor, LzoCodec.getBufferSize(conf));
+    }
+
+    HadoopLzoCompressor() {
+      this(new LzoCompressor());
+    }
+
+    @Override
+    int getLevel(Configuration conf) {
+      return 0;
+    }
+
+    @Override
+    int getBufferSize(Configuration conf) {
+      return LzoCodec.getBufferSize(conf);
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  public class HadoopLzoDecompressor extends HadoopDecompressor<LzoDecompressor> {
+
+    HadoopLzoDecompressor(LzoDecompressor decompressor) {
+      super(decompressor, getBufferSize(conf));
+    }
+
+    HadoopLzoDecompressor() {
+      this(new LzoDecompressor());
+    }
+
+  }
+
+  // Package private
+
+  static int getBufferSize(Configuration conf) {
+    int size = conf.getInt(LZO_BUFFER_SIZE_KEY,
+      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT));
+    return size > 0 ? size : 256 * 1024; // Don't change this default
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java
new file mode 100644
index 0000000..e325b8b
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+
+/**
+ * Hadoop snappy codec implemented with aircompressor.
+ * <p>
+ * This codec is data-format compatible with Hadoop's native snappy codec.
+ */
+@InterfaceAudience.Private
+public class SnappyCodec implements Configurable, CompressionCodec {
+
+  public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";
+
+  private Configuration conf; // read by the stream factories and nested classes below
+
+  public SnappyCodec() {
+    conf = new Configuration(); // fresh Configuration until setConf() replaces it
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new HadoopSnappyCompressor();
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new HadoopSnappyDecompressor();
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return createInputStream(in, createDecompressor()); // new decompressor for each call
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return createOutputStream(out, createCompressor()); // new compressor for each call
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    int bufferSize = getBufferSize(conf);
+    int compressionOverhead = (bufferSize / 6) + 32; // headroom passed to the stream below
+    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return HadoopSnappyCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return HadoopSnappyDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".snappy"; // the returned extension includes the leading dot
+  }
+
+  @InterfaceAudience.Private
+  public class HadoopSnappyCompressor extends HadoopCompressor<SnappyCompressor> {
+
+    HadoopSnappyCompressor(SnappyCompressor compressor) {
+      super(compressor, SnappyCodec.getBufferSize(conf)); // codec-level static, not the override
+    }
+
+    HadoopSnappyCompressor() {
+      this(new SnappyCompressor()); // default: wrap a freshly created SnappyCompressor
+    }
+
+    @Override
+    int getLevel(Configuration conf) {
+      return 0; // constant; nothing is read from conf
+    }
+
+    @Override
+    int getBufferSize(Configuration conf) {
+      return SnappyCodec.getBufferSize(conf); // delegate to the codec-level static lookup
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  public class HadoopSnappyDecompressor extends HadoopDecompressor<SnappyDecompressor> {
+
+    HadoopSnappyDecompressor(SnappyDecompressor decompressor) {
+      super(decompressor, getBufferSize(conf)); // presumably SnappyCodec.getBufferSize; TODO confirm
+    }
+
+    HadoopSnappyDecompressor() {
+      this(new SnappyDecompressor()); // default: wrap a freshly created SnappyDecompressor
+    }
+
+  }
+
+  // Package private
+
+  static int getBufferSize(Configuration conf) {
+    int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY, // HBase key first; Hadoop key is the fallback
+      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
+    return size > 0 ? size : 256 * 1024; // Fallback for non-positive sizes. Don't change it
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
new file mode 100644
index 0000000..a25943f
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+
+/**
+ * Hadoop codec implementation for Zstandard, implemented with aircompressor.
+ * <p>
+ * Unlike the other codecs this one should be considered as under development and unstable
+ * (as in changing), reflecting the status of aircompressor's zstandard implementation.
+ * <p>
+ * NOTE: This codec is NOT data format compatible with the Hadoop native zstandard codec.
+ * There are issues with both framing and limitations of the aircompressor zstandard
+ * compressor. This codec can be used as an alternative to the native codec, if the native
+ * codec cannot be made available and/or an eventual migration will never be necessary
+ * (i.e. this codec's performance meets anticipated requirements). Once you begin using this
+ * alternative you will be locked into it.
+ */
+@InterfaceAudience.Private
+public class ZstdCodec implements Configurable, CompressionCodec {
+
+  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
+
+  private Configuration conf; // read by the stream factories and nested classes below
+
+  public ZstdCodec() {
+    conf = new Configuration(); // fresh Configuration until setConf() replaces it
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new HadoopZstdCompressor();
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new HadoopZstdDecompressor();
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return createInputStream(in, createDecompressor()); // new decompressor for each call
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return createOutputStream(out, createCompressor()); // new compressor for each call
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    int bufferSize = getBufferSize(conf);
+    int compressionOverhead = (bufferSize / 6) + 32; // headroom passed to the stream below
+    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return HadoopZstdCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return HadoopZstdDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".zst"; // the returned extension includes the leading dot
+  }
+
+  @InterfaceAudience.Private
+  public class HadoopZstdCompressor extends HadoopCompressor<ZstdCompressor> {
+
+    HadoopZstdCompressor(ZstdCompressor compressor) {
+      super(compressor, ZstdCodec.getBufferSize(conf)); // codec-level static, not the override
+    }
+
+    HadoopZstdCompressor() {
+      this(new ZstdCompressor()); // default: wrap a freshly created ZstdCompressor
+    }
+
+    @Override
+    int getLevel(Configuration conf) {
+      return 0; // constant; nothing is read from conf
+    }
+
+    @Override
+    int getBufferSize(Configuration conf) {
+      return ZstdCodec.getBufferSize(conf); // delegate to the codec-level static lookup
+    }
+
+  }
+
+  @InterfaceAudience.Private
+  public class HadoopZstdDecompressor extends HadoopDecompressor<ZstdDecompressor> {
+
+    HadoopZstdDecompressor(ZstdDecompressor decompressor) {
+      super(decompressor, getBufferSize(conf)); // presumably ZstdCodec.getBufferSize; TODO confirm
+    }
+
+    HadoopZstdDecompressor() {
+      this(new ZstdDecompressor()); // default: wrap a freshly created ZstdDecompressor
+    }
+
+  }
+
+  // Package private
+
+  static int getBufferSize(Configuration conf) {
+    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY, // HBase key first; Hadoop key is the fallback
+      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
+    return size > 0 ? size : 256 * 1024; // Fallback for non-positive sizes. Don't change it
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionLz4.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionLz4.java
new file mode 100644
index 0000000..547fe1d
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionLz4.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionLz4 extends HFileTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHFileCompressionLz4.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.LZ4_CODEC_CLASS_KEY, Lz4Codec.class.getCanonicalName());
+    Compression.Algorithm.LZ4.reload(conf); // presumably rebinds LZ4 to the class set above
+    HFileTestBase.setUpBeforeClass(); // base setup runs after the codec is configured
+  }
+
+  @Test
+  public void test() throws Exception {
+    doTest(Compression.Algorithm.LZ4); // doTest is inherited, not declared here
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionLzo.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionLzo.java
new file mode 100644
index 0000000..db0a79d
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionLzo.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionLzo extends HFileTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHFileCompressionLzo.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.LZO_CODEC_CLASS_KEY, LzoCodec.class.getCanonicalName());
+    Compression.Algorithm.LZO.reload(conf); // presumably rebinds LZO to the class set above
+    HFileTestBase.setUpBeforeClass(); // base setup runs after the codec is configured
+  }
+
+  @Test
+  public void test() throws Exception {
+    doTest(Compression.Algorithm.LZO); // doTest is inherited, not declared here
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionSnappy.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionSnappy.java
new file mode 100644
index 0000000..85b17b0
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionSnappy.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionSnappy extends HFileTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHFileCompressionSnappy.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.SNAPPY_CODEC_CLASS_KEY, SnappyCodec.class.getCanonicalName());
+    Compression.Algorithm.SNAPPY.reload(conf); // presumably rebinds SNAPPY to the class set above
+    HFileTestBase.setUpBeforeClass(); // base setup runs after the codec is configured
+  }
+
+  @Test
+  public void test() throws Exception {
+    doTest(Compression.Algorithm.SNAPPY); // doTest is inherited, not declared here
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionZstd.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionZstd.java
new file mode 100644
index 0000000..692cc09
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestHFileCompressionZstd.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionZstd extends HFileTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHFileCompressionZstd.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
+    Compression.Algorithm.ZSTD.reload(conf); // presumably rebinds ZSTD to the class set above
+    HFileTestBase.setUpBeforeClass(); // base setup runs after the codec is configured
+  }
+
+  @Test
+  public void test() throws Exception {
+    doTest(Compression.Algorithm.ZSTD); // doTest is inherited, not declared here
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestLz4Codec.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestLz4Codec.java
new file mode 100644
index 0000000..db1cc72
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestLz4Codec.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestLz4Codec extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestLz4Codec.class);
+
+  @Test
+  public void testLz4CodecSmall() throws Exception {
+    codecSmallTest(new Lz4Codec());
+  }
+
+  @Test
+  public void testLz4CodecLarge() throws Exception {
+    codecLargeTest(new Lz4Codec(), 1.1); // poor compressibility, expansion with this codec
+    codecLargeTest(new Lz4Codec(), 2);
+    codecLargeTest(new Lz4Codec(), 10);  // high compressibility
+  }
+
+  @Test
+  public void testLz4CodecVeryLarge() throws Exception {
+    codecVeryLargeTest(new Lz4Codec(), 3); // like text
+  }
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestLzoCodec.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestLzoCodec.java
new file mode 100644
index 0000000..bd1b75a
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestLzoCodec.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestLzoCodec extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestLzoCodec.class);
+
+  @Test
+  public void testLzoCodecSmall() throws Exception {
+    codecSmallTest(new LzoCodec());
+  }
+
+  @Test
+  public void testLzoCodecLarge() throws Exception {
+    codecLargeTest(new LzoCodec(), 1.1); // poor compressibility, expansion with this codec
+    codecLargeTest(new LzoCodec(), 2);
+    codecLargeTest(new LzoCodec(), 10);  // very high compressibility
+  }
+
+  @Test
+  public void testLzoCodecVeryLarge() throws Exception {
+    codecVeryLargeTest(new LzoCodec(), 3); // like text
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestSnappyCodec.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestSnappyCodec.java
new file mode 100644
index 0000000..98e6281
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestSnappyCodec.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestSnappyCodec extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSnappyCodec.class);
+
+  @Test
+  public void testSnappyCodecSmall() throws Exception {
+    codecSmallTest(new SnappyCodec());
+  }
+
+  @Test
+  public void testSnappyCodecLarge() throws Exception {
+    codecLargeTest(new SnappyCodec(), 1.1); // poor compressibility, expansion with this codec
+    codecLargeTest(new SnappyCodec(), 2);
+    codecLargeTest(new SnappyCodec(), 10);  // very high compressibility
+  }
+
+  @Test
+  public void testSnappyCodecVeryLarge() throws Exception {
+    codecVeryLargeTest(new SnappyCodec(), 3); // like text
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionLz4.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionLz4.java
new file mode 100644
index 0000000..23d7777
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionLz4.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionLz4 extends CompressedWALTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestWALCompressionLz4.class);
+
+  @Rule
+  public TestName name = new TestName(); // supplies the method name used to build the table name
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.LZ4_CODEC_CLASS_KEY, Lz4Codec.class.getCanonicalName());
+    Compression.Algorithm.LZ4.reload(conf); // presumably rebinds LZ4 to the class set above
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, Compression.Algorithm.LZ4.getName());
+    TEST_UTIL.startMiniDFSCluster(3); // only a mini DFS cluster is started in this setup
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
+    doTest(tableName); // doTest is inherited, not declared here
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionLzo.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionLzo.java
new file mode 100644
index 0000000..997d687
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionLzo.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Exercises the compressed WAL test inherited from {@link CompressedWALTestBase} with WAL value
+ * compression set to LZO and the LZO codec class set to {@link LzoCodec}.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionLzo extends CompressedWALTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestWALCompressionLzo.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * Configures the codec under test, enables WAL and WAL value compression, and starts the
+   * mini DFS cluster.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    final Configuration configuration = TEST_UTIL.getConfiguration();
+    final Compression.Algorithm algorithm = Compression.Algorithm.LZO;
+    // Name the codec implementation, then reload the algorithm with this configuration.
+    configuration.set(Compression.LZO_CODEC_CLASS_KEY, LzoCodec.class.getCanonicalName());
+    algorithm.reload(configuration);
+    // WAL compression, with value compression using the algorithm selected above.
+    configuration.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    configuration.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    configuration.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, algorithm.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  /** Shuts the mini cluster down once all tests in this class have run. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Runs the shared test against a table named after the current test method. */
+  @Test
+  public void test() throws Exception {
+    // Replace anything that is not an ASCII letter or digit with an underscore.
+    final String sanitized = name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
+    doTest(TableName.valueOf(sanitized));
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionSnappy.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionSnappy.java
new file mode 100644
index 0000000..924e46a
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionSnappy.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Exercises the compressed WAL test inherited from {@link CompressedWALTestBase} with WAL value
+ * compression set to Snappy and the Snappy codec class set to {@link SnappyCodec}.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionSnappy extends CompressedWALTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestWALCompressionSnappy.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * Configures the codec under test, enables WAL and WAL value compression, and starts the
+   * mini DFS cluster.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    final Configuration configuration = TEST_UTIL.getConfiguration();
+    final Compression.Algorithm algorithm = Compression.Algorithm.SNAPPY;
+    // Name the codec implementation, then reload the algorithm with this configuration.
+    configuration.set(Compression.SNAPPY_CODEC_CLASS_KEY, SnappyCodec.class.getCanonicalName());
+    algorithm.reload(configuration);
+    // WAL compression, with value compression using the algorithm selected above.
+    configuration.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    configuration.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    configuration.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, algorithm.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  /** Shuts the mini cluster down once all tests in this class have run. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Runs the shared test against a table named after the current test method. */
+  @Test
+  public void test() throws Exception {
+    // Replace anything that is not an ASCII letter or digit with an underscore.
+    final String sanitized = name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
+    doTest(TableName.valueOf(sanitized));
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionZstd.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionZstd.java
new file mode 100644
index 0000000..0de6de2
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestWALCompressionZstd.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Exercises the compressed WAL test inherited from {@link CompressedWALTestBase} with WAL value
+ * compression set to ZSTD and the ZSTD codec class set to {@link ZstdCodec}.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionZstd extends CompressedWALTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestWALCompressionZstd.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * Configures the codec under test, enables WAL and WAL value compression, and starts the
+   * mini DFS cluster.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    final Configuration configuration = TEST_UTIL.getConfiguration();
+    final Compression.Algorithm algorithm = Compression.Algorithm.ZSTD;
+    // Name the codec implementation, then reload the algorithm with this configuration.
+    configuration.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
+    algorithm.reload(configuration);
+    // WAL compression, with value compression using the algorithm selected above.
+    configuration.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    configuration.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    configuration.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, algorithm.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  /** Shuts the mini cluster down once all tests in this class have run. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Runs the shared test against a table named after the current test method. */
+  @Test
+  public void test() throws Exception {
+    // Replace anything that is not an ASCII letter or digit with an underscore.
+    final String sanitized = name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
+    doTest(TableName.valueOf(sanitized));
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestZstdCodec.java b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestZstdCodec.java
new file mode 100644
index 0000000..707fee2
--- /dev/null
+++ b/hbase-compression/hbase-compression-aircompressor/src/test/java/org/apache/hadoop/hbase/io/compress/aircompressor/TestZstdCodec.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.aircompressor;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the small, large and very large codec tests from {@link CompressionTestBase} against
+ * {@link ZstdCodec}. Each invocation gets its own codec instance.
+ */
+@Category(SmallTests.class)
+public class TestZstdCodec extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestZstdCodec.class);
+
+  /** Small-input test. */
+  @Test
+  public void testZstdCodecSmall() throws Exception {
+    final ZstdCodec codec = new ZstdCodec();
+    codecSmallTest(codec);
+  }
+
+  /** Large-input test, repeated with three different second arguments. */
+  @Test
+  public void testZstdCodecLarge() throws Exception {
+    // Poor compressibility; this codec expands the data.
+    final ZstdCodec poorlyCompressible = new ZstdCodec();
+    codecLargeTest(poorlyCompressible, 1.1);
+
+    final ZstdCodec moderatelyCompressible = new ZstdCodec();
+    codecLargeTest(moderatelyCompressible, 2);
+
+    // Very high compressibility.
+    final ZstdCodec highlyCompressible = new ZstdCodec();
+    codecLargeTest(highlyCompressible, 10);
+  }
+
+  /** Very-large-input test with data that compresses like text. */
+  @Test
+  public void testZstdCodecVeryLarge() throws Exception {
+    final ZstdCodec codec = new ZstdCodec();
+    codecVeryLargeTest(codec, 3);
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-lz4/pom.xml b/hbase-compression/hbase-compression-lz4/pom.xml
new file mode 100644
index 0000000..1956c89
--- /dev/null
+++ b/hbase-compression/hbase-compression-lz4/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-compression</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>hbase-compression-lz4</artifactId>
+  <name>Apache HBase - Compression - LZ4</name>
+  <description>Pure Java compression support using lz4-java</description>
+  <build>
+    <plugins>
+      <!-- Testing plugins -->
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.revelc.code</groupId>
+        <artifactId>warbucks-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <!--Make it so assembly:single does nothing in here-->
+          <artifactId>maven-assembly-plugin</artifactId>
+          <configuration>
+            <skipAssembly>true</skipAssembly>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <dependencies>
+    <!-- Intra-project dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-logging</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-asyncfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <!-- Compression codec library (lz4-java) -->
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+    </dependency>
+    <!--Test-->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <profiles>
+    <profile>
+      <id>build-with-jdk11</id>
+      <activation>
+        <jdk>[1.11,)</jdk>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>javax.annotation</groupId>
+          <artifactId>javax.annotation-api</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java
new file mode 100644
index 0000000..a218954
--- /dev/null
+++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.lz4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Hadoop {@link CompressionCodec} for LZ4, implemented with lz4-java.
+ * <p>
+ * The data format is compatible with Hadoop's native lz4 codec.
+ * <p>
+ * Compressors, decompressors and streams created by this codec are all sized from
+ * {@link #getBufferSize(Configuration)} applied to the codec's current configuration.
+ */
+@InterfaceAudience.Private
+public class Lz4Codec implements Configurable, CompressionCodec {
+
+  /** HBase specific buffer size key; when set it is used instead of the Hadoop key. */
+  public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";
+
+  private Configuration conf;
+
+  /** Creates a codec holding a newly constructed {@link Configuration}. */
+  public Lz4Codec() {
+    this.conf = new Configuration();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /** @return a new {@link Lz4Compressor} sized from the current configuration */
+  @Override
+  public Compressor createCompressor() {
+    final int bufferSize = getBufferSize(conf);
+    return new Lz4Compressor(bufferSize);
+  }
+
+  /** @return a new {@link Lz4Decompressor} sized from the current configuration */
+  @Override
+  public Decompressor createDecompressor() {
+    final int bufferSize = getBufferSize(conf);
+    return new Lz4Decompressor(bufferSize);
+  }
+
+  /** Wraps {@code in} in a block decompressor stream using a newly created decompressor. */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    final Decompressor decompressor = createDecompressor();
+    return createInputStream(in, decompressor);
+  }
+
+  /** Wraps {@code in} in a block decompressor stream driven by the supplied decompressor. */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    final int bufferSize = getBufferSize(conf);
+    return new BlockDecompressorStream(in, d, bufferSize);
+  }
+
+  /** Wraps {@code out} in a block compressor stream using a newly created compressor. */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    final Compressor compressor = createCompressor();
+    return createOutputStream(out, compressor);
+  }
+
+  /**
+   * Wraps {@code out} in a block compressor stream driven by the supplied compressor. The
+   * overhead passed to the stream is one sixth of the buffer size plus 32 bytes.
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    final int size = getBufferSize(conf);
+    final int overhead = (size / 6) + 32;
+    return new BlockCompressorStream(out, c, size, overhead);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return Lz4Compressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return Lz4Decompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".lz4";
+  }
+
+  // Package private
+
+  /**
+   * Resolves the buffer size: the value under {@link #LZ4_BUFFER_SIZE_KEY} if present, otherwise
+   * the value under Hadoop's lz4 buffer size key, otherwise Hadoop's default for that key. A
+   * result that is not positive is replaced by 256 KiB.
+   * @param conf configuration to read
+   * @return a positive buffer size in bytes
+   */
+  static int getBufferSize(Configuration conf) {
+    final int hadoopSize = conf.getInt(
+      CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
+      CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
+    final int configured = conf.getInt(LZ4_BUFFER_SIZE_KEY, hadoopSize);
+    if (configured <= 0) {
+      return 256 * 1024; // Don't change this default
+    }
+    return configured;
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java
new file mode 100644
index 0000000..66928ad
--- /dev/null
+++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.lz4;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+
+/**
+ * Hadoop compressor glue for lz4-java.
+ */
+@InterfaceAudience.Private
+public class Lz4Compressor implements Compressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(Lz4Compressor.class);
+  protected LZ4Compressor compressor;
+  protected ByteBuffer inBuf, outBuf;
+  protected int bufferSize;
+  protected boolean finish, finished;
+  protected long bytesRead, bytesWritten;
+
+  Lz4Compressor(int bufferSize) {
+    compressor = LZ4Factory.fastestInstance().fastCompressor();
+    this.bufferSize = bufferSize;
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf.position(bufferSize);
+  }
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    // If we have previously compressed our input and still have some buffered bytes
+    // remaining, provide them to the caller.
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("compress: {} bytes from outBuf", n);
+      return n;
+    }
+    // We don't actually begin compression until our caller calls finish().
+    if (finish) {
+      if (inBuf.position() > 0) {
+        inBuf.flip();
+        int uncompressed = inBuf.remaining();
+        int needed = maxCompressedLength(uncompressed);
+        // Can we decompress directly into the provided array?
+        ByteBuffer writeBuffer;
+        boolean direct = false;
+        if (len <= needed) {
+          writeBuffer = ByteBuffer.wrap(b, off, len);
+          direct = true;
+        } else {
+          // If we don't have enough capacity in our currently allocated output buffer,
+          // allocate a new one which does.
+          if (outBuf.capacity() < needed) {
+            needed = CompressionUtil.roundInt2(needed);
+            LOG.trace("compress: resize outBuf {}", needed);
+            outBuf = ByteBuffer.allocate(needed);
+          } else {
+            outBuf.clear();
+          }
+          writeBuffer = outBuf;
+        }
+        final int oldPos = writeBuffer.position();
+        compressor.compress(inBuf, writeBuffer);
+        final int written = writeBuffer.position() - oldPos;
+        bytesWritten += written;
+        inBuf.clear();
+        LOG.trace("compress: compressed {} -> {}", uncompressed, written);
+        finished = true;
+        if (!direct) {
+          outBuf.flip();
+          int n = Math.min(written, len);
+          outBuf.get(b, off, n);
+          LOG.trace("compress: {} bytes", n);
+          return n;
+        } else {
+          LOG.trace("compress: {} bytes direct", written);
+          return written;
+        }
+      } else {
+        finished = true;
+      }
+    }
+    LOG.trace("No output");
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public void finish() {
+    LOG.trace("finish");
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    boolean b = finished && !outBuf.hasRemaining();
+    LOG.trace("finished: {}", b);
+    return b;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = !finished();
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    LOG.trace("reinit");
+    if (conf != null) {
+      // Buffer size might have changed
+      int newBufferSize = Lz4Codec.getBufferSize(conf);
+      if (bufferSize != newBufferSize) {
+        bufferSize = newBufferSize;
+        this.inBuf = ByteBuffer.allocate(bufferSize);
+        this.outBuf = ByteBuffer.allocate(bufferSize);
+      }
+    }
+    reset();
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    compressor = LZ4Factory.fastestInstance().fastCompressor();
+    inBuf.clear();
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    bytesRead = 0;
+    bytesWritten = 0;
+    finish = false;
+    finished = false;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accomodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // This condition should be fortunately rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocate(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    bytesRead += len;
+    finished = false;
+  }
+
+  // Package private
+
+  int maxCompressedLength(int len) {
+    return compressor.maxCompressedLength(len);
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java
new file mode 100644
index 0000000..efb8c84
--- /dev/null
+++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.lz4;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+
+/**
+ * Hadoop decompressor glue for lz4-java.
+ * <p>
+ * Compressed bytes passed to {@link #setInput(byte[], int, int)} are accumulated in
+ * {@code inBuf}. A call to {@link #decompress(byte[], int, int)} decompresses everything
+ * accumulated into {@code outBuf} and copies out as much as the caller's array can take; later
+ * calls hand out whatever is left in {@code outBuf}.
+ */
+@InterfaceAudience.Private
+public class Lz4Decompressor implements Decompressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(Lz4Decompressor.class);
+  protected LZ4SafeDecompressor decompressor;
+  protected ByteBuffer inBuf, outBuf;
+  protected int bufferSize, inLen;
+  protected boolean finished;
+
+  /**
+   * @param bufferSize initial capacity, in bytes, of both the input and the output buffer
+   */
+  Lz4Decompressor(int bufferSize) {
+    this.bufferSize = bufferSize;
+    this.decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    // Position at the end so that hasRemaining() reports no pending output.
+    this.outBuf.position(bufferSize);
+  }
+
+  /**
+   * Produces decompressed output.
+   * @param b destination array
+   * @param off offset in {@code b} at which to start writing
+   * @param len maximum number of bytes to write
+   * @return the number of bytes written to {@code b}; 0 when there is neither pending output
+   *         nor buffered input, in which case this decompressor is also marked finished
+   */
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    // Pending output from an earlier call goes out first.
+    if (outBuf.hasRemaining()) {
+      final int pending = Math.min(outBuf.remaining(), len);
+      outBuf.get(b, off, pending);
+      LOG.trace("decompress: {} bytes from outBuf", pending);
+      return pending;
+    }
+    // Nothing buffered on either side.
+    if (inBuf.position() == 0) {
+      LOG.trace("decompress: No output, finished");
+      finished = true;
+      return 0;
+    }
+    // Decompress all accumulated input into outBuf.
+    inBuf.flip();
+    final int compressedLen = inBuf.remaining();
+    inLen -= compressedLen;
+    outBuf.clear();
+    decompressor.decompress(inBuf, outBuf);
+    inBuf.clear();
+    final int decompressedLen = outBuf.position();
+    LOG.trace("decompress: decompressed {} -> {}", compressedLen, decompressedLen);
+    // Copy out what fits; the remainder stays in outBuf.
+    outBuf.flip();
+    final int copied = Math.min(decompressedLen, len);
+    outBuf.get(b, off, copied);
+    LOG.trace("decompress: {} bytes", copied);
+    return copied;
+  }
+
+  /** No-op apart from trace logging. */
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  /** @return true after a {@code decompress} call found nothing to do, until new input arrives */
+  @Override
+  public boolean finished() {
+    LOG.trace("finished");
+    return finished;
+  }
+
+  /** @return the count of input bytes received but not yet passed to the LZ4 decompressor */
+  @Override
+  public int getRemaining() {
+    LOG.trace("getRemaining: {}", inLen);
+    return inLen;
+  }
+
+  /** @return always false */
+  @Override
+  public boolean needsDictionary() {
+    LOG.trace("needsDictionary");
+    return false;
+  }
+
+  /** Discards buffered input and output and clears the finished flag. */
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    this.decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+    inLen = 0;
+    inBuf.clear();
+    outBuf.clear();
+    // Position at the end so that hasRemaining() reports no pending output.
+    outBuf.position(outBuf.capacity());
+    finished = false;
+  }
+
+  /** @return true when the input buffer holds no bytes */
+  @Override
+  public boolean needsInput() {
+    final boolean inputEmpty = inBuf.position() == 0;
+    LOG.trace("needsInput: {}", inputEmpty);
+    return inputEmpty;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  /**
+   * Appends {@code len} bytes of {@code b}, starting at {@code off}, to the accumulated input,
+   * growing the input buffer when it is too small.
+   */
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    final int free = inBuf.remaining();
+    if (len > free) {
+      // Reallocation is expensive but should be rare. The replacement buffer is sized from
+      // the current capacity plus the incoming length, and the accumulated input is carried
+      // over into it.
+      final int grown = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", grown);
+      final ByteBuffer replacement = ByteBuffer.allocate(grown);
+      inBuf.flip();
+      replacement.put(inBuf);
+      inBuf = replacement;
+    }
+    inBuf.put(b, off, len);
+    inLen += len;
+    finished = false;
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-lz4/src/test/java/org/apache/hadoop/hbase/io/compress/lz4/TestHFileCompressionLz4.java b/hbase-compression/hbase-compression-lz4/src/test/java/org/apache/hadoop/hbase/io/compress/lz4/TestHFileCompressionLz4.java
new file mode 100644
index 0000000..78c0f65
--- /dev/null
+++ b/hbase-compression/hbase-compression-lz4/src/test/java/org/apache/hadoop/hbase/io/compress/lz4/TestHFileCompressionLz4.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.lz4;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionLz4 extends HFileTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHFileCompressionLz4.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration(); // configure this module's LZ4 codec:
+    conf.set(Compression.LZ4_CODEC_CLASS_KEY, Lz4Codec.class.getCanonicalName());
+    Compression.Algorithm.LZ4.reload(conf); // presumably re-resolves the codec class; confirm
+    HFileTestBase.setUpBeforeClass(); // base-class setup runs after the codec is configured
+  }
+
+  @Test
+  public void test() throws Exception {
+    doTest(Compression.Algorithm.LZ4); // doTest is inherited, not defined in this class
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-lz4/src/test/java/org/apache/hadoop/hbase/io/compress/lz4/TestLz4Codec.java b/hbase-compression/hbase-compression-lz4/src/test/java/org/apache/hadoop/hbase/io/compress/lz4/TestLz4Codec.java
new file mode 100644
index 0000000..0c237e1
--- /dev/null
+++ b/hbase-compression/hbase-compression-lz4/src/test/java/org/apache/hadoop/hbase/io/compress/lz4/TestLz4Codec.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.lz4;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestLz4Codec extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestLz4Codec.class);
+
+  @Test
+  public void testLz4CodecSmall() throws Exception {
+    codecSmallTest(new Lz4Codec());
+  }
+
+  @Test
+  public void testLz4CodecLarge() throws Exception {
+    codecLargeTest(new Lz4Codec(), 1.1); // poor compressibility, expansion with this codec
+    codecLargeTest(new Lz4Codec(),   2); // between the two extremes
+    codecLargeTest(new Lz4Codec(),  10); // very high compressibility
+  }
+
+  @Test
+  public void testLz4CodecVeryLarge() throws Exception {
+    codecVeryLargeTest(new Lz4Codec(), 3); // like text
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-lz4/src/test/java/org/apache/hadoop/hbase/io/compress/lz4/TestWALCompressionLz4.java b/hbase-compression/hbase-compression-lz4/src/test/java/org/apache/hadoop/hbase/io/compress/lz4/TestWALCompressionLz4.java
new file mode 100644
index 0000000..fdf9b0a
--- /dev/null
+++ b/hbase-compression/hbase-compression-lz4/src/test/java/org/apache/hadoop/hbase/io/compress/lz4/TestWALCompressionLz4.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.lz4;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionLz4 extends CompressedWALTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestWALCompressionLz4.class);
+
+  @Rule
+  public TestName name = new TestName(); // supplies the method name used to build the table name
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration(); // configure this module's LZ4 codec:
+    conf.set(Compression.LZ4_CODEC_CLASS_KEY, Lz4Codec.class.getCanonicalName());
+    Compression.Algorithm.LZ4.reload(conf); // presumably re-resolves the codec class; confirm
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, Compression.Algorithm.LZ4.getName());
+    TEST_UTIL.startMiniDFSCluster(3); // argument 3: presumably the datanode count; confirm
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
+    doTest(tableName); // doTest is inherited, not defined in this class
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-snappy/pom.xml b/hbase-compression/hbase-compression-snappy/pom.xml
new file mode 100644
index 0000000..3d41820
--- /dev/null
+++ b/hbase-compression/hbase-compression-snappy/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-compression</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>hbase-compression-snappy</artifactId>
+  <name>Apache HBase - Compression - Snappy</name>
+  <description>Pure Java compression support using Xerial Snappy</description>
+  <build>
+    <plugins>
+      <!-- Testing plugins -->
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.revelc.code</groupId>
+        <artifactId>warbucks-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <!--Make it so assembly:single does nothing in here-->
+          <artifactId>maven-assembly-plugin</artifactId>
+          <configuration>
+            <skipAssembly>true</skipAssembly>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <dependencies>
+    <!-- Intra-project dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-logging</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-asyncfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <!-- native Java compression codecs -->
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+    </dependency>
+    <!--Test-->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <profiles>
+    <profile>
+      <id>build-with-jdk11</id>
+      <activation>
+        <jdk>[1.11,)</jdk>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>javax.annotation</groupId>
+          <artifactId>javax.annotation-api</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java
new file mode 100644
index 0000000..e7c62c5
--- /dev/null
+++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xerial;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Hadoop Snappy codec implemented with Xerial Snappy; streams are Hadoop block (de)compressor
+ * streams sized by {@link #getBufferSize(Configuration)}.
+ * <p>The data format is compatible with Hadoop's native Snappy codec.
+ */
+@InterfaceAudience.Private
+public class SnappyCodec implements Configurable, CompressionCodec {
+
+  public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";
+
+  private Configuration conf; // replaced wholesale by setConf()
+
+  public SnappyCodec() {
+    conf = new Configuration(); // default configuration until setConf() is called
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new SnappyCompressor(getBufferSize(conf)); // sized from the current configuration
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new SnappyDecompressor(getBufferSize(conf)); // sized from the current configuration
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return createInputStream(in, createDecompressor()); // new decompressor for each call
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return createOutputStream(out, createCompressor()); // new compressor for each call
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    int bufferSize = getBufferSize(conf);
+    int compressionOverhead = (bufferSize / 6) + 32; // headroom passed to BlockCompressorStream
+    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return SnappyCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return SnappyDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".snappy";
+  }
+
+  // Package private
+
+  static int getBufferSize(Configuration conf) {
+    int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY, // HBase key wins; Hadoop key is the fallback
+      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
+    return size > 0 ? size : 256 * 1024; // non-positive -> 256 KiB; don't change this default
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java
new file mode 100644
index 0000000..71e7b7a
--- /dev/null
+++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xerial;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.xerial.snappy.Snappy;
+
+/**
+ * Hadoop compressor glue for Xerial Snappy; input is buffered and compressed after finish().
+ */
+@InterfaceAudience.Private
+public class SnappyCompressor implements Compressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(SnappyCompressor.class);
+  protected ByteBuffer inBuf, outBuf; // direct buffers: pending input / compressed output
+  protected int bufferSize; // configured size used to allocate both buffers
+  protected boolean finish, finished; // finish() was called / pending input was compressed
+  protected long bytesRead, bytesWritten; // running totals, zeroed by reset()
+
+  SnappyCompressor(int bufferSize) {
+    this.bufferSize = bufferSize;
+    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
+    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+    this.outBuf.position(bufferSize); // position == limit: no output available yet
+  }
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    // If we have previously compressed our input and still have some buffered bytes
+    // remaining, provide them to the caller.
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("compress: {} bytes from outBuf", n);
+      return n;
+    }
+    // We don't actually begin compression until our caller calls finish().
+    if (finish) {
+      if (inBuf.position() > 0) {
+        inBuf.flip();
+        int uncompressed = inBuf.remaining();
+        // If we don't have enough capacity in our currently allocated output buffer,
+        // allocate a new one which does.
+        int needed = maxCompressedLength(uncompressed);
+        if (outBuf.capacity() < needed) {
+          needed = CompressionUtil.roundInt2(needed);
+          LOG.trace("compress: resize outBuf {}", needed);
+          outBuf = ByteBuffer.allocateDirect(needed);
+        } else {
+          outBuf.clear();
+        }
+        int written = Snappy.compress(inBuf, outBuf);
+        bytesWritten += written;
+        inBuf.clear(); // all buffered input was handed to Snappy in one call
+        LOG.trace("compress: compressed {} -> {}", uncompressed, written);
+        finished = true;
+        int n = Math.min(written, len);
+        outBuf.get(b, off, n); // bytes beyond len stay in outBuf for the next call
+        LOG.trace("compress: {} bytes", n);
+        return n;
+      } else {
+        finished = true; // finish() was called with no pending input
+      }
+    }
+    LOG.trace("No output");
+    return 0;
+  }
+
+  @Override
+  public void end() { // nothing to release here; only traces the call
+    LOG.trace("end");
+  }
+
+  @Override
+  public void finish() {
+    LOG.trace("finish");
+    finish = true; // compress() starts compressing only once this is set
+  }
+
+  @Override
+  public boolean finished() {
+    boolean b = finished && !outBuf.hasRemaining(); // compressed and fully drained
+    LOG.trace("finished: {}", b);
+    return b;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead; // bytes accepted by setInput() since the last reset()
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten; // compressed bytes produced since the last reset()
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = !finished(); // true until input is compressed and the output is drained
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    LOG.trace("reinit");
+    if (conf != null) {
+      // Buffer size might have changed
+      int newBufferSize = SnappyCodec.getBufferSize(conf);
+      if (bufferSize != newBufferSize) { // reallocate only on an actual size change
+        bufferSize = newBufferSize;
+        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
+        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+      }
+    }
+    reset(); // also puts any freshly allocated buffers into the empty state
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    outBuf.clear();
+    outBuf.position(outBuf.capacity()); // position == limit: no output available
+    bytesRead = 0;
+    bytesWritten = 0;
+    finish = false;
+    finished = false;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) { // dictionaries are never accepted
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // This condition should be fortunately rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
+      inBuf.flip(); // expose the bytes accumulated so far for copying
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len); // append the caller's bytes after any earlier input
+    bytesRead += len;
+    finished = false; // new input arrived, so clear the finished flag
+  }
+
+  // Package private
+
+  int maxCompressedLength(int len) { // upper bound used to size outBuf before compressing
+    return Snappy.maxCompressedLength(len);
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java
new file mode 100644
index 0000000..e911921
--- /dev/null
+++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xerial;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.xerial.snappy.Snappy;
+
+/**
+ * Hadoop decompressor glue for Xerial Snappy; decompresses all buffered input in one call.
+ */
+@InterfaceAudience.Private
+public class SnappyDecompressor implements Decompressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(SnappyDecompressor.class);
+  protected ByteBuffer inBuf, outBuf; // direct buffers: pending input / decompressed output
+  protected int inLen; // input bytes accepted but not yet decompressed
+  protected boolean finished; // set once decompress() finds neither output nor input
+
+  SnappyDecompressor(int bufferSize) {
+    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
+    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+    this.outBuf.position(bufferSize); // position == limit: no output available yet
+  }
+
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    if (outBuf.hasRemaining()) { // drain output left over from an earlier call first
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: {} bytes from outBuf", n);
+      return n;
+    }
+    if (inBuf.position() > 0) { // buffered input exists: decompress all of it at once
+      inBuf.flip();
+      int remaining = inBuf.remaining();
+      inLen -= remaining;
+      outBuf.clear(); // NOTE(review): outBuf is never grown; assumes the output fits - confirm
+      int written = Snappy.uncompress(inBuf, outBuf);
+      inBuf.clear();
+      LOG.trace("decompress: decompressed {} -> {}", remaining, written);
+      int n = Math.min(written, len);
+      outBuf.get(b, off, n); // bytes beyond len stay in outBuf for the next call
+      LOG.trace("decompress: {} bytes", n);
+      return n;
+    }
+    LOG.trace("decompress: No output, finished"); // no leftover output and no buffered input
+    finished = true;
+    return 0;
+  }
+
+  @Override
+  public void end() { // nothing to release here; only traces the call
+    LOG.trace("end");
+  }
+
+  @Override
+  public boolean finished() {
+    LOG.trace("finished");
+    return finished; // set by decompress(), cleared by setInput() and reset()
+  }
+
+  @Override
+  public int getRemaining() {
+    LOG.trace("getRemaining: {}", inLen);
+    return inLen; // grown by setInput(), reduced by decompress(), zeroed by reset()
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    LOG.trace("needsDictionary");
+    return false; // always false; setDictionary() on this class throws
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear(); // discard any buffered input
+    inLen = 0;
+    outBuf.clear();
+    outBuf.position(outBuf.capacity()); // position == limit: no output available
+    finished = false;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = (inBuf.position() == 0); // position 0: no input is buffered
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) { // dictionaries are never accepted
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // This condition should be fortunately rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
+      inBuf.flip(); // expose the bytes accumulated so far for copying
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len); // append the caller's bytes after any earlier input
+    inLen += len;
+    finished = false; // new input arrived, so clear the finished flag
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-snappy/src/test/java/org/apache/hadoop/hbase/io/compress/xerial/TestHFileCompressionSnappy.java b/hbase-compression/hbase-compression-snappy/src/test/java/org/apache/hadoop/hbase/io/compress/xerial/TestHFileCompressionSnappy.java
new file mode 100644
index 0000000..094b434
--- /dev/null
+++ b/hbase-compression/hbase-compression-snappy/src/test/java/org/apache/hadoop/hbase/io/compress/xerial/TestHFileCompressionSnappy.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xerial;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionSnappy extends HFileTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHFileCompressionSnappy.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration(); // configure this module's Snappy codec:
+    conf.set(Compression.SNAPPY_CODEC_CLASS_KEY, SnappyCodec.class.getCanonicalName());
+    Compression.Algorithm.SNAPPY.reload(conf); // presumably re-resolves the codec class; confirm
+    HFileTestBase.setUpBeforeClass(); // base-class setup runs after the codec is configured
+  }
+
+  @Test
+  public void test() throws Exception {
+    doTest(Compression.Algorithm.SNAPPY); // doTest is inherited, not defined in this class
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-snappy/src/test/java/org/apache/hadoop/hbase/io/compress/xerial/TestSnappyCodec.java b/hbase-compression/hbase-compression-snappy/src/test/java/org/apache/hadoop/hbase/io/compress/xerial/TestSnappyCodec.java
new file mode 100644
index 0000000..e882d79
--- /dev/null
+++ b/hbase-compression/hbase-compression-snappy/src/test/java/org/apache/hadoop/hbase/io/compress/xerial/TestSnappyCodec.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xerial;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Exercises {@link SnappyCodec} through the shared {@link CompressionTestBase} helpers. */
+@Category(SmallTests.class)
+public class TestSnappyCodec extends CompressionTestBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSnappyCodec.class);
+
+  @Test
+  public void testSnappyCodecSmall() throws Exception {
+    codecSmallTest(new SnappyCodec());
+  }
+
+  @Test
+  public void testSnappyCodecLarge() throws Exception {
+    // A fresh codec instance per run, ordered from least to most compressible input.
+    codecLargeTest(new SnappyCodec(), 1.1); // poor compressibility, expansion with this codec
+    codecLargeTest(new SnappyCodec(), 2);
+    codecLargeTest(new SnappyCodec(), 10);  // very high compressibility
+  }
+
+  @Test
+  public void testSnappyCodecVeryLarge() throws Exception {
+    codecVeryLargeTest(new SnappyCodec(), 3); // like text
+  }
+}
diff --git a/hbase-compression/hbase-compression-snappy/src/test/java/org/apache/hadoop/hbase/io/compress/xerial/TestWALCompressionSnappy.java b/hbase-compression/hbase-compression-snappy/src/test/java/org/apache/hadoop/hbase/io/compress/xerial/TestWALCompressionSnappy.java
new file mode 100644
index 0000000..ba59b65
--- /dev/null
+++ b/hbase-compression/hbase-compression-snappy/src/test/java/org/apache/hadoop/hbase/io/compress/xerial/TestWALCompressionSnappy.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xerial;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/** Runs the shared CompressedWALTestBase scenario with SNAPPY WAL value compression enabled. */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionSnappy extends CompressedWALTestBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestWALCompressionSnappy.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    final Configuration cfg = TEST_UTIL.getConfiguration();
+    // Register SnappyCodec for SNAPPY and reload the algorithm, then switch on WAL compression.
+    cfg.set(Compression.SNAPPY_CODEC_CLASS_KEY, SnappyCodec.class.getCanonicalName());
+    Compression.Algorithm.SNAPPY.reload(cfg);
+    cfg.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    cfg.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    cfg.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, Compression.Algorithm.SNAPPY.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    final String sanitized = name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
+    doTest(TableName.valueOf(sanitized));
+  }
+}
diff --git a/hbase-compression/hbase-compression-xz/pom.xml b/hbase-compression/hbase-compression-xz/pom.xml
new file mode 100644
index 0000000..e3ded6b
--- /dev/null
+++ b/hbase-compression/hbase-compression-xz/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-compression</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>hbase-compression-xz</artifactId>
+  <name>Apache HBase - Compression - XZ</name>
+  <description>Pure Java compression support using XZ for Java</description>
+  <build>
+    <plugins>
+      <!-- Testing plugins -->
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.revelc.code</groupId>
+        <artifactId>warbucks-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <!--Make it so assembly:single does nothing in here-->
+          <artifactId>maven-assembly-plugin</artifactId>
+          <configuration>
+            <skipAssembly>true</skipAssembly>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <dependencies>
+    <!-- Intra-project dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-logging</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <!-- Pure Java compression codec library (XZ for Java) -->
+    <dependency>
+      <groupId>org.tukaani</groupId>
+      <artifactId>xz</artifactId>
+    </dependency>
+    <!--Test-->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <profiles>
+    <profile>
+      <id>build-with-jdk11</id>
+      <activation>
+        <jdk>[1.11,)</jdk>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>javax.annotation</groupId>
+          <artifactId>javax.annotation-api</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java
new file mode 100644
index 0000000..99f29a2
--- /dev/null
+++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xz;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Hadoop {@link CompressionCodec} for LZMA, implemented with XZ for Java. The preset level
+ * and the buffer size are looked up in the current {@link Configuration} each time a
+ * compressor, decompressor, or stream is created.
+ */
+@InterfaceAudience.Private
+public class LzmaCodec implements Configurable, CompressionCodec {
+  public static final String LZMA_LEVEL_KEY = "hbase.io.compress.lzma.level";
+  public static final int LZMA_LEVEL_DEFAULT = 6;
+  public static final String LZMA_BUFFERSIZE_KEY = "hbase.io.compress.lzma.buffersize";
+  public static final int LZMA_BUFFERSIZE_DEFAULT = 256 * 1024;
+
+  private Configuration conf;
+
+  public LzmaCodec() {
+    this.conf = new Configuration();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new LzmaCompressor(getLevel(this.conf), getBufferSize(this.conf));
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new LzmaDecompressor(getBufferSize(this.conf));
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new BlockDecompressorStream(in, d, getBufferSize(this.conf));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    final int blockSize = getBufferSize(this.conf);
+    // Overhead handed to the block stream: one sixth of the buffer size plus 32 bytes.
+    return new BlockCompressorStream(out, c, blockSize, (blockSize / 6) + 32);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return LzmaCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return LzmaDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".lzma";
+  }
+
+  /** Reads {@link #LZMA_LEVEL_KEY} from {@code conf}, defaulting to LZMA_LEVEL_DEFAULT. */
+  static int getLevel(Configuration conf) {
+    return conf.getInt(LZMA_LEVEL_KEY, LZMA_LEVEL_DEFAULT);
+  }
+
+  /** Reads {@link #LZMA_BUFFERSIZE_KEY} from {@code conf}, defaulting to the 256 KiB constant. */
+  static int getBufferSize(Configuration conf) {
+    return conf.getInt(LZMA_BUFFERSIZE_KEY, LZMA_BUFFERSIZE_DEFAULT);
+  }
+}
diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java
new file mode 100644
index 0000000..dd4d999
--- /dev/null
+++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xz;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.tukaani.xz.ArrayCache;
+import org.tukaani.xz.BasicArrayCache;
+import org.tukaani.xz.LZMA2Options;
+import org.tukaani.xz.LZMAOutputStream;
+import org.tukaani.xz.UnsupportedOptionsException;
+
+/**
+ * Hadoop compressor glue for XZ for Java. Input given to setInput() is only buffered; it is
+ * compressed as one LZMA stream by the first compress() call made after finish().
+ */
+@InterfaceAudience.Private
+public class LzmaCompressor implements Compressor {
+  protected static final Logger LOG = LoggerFactory.getLogger(LzmaCompressor.class);
+  protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache();
+  protected ByteBuffer inBuf;
+  protected ByteBuffer outBuf;
+  protected int bufferSize;
+  protected boolean finish, finished;
+  protected long bytesRead, bytesWritten;
+  protected LZMA2Options lzOptions;
+
+  LzmaCompressor(int level, int bufferSize) {
+    this.bufferSize = bufferSize;
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf.position(bufferSize);
+    this.lzOptions = new LZMA2Options();
+    try {
+      this.lzOptions.setPreset(level);
+    } catch (UnsupportedOptionsException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Copies compressed bytes into {@code b}: leftover output from an earlier call first,
+   * otherwise, once finish() has been called, the compressed form of all buffered input.
+   */
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    // Hand out output still buffered from a previous call before anything else.
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("compress: {} bytes from outBuf", n);
+      return n;
+    }
+    // We don't actually begin compression until our caller calls finish().
+    if (finish) {
+      if (inBuf.position() > 0) {
+        inBuf.flip();
+        int uncompressed = inBuf.remaining();
+        int needed = maxCompressedLength(uncompressed);
+        // Compress directly into the provided array only if it can hold the worst case.
+        // Otherwise stage the output in outBuf, growing it first if it is too small.
+        ByteBuffer writeBuffer;
+        boolean direct = false;
+        if (len >= needed) {
+          // Slice so position 0 maps to off, whatever the stream does to position and limit.
+          writeBuffer = ByteBuffer.wrap(b, off, len).slice();
+          direct = true;
+        } else {
+          if (outBuf.capacity() < needed) {
+            needed = CompressionUtil.roundInt2(needed);
+            LOG.trace("compress: resize outBuf {}", needed);
+            outBuf = ByteBuffer.allocate(needed);
+          } else {
+            outBuf.clear();
+          }
+          writeBuffer = outBuf;
+        }
+        int oldPos = writeBuffer.position();
+        // XZ for Java has no public block compression API, so the input is pushed through
+        // an LZMAOutputStream layered on a stream that writes into writeBuffer.
+        try (ByteBufferOutputStream lowerOut = new ByteBufferOutputStream(writeBuffer) {
+          @Override
+          // Fail instead of letting ByteBufferOutputStream reallocate a too-small buffer.
+          protected void checkSizeAndGrow(int extra) {
+            long capacityNeeded = curBuf.position() + (long) extra;
+            if (capacityNeeded > curBuf.limit()) {
+              throw new BufferOverflowException();
+            }
+          }
+        }) {
+          try (LZMAOutputStream out =
+              new LZMAOutputStream(lowerOut, lzOptions, uncompressed, ARRAY_CACHE)) {
+            out.write(inBuf.array(), inBuf.arrayOffset(), uncompressed);
+          }
+        }
+        int written = writeBuffer.position() - oldPos;
+        bytesWritten += written;
+        inBuf.clear();
+        LOG.trace("compress: compressed {} -> {}", uncompressed, written);
+        finished = true;
+        if (!direct) {
+          // Flip only when outBuf received the output. Flipping an untouched outBuf after
+          // a direct write would make its stale contents look like pending output.
+          outBuf.flip();
+          int n = Math.min(written, len);
+          outBuf.get(b, off, n);
+          LOG.trace("compress: {} bytes", n);
+          return n;
+        } else {
+          LOG.trace("compress: {} bytes direct", written);
+          return written;
+        }
+      } else {
+        finished = true;
+      }
+    }
+    LOG.trace("No output");
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public void finish() {
+    LOG.trace("finish");
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    boolean b = finished && !outBuf.hasRemaining();
+    LOG.trace("finished: {}", b);
+    return b;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = !finished();
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    LOG.trace("reinit");
+    if (conf != null) {
+      // Level might have changed
+      try {
+        int level = LzmaCodec.getLevel(conf);
+        this.lzOptions = new LZMA2Options();
+        this.lzOptions.setPreset(level);
+      } catch (UnsupportedOptionsException e) {
+        throw new RuntimeException(e);
+      }
+      // Buffer size might have changed
+      int newBufferSize = LzmaCodec.getBufferSize(conf);
+      if (bufferSize != newBufferSize) {
+        bufferSize = newBufferSize;
+        this.inBuf = ByteBuffer.allocate(bufferSize);
+        this.outBuf = ByteBuffer.allocate(bufferSize);
+      }
+    }
+    reset();
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    bytesRead = 0;
+    bytesWritten = 0;
+    finish = false;
+    finished = false;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // This condition should be fortunately rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf to {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocate(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    bytesRead += len;
+    finished = false;
+  }
+
+  /** Package private: the output size compress() reserves for {@code len} input bytes. */
+  int maxCompressedLength(int len) {
+    return len + 32 + (len/6);
+  }
+}
diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java
new file mode 100644
index 0000000..be450b3
--- /dev/null
+++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xz;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.tukaani.xz.ArrayCache;
+import org.tukaani.xz.BasicArrayCache;
+import org.tukaani.xz.LZMAInputStream;
+
+/**
+ * Hadoop decompressor glue for XZ for Java.
+ */
+@InterfaceAudience.Private
+public class LzmaDecompressor implements Decompressor {
+  protected static final Logger LOG = LoggerFactory.getLogger(LzmaDecompressor.class);
+  protected static final ArrayCache ARRAY_CACHE = new BasicArrayCache() {
+    @Override
+    public byte[] getByteArray(int size, boolean fillWithZeros) {
+      // Work around a bug in XZ decompression if cached byte arrays are not cleared by
+      // always clearing them.
+      return super.getByteArray(size, true);
+    }
+  };
+  protected ByteBuffer inBuf, outBuf;
+  protected int inLen;
+  protected boolean finished;
+
+  LzmaDecompressor(int bufferSize) {
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf.position(bufferSize);
+  }
+
+  /** Returns leftover output first; otherwise decompresses all buffered input at once. */
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: {} bytes from outBuf", n);
+      return n;
+    }
+    if (inBuf.position() > 0) {
+      inBuf.flip();
+      int remaining = inBuf.remaining();
+      inLen -= remaining;
+      // XZ for Java has no public block compression API, so the buffered input is read back
+      // through an LZMAInputStream. Reflection into its block classes might avoid this.
+      outBuf.clear();
+      final byte[] buf = new byte[8192];
+      try (ByteBufferInputStream lowerIn = new ByteBufferInputStream(inBuf);
+          LZMAInputStream in = new LZMAInputStream(lowerIn, ARRAY_CACHE)) {
+        int read;
+        do {
+          read = in.read(buf);
+          if (read > 0) {
+            if (outBuf.remaining() < read) {
+              // More output than outBuf can hold: grow it instead of overflowing.
+              int needed = CompressionUtil.roundInt2(outBuf.position() + read);
+              ByteBuffer newBuf = ByteBuffer.allocate(needed);
+              outBuf.flip();
+              newBuf.put(outBuf);
+              outBuf = newBuf;
+            }
+            outBuf.put(buf, 0, read);
+          }
+        } while (read > 0);
+      }
+      int written = outBuf.position();
+      outBuf.flip();
+      inBuf.clear();
+      LOG.trace("decompress: decompressed {} -> {}", remaining, written);
+      int n = Math.min(written, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: {} bytes", n);
+      return n;
+    }
+    LOG.trace("decompress: No output, finished");
+    finished = true;
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public boolean finished() {
+    LOG.trace("finished");
+    return finished;
+  }
+
+  @Override
+  public int getRemaining() {
+    LOG.trace("getRemaining: {}", inLen);
+    return inLen;
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    LOG.trace("needsDictionary");
+    return false;
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    inLen = 0;
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    finished = false;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = (inBuf.position() == 0);
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Reallocate to accommodate the accumulated input plus the new input. This should
+      // fortunately be rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocate(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    inLen += len;
+    finished = false;
+  }
+}
diff --git a/hbase-compression/hbase-compression-xz/src/test/java/org/apache/hadoop/hbase/io/compress/xz/TestHFileCompressionLzma.java b/hbase-compression/hbase-compression-xz/src/test/java/org/apache/hadoop/hbase/io/compress/xz/TestHFileCompressionLzma.java
new file mode 100644
index 0000000..04c7b51
--- /dev/null
+++ b/hbase-compression/hbase-compression-xz/src/test/java/org/apache/hadoop/hbase/io/compress/xz/TestHFileCompressionLzma.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xz;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Runs the shared {@link HFileTestBase} scenario with LZMA bound to this module's codec. */
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionLzma extends HFileTestBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHFileCompressionLzma.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Register LzmaCodec for LZMA and reload the algorithm before the base class setup.
+    final Configuration testConf = TEST_UTIL.getConfiguration();
+    testConf.set(Compression.LZMA_CODEC_CLASS_KEY, LzmaCodec.class.getCanonicalName());
+    Compression.Algorithm.LZMA.reload(testConf);
+    HFileTestBase.setUpBeforeClass();
+  }
+
+  @Test
+  public void test() throws Exception {
+    doTest(Compression.Algorithm.LZMA);
+  }
+}
diff --git a/hbase-compression/hbase-compression-xz/src/test/java/org/apache/hadoop/hbase/io/compress/xz/TestLzmaCodec.java b/hbase-compression/hbase-compression-xz/src/test/java/org/apache/hadoop/hbase/io/compress/xz/TestLzmaCodec.java
new file mode 100644
index 0000000..63978ab
--- /dev/null
+++ b/hbase-compression/hbase-compression-xz/src/test/java/org/apache/hadoop/hbase/io/compress/xz/TestLzmaCodec.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xz;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestLzmaCodec extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestLzmaCodec.class);
+
+  @Test
+  public void testLzmaCodecSmall() throws Exception {
+    codecSmallTest(new LzmaCodec());
+  }
+
+  @Test
+  public void testLzmaCodecLarge() throws Exception {
+    codecLargeTest(new LzmaCodec(), 1.1); // poor compressibility
+    codecLargeTest(new LzmaCodec(),   2);
+    codecLargeTest(new LzmaCodec(),  10); // very high compressibility
+  }
+
+  @Test
+  public void testLzmaCodecVeryLarge() throws Exception {
+    Configuration conf = new Configuration();
+    // LZMA levels range from 1 to 9.
+    // Level 9 might take several minutes to complete. 3 is our default. 1 will be fast.
+    conf.setInt(LzmaCodec.LZMA_LEVEL_KEY, 1);
+    LzmaCodec codec = new LzmaCodec();
+    codec.setConf(conf);
+    codecVeryLargeTest(codec, 3); // like text
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-xz/src/test/java/org/apache/hadoop/hbase/io/compress/xz/TestWALCompressionLzma.java b/hbase-compression/hbase-compression-xz/src/test/java/org/apache/hadoop/hbase/io/compress/xz/TestWALCompressionLzma.java
new file mode 100644
index 0000000..89ce68b
--- /dev/null
+++ b/hbase-compression/hbase-compression-xz/src/test/java/org/apache/hadoop/hbase/io/compress/xz/TestWALCompressionLzma.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.xz;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionLzma extends CompressedWALTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestWALCompressionLzma.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.LZMA_CODEC_CLASS_KEY, LzmaCodec.class.getCanonicalName());
+    Compression.Algorithm.LZMA.reload(conf);
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, Compression.Algorithm.LZMA.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
+    doTest(tableName);
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-zstd/pom.xml b/hbase-compression/hbase-compression-zstd/pom.xml
new file mode 100644
index 0000000..5b78f6e
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-compression</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>hbase-compression-zstd</artifactId>
+  <name>Apache HBase - Compression - ZStandard</name>
+  <description>Pure Java compression support using zstd-jni</description>
+  <build>
+    <plugins>
+      <!-- Testing plugins -->
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.revelc.code</groupId>
+        <artifactId>warbucks-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <!--Make it so assembly:single does nothing in here-->
+          <artifactId>maven-assembly-plugin</artifactId>
+          <configuration>
+            <skipAssembly>true</skipAssembly>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <dependencies>
+    <!-- Intra-project dependencies -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-logging</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-asyncfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <!-- Compression codec library: zstd-jni -->
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+    </dependency>
+    <!--Test-->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jcl-over-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <profiles>
+    <profile>
+      <id>build-with-jdk11</id>
+      <activation>
+        <jdk>[1.11,)</jdk>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>javax.annotation</groupId>
+          <artifactId>javax.annotation-api</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
new file mode 100644
index 0000000..f933896
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Hadoop ZStandard codec implemented with zstd-jni.
+ * <p>
+ * This is data format compatible with Hadoop's native ZStandard codec.
+ */
+@InterfaceAudience.Private
+public class ZstdCodec implements Configurable, CompressionCodec {
+
+  public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
+  public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
+
+  private Configuration conf;
+
+  public ZstdCodec() {
+    conf = new Configuration();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new ZstdCompressor(getLevel(conf), getBufferSize(conf));
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new ZstdDecompressor(getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    int bufferSize = getBufferSize(conf);
+    int compressionOverhead = (bufferSize / 6) + 32;
+    return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return ZstdCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return ZstdDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".zst";
+  }
+
+  // Package private
+
+  static int getLevel(Configuration conf) {
+    return conf.getInt(ZSTD_LEVEL_KEY,
+      conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT));
+  }
+
+  static int getBufferSize(Configuration conf) {
+    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
+      conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
+    return size > 0 ? size : 256 * 1024; // Don't change this default
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
new file mode 100644
index 0000000..9e0b850
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.luben.zstd.Zstd;
+
+/**
+ * Hadoop compressor glue for zstd-jni.
+ */
+@InterfaceAudience.Private
+public class ZstdCompressor implements Compressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(ZstdCompressor.class);
+  protected int level, bufferSize;
+  protected ByteBuffer inBuf, outBuf;
+  protected boolean finish, finished;
+  protected long bytesRead, bytesWritten;
+
+  ZstdCompressor(int level, int bufferSize) {
+    this.level = level;
+    this.bufferSize = bufferSize;
+    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
+    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+    this.outBuf.position(bufferSize);
+  }
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    // If we have previously compressed our input and still have some buffered bytes
+    // remaining, provide them to the caller.
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("compress: {} bytes from outBuf", n);
+      return n;
+    }
+    // We don't actually begin compression until our caller calls finish().
+    if (finish) {
+      if (inBuf.position() > 0) {
+        inBuf.flip();
+        int uncompressed = inBuf.remaining();
+        // If we don't have enough capacity in our currently allocated output buffer,
+        // allocate a new one which does.
+        int needed = maxCompressedLength(uncompressed);
+        if (outBuf.capacity() < needed) {
+          needed = CompressionUtil.roundInt2(needed);
+          LOG.trace("compress: resize outBuf {}", needed);
+          outBuf = ByteBuffer.allocateDirect(needed);
+        } else {
+          outBuf.clear();
+        }
+        int written = Zstd.compress(outBuf, inBuf, level, true);
+        bytesWritten += written;
+        inBuf.clear();
+        LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level);
+        finished = true;
+        outBuf.flip();
+        int n = Math.min(written, len);
+        outBuf.get(b, off, n);
+        LOG.trace("compress: {} bytes", n);
+        return n;
+      } else {
+        finished = true;
+      }
+    }
+    LOG.trace("No output");
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public void finish() {
+    LOG.trace("finish");
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    boolean b = finished && !outBuf.hasRemaining();
+    LOG.trace("finished: {}", b);
+    return b;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = !finished();
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    LOG.trace("reinit");
+    if (conf != null) {
+      // Level might have changed
+      level = ZstdCodec.getLevel(conf);
+      // Buffer size might have changed
+      int newBufferSize = ZstdCodec.getBufferSize(conf);
+      if (bufferSize != newBufferSize) {
+        bufferSize = newBufferSize;
+        this.inBuf = ByteBuffer.allocateDirect(bufferSize);
+        this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+      }
+    }
+    reset();
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    bytesRead = 0;
+    bytesWritten = 0;
+    finish = false;
+    finished = false;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // Fortunately this condition should be rare, because reallocating is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    bytesRead += len;
+    finished = false;
+  }
+
+  // Package private
+
+  int maxCompressedLength(int len) {
+    return (int) Zstd.compressBound(len);
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
new file mode 100644
index 0000000..b25d0a3
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.luben.zstd.Zstd;
+
+/**
+ * Hadoop decompressor glue for zstd-jni.
+ */
+@InterfaceAudience.Private
+public class ZstdDecompressor implements Decompressor {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(ZstdDecompressor.class);
+  protected ByteBuffer inBuf, outBuf;
+  protected int inLen;
+  protected boolean finished;
+
+  ZstdDecompressor(int bufferSize) {
+    this.inBuf = ByteBuffer.allocateDirect(bufferSize);
+    this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+    this.outBuf.position(bufferSize);
+  }
+
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: {} bytes from outBuf", n);
+      return n;
+    }
+    if (inBuf.position() > 0) {
+      inBuf.flip();
+      int remaining = inBuf.remaining();
+      inLen -= remaining;
+      outBuf.clear();
+      int written = Zstd.decompress(outBuf, inBuf);
+      inBuf.clear();
+      LOG.trace("decompress: decompressed {} -> {}", remaining, written);
+      outBuf.flip();
+      int n = Math.min(written, len);
+      outBuf.get(b, off, n);
+      LOG.trace("decompress: {} bytes", n);
+      return n;
+    }
+    LOG.trace("decompress: No output, finished");
+    finished = true;
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public boolean finished() {
+    LOG.trace("finished");
+    return finished;
+  }
+
+  @Override
+  public int getRemaining() {
+    LOG.trace("getRemaining: {}", inLen);
+    return inLen;
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    LOG.trace("needsDictionary");
+    return false;
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    inLen = 0;
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+    finished = false;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = (inBuf.position() == 0);
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // Fortunately this condition should be rare, because reallocating is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocateDirect(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    inLen += len;
+    finished = false;
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestHFileCompressionZstd.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestHFileCompressionZstd.java
new file mode 100644
index 0000000..07ce12d
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestHFileCompressionZstd.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionZstd extends HFileTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHFileCompressionZstd.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
+    Compression.Algorithm.ZSTD.reload(conf);
+    HFileTestBase.setUpBeforeClass();
+  }
+
+  @Test
+  public void test() throws Exception {
+    doTest(Compression.Algorithm.ZSTD);
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestWALCompressionZstd.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestWALCompressionZstd.java
new file mode 100644
index 0000000..e75de9b
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestWALCompressionZstd.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionZstd extends CompressedWALTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestWALCompressionZstd.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
+    Compression.Algorithm.ZSTD.reload(conf);
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, Compression.Algorithm.ZSTD.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
+    doTest(tableName);
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java
new file mode 100644
index 0000000..6bcb2aa
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the {@code codecSmallTest}, {@code codecLargeTest}, and {@code codecVeryLargeTest}
+ * helpers inherited from {@link CompressionTestBase} against {@link ZstdCodec}.
+ */
+@Category(SmallTests.class)
+public class TestZstdCodec extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestZstdCodec.class);
+
+  @Test
+  public void testzstdCodecSmall() throws Exception {
+    codecSmallTest(new ZstdCodec());
+  }
+
+  @Test
+  public void testzstdCodecLarge() throws Exception {
+    codecLargeTest(new ZstdCodec(), 1.1); // poor compressibility
+    codecLargeTest(new ZstdCodec(),   2);
+    codecLargeTest(new ZstdCodec(),  10); // very high compressibility
+  }
+
+  @Test
+  public void testzstdCodecVeryLarge() throws Exception {
+    Configuration conf = new Configuration();
+    // ZStandard levels range from 1 to 22.
+    // Level 22 might take up to a minute to complete. 3 is the Hadoop default, and will be fast.
+    conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3);
+    ZstdCodec codec = new ZstdCodec();
+    codec.setConf(conf);
+    codecVeryLargeTest(codec, 3); // like text
+  }
+
+}
diff --git a/hbase-compression/pom.xml b/hbase-compression/pom.xml
new file mode 100644
index 0000000..4f65df1
--- /dev/null
+++ b/hbase-compression/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+    /*
+     * Licensed to the Apache Software Foundation (ASF) under one
+     * or more contributor license agreements.  See the NOTICE file
+     * distributed with this work for additional information
+     * regarding copyright ownership.  The ASF licenses this file
+     * to you under the Apache License, Version 2.0 (the
+     * "License"); you may not use this file except in compliance
+     * with the License.  You may obtain a copy of the License at
+     *
+     *     http://www.apache.org/licenses/LICENSE-2.0
+     *
+     * Unless required by applicable law or agreed to in writing, software
+     * distributed under the License is distributed on an "AS IS" BASIS,
+     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     * See the License for the specific language governing permissions and
+     * limitations under the License.
+     */
+  -->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>hbase-build-configuration</artifactId>
+    <groupId>org.apache.hbase</groupId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>../hbase-build-configuration</relativePath>
+  </parent>
+  <artifactId>hbase-compression</artifactId>
+  <name>Apache HBase - Compression</name>
+  <description>Pure Java compression support parent</description>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>hbase-compression-aircompressor</module>
+    <module>hbase-compression-lz4</module>
+    <module>hbase-compression-snappy</module>
+    <module>hbase-compression-xz</module>
+    <module>hbase-compression-zstd</module>
+  </modules>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-resource-bundle</artifactId>
+      <optional>true</optional>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <pluginManagement>
+      <plugins>
+        <!-- This entry overrides the excludeFilterFile element in the spotbugs
+             configuration of the hbase/pom.xml file. This override specifies that
+             the exclude filter file is found TWO levels up from a grandchild project. -->
+        <plugin>
+          <groupId>com.github.spotbugs</groupId>
+          <artifactId>spotbugs-maven-plugin</artifactId>
+          <configuration>
+            <excludeFilterFile>${project.basedir}/../../dev-support/spotbugs-exclude.xml</excludeFilterFile>
+            <spotbugsXmlOutput>true</spotbugsXmlOutput>
+            <xmlOutput>true</xmlOutput>
+            <effort>Max</effort>
+          </configuration>
+        </plugin>
+        <plugin>
+          <!--Make it so assembly:single does nothing in here-->
+          <artifactId>maven-assembly-plugin</artifactId>
+          <configuration>
+            <skipAssembly>true</skipAssembly>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <!-- Special configuration for spotbugs just in the parent, emulating the setup in
+           hbase/pom.xml. Note that the exclude filter file is found ONE level up from this project. -->
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <inherited>false</inherited>
+            <goals>
+              <goal>spotbugs</goal>
+            </goals>
+            <configuration>
+              <excludeFilterFile>${project.basedir}/../dev-support/spotbugs-exclude.xml</excludeFilterFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <failOnViolation>true</failOnViolation>
+        </configuration>
+      </plugin>
+      <plugin>
+        <!--Make it so assembly:single does nothing in here-->
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index 68967d6..0738798 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -82,7 +82,6 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
@@ -94,6 +93,7 @@ import org.apache.hadoop.hbase.util.GsonUtil;
 import org.apache.hadoop.hbase.util.Hash;
 import org.apache.hadoop.hbase.util.MurmurHash;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.RandomDistribution;
 import org.apache.hadoop.hbase.util.YammerHistogramUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
diff --git a/hbase-resource-bundle/src/main/resources/supplemental-models.xml b/hbase-resource-bundle/src/main/resources/supplemental-models.xml
index 21ada4c..43801a1 100644
--- a/hbase-resource-bundle/src/main/resources/supplemental-models.xml
+++ b/hbase-resource-bundle/src/main/resources/supplemental-models.xml
@@ -3322,4 +3322,19 @@ Copyright (c) 2007-2017 The JRuby project
       </licenses>
     </project>
   </supplement>
+  <supplement>
+    <project>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+      <version>0.21</version>
+      <name>Aircompressor compression codecs</name>
+      <licenses>
+        <license>
+          <name>Apache License, Version 2.0</name>
+          <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          <distribution>repo</distribution>
+        </license>
+      </licenses>
+    </project>
+  </supplement>
 </supplementalDataModels>
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/compress/HFileTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/compress/HFileTestBase.java
new file mode 100644
index 0000000..4ca3a43
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/compress/HFileTestBase.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.security.SecureRandom;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.logging.Log4jUtils;
+import org.apache.hadoop.hbase.util.RedundantKVGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Shared scaffolding for tests that write an HFile with a given compression algorithm and read
+ * it back.
+ * <p>
+ * {@link #doTest(Compression.Algorithm)} uses the static {@link #fs}, which is only assigned by
+ * {@link #setUpBeforeClass()}, so subclasses must invoke that method first.
+ */
+public class HFileTestBase {
+
+  static {
+    // Raise the log level of the compression package to TRACE while these tests run.
+    Log4jUtils.setLogLevel("org.apache.hadoop.hbase.io.compress", "TRACE");
+  }
+
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static final Logger LOG = LoggerFactory.getLogger(HFileTestBase.class);
+  protected static final SecureRandom RNG = new SecureRandom();
+  protected static FileSystem fs;
+
+  /**
+   * Disables the block cache, selects HFile format version 3 in the shared test configuration,
+   * and resolves the {@link FileSystem} that {@link #doTest(Compression.Algorithm)} writes to.
+   */
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    // Disable block cache in this test.
+    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+    conf.setInt("hfile.format.version", 3);
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Writes 10000 generated KeyValues to a new HFile using {@code compression}, scans the file
+   * back checking each cell and the total count, then performs 100 random seeks with pread.
+   * @param compression the compression algorithm to set on the HFile context
+   * @throws Exception if writing, reading, or seeking in the file fails
+   */
+  @SuppressWarnings("deprecation")
+  public void doTest(Compression.Algorithm compression) throws Exception {
+    // Create 10000 random test KVs
+    RedundantKVGenerator generator = new RedundantKVGenerator();
+    List<KeyValue> testKvs = generator.generateTestKeyValues(10000);
+
+    // Build a file context with small blocks and the compression algorithm under test
+    Configuration conf = TEST_UTIL.getConfiguration();
+    CacheConfig cacheConf = new CacheConfig(conf);
+    HFileContext fileContext = new HFileContextBuilder()
+      .withBlockSize(4096) // small block
+      .withCompression(compression)
+      .build();
+    // write a new test HFile
+    LOG.info("Writing with " + fileContext);
+    Path path = new Path(TEST_UTIL.getDataTestDir(),
+      HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
+    FSDataOutputStream out = fs.create(path);
+    try {
+      HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf)
+        .withOutputStream(out)
+        .withFileContext(fileContext)
+        .create();
+      try {
+        for (KeyValue kv : testKvs) {
+          writer.append(kv);
+        }
+      } finally {
+        writer.close();
+      }
+    } finally {
+      // Release the stream even if creating or closing the writer fails
+      out.close();
+    }
+
+    // read it back in
+    LOG.info("Reading with " + fileContext);
+    int i = 0;
+    HFileScanner scanner = null;
+    HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
+    try {
+      scanner = reader.getScanner(false, false);
+      assertTrue("Initial seekTo failed", scanner.seekTo());
+      do {
+        Cell kv = scanner.getCell();
+        assertTrue("Read back an unexpected or invalid KV",
+          testKvs.contains(KeyValueUtil.ensureKeyValue(kv)));
+        i++;
+      } while (scanner.next());
+    } finally {
+      // getScanner() may have thrown, leaving scanner unset; avoid masking that with an NPE
+      if (scanner != null) {
+        scanner.close();
+      }
+      reader.close();
+    }
+
+    // JUnit's assertEquals takes (message, expected, actual)
+    assertEquals("Did not read back as many KVs as written", testKvs.size(), i);
+
+    // Test random seeks with pread
+    LOG.info("Random seeking with " + fileContext);
+    scanner = null;
+    reader = HFile.createReader(fs, path, cacheConf, true, conf);
+    try {
+      scanner = reader.getScanner(false, true);
+      assertTrue("Initial seekTo failed", scanner.seekTo());
+      for (i = 0; i < 100; i++) {
+        KeyValue kv = testKvs.get(RNG.nextInt(testKvs.size()));
+        assertEquals("Unable to find KV as expected: " + kv, 0, scanner.seekTo(kv));
+      }
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+      reader.close();
+    }
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/KVGenerator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/KVGenerator.java
index 892f4c9..19800a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/KVGenerator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/KVGenerator.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import java.util.Random;
 
+import org.apache.hadoop.hbase.util.RandomDistribution;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/KeySampler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/KeySampler.java
index a4c1a9b..806ffd6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/KeySampler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/KeySampler.java
@@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import java.util.Random;
 
-import org.apache.hadoop.hbase.io.hfile.RandomDistribution.DiscreteRNG;
+import org.apache.hadoop.hbase.util.RandomDistribution.DiscreteRNG;
 import org.apache.hadoop.io.BytesWritable;
 
 /*
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
index 3fb0c7d..6e76b5a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
@@ -113,7 +113,8 @@ public class TestHFileEncryption {
     HFileBlock b = hbr.readBlockData(pos, -1, false, false, true);
     assertEquals(0, HFile.getAndResetChecksumFailuresCount());
     b.sanityCheck();
-    assertFalse(b.isUnpacked());
+    assertFalse((b.getHFileContext().getCompression() != Compression.Algorithm.NONE)
+      && b.isUnpacked());
     b = b.unpack(ctx, hbr);
     LOG.info("Read a block at " + pos + " with" +
         " onDiskSizeWithHeader=" + b.getOnDiskSizeWithHeader() +
@@ -136,7 +137,7 @@ public class TestHFileEncryption {
     for (int i = 0; i < blocks; i++) {
       blockSizes[i] = (1024 + RNG.nextInt(1024 * 63)) / Bytes.SIZEOF_INT;
     }
-    for (Compression.Algorithm compression : TestHFileBlock.COMPRESSION_ALGORITHMS) {
+    for (Compression.Algorithm compression : HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
       Path path = new Path(TEST_UTIL.getDataTestDir(), "block_v3_" + compression + "_AES");
       LOG.info("testDataBlockEncryption: encryption=AES compression=" + compression);
       long totalSize = 0;
@@ -218,7 +219,7 @@ public class TestHFileEncryption {
     Configuration conf = TEST_UTIL.getConfiguration();
     CacheConfig cacheConf = new CacheConfig(conf);
     for (DataBlockEncoding encoding: DataBlockEncoding.values()) {
-      for (Compression.Algorithm compression: TestHFileBlock.COMPRESSION_ALGORITHMS) {
+      for (Compression.Algorithm compression: HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
         HFileContext fileContext = new HFileContextBuilder()
           .withBlockSize(4096) // small blocks
           .withEncryptionContext(cryptoContext)
@@ -228,7 +229,7 @@ public class TestHFileEncryption {
         // write a new test HFile
         LOG.info("Writing with " + fileContext);
         Path path = new Path(TEST_UTIL.getDataTestDir(),
-                        TEST_UTIL.getRandomUUID().toString() + ".hfile");
+          HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
         FSDataOutputStream out = fs.create(path);
         HFile.Writer writer = HFile.getWriterFactory(conf, cacheConf)
           .withOutputStream(out)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
index 135d963..7d6212d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileSeek.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.RandomDistribution;
 import org.apache.hadoop.io.BytesWritable;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
index b5504bf..6cdda79 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.ByteBufferKeyValue;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -45,14 +46,29 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 @Category({RegionServerTests.class, SmallTests.class})
+@RunWith(Parameterized.class)
 public class TestWALCellCodecWithCompression {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestWALCellCodecWithCompression.class);
 
+  private Compression.Algorithm compression;
+
+  public TestWALCellCodecWithCompression(Compression.Algorithm algo) {
+    this.compression = algo;
+  }
+
+  @Parameters
+  public static List<Object[]> params() {
+    return HBaseTestingUtility.COMPRESSION_ALGORITHMS_PARAMETERIZED;
+  }
+
   @Test
   public void testEncodeDecodeKVsWithTags() throws Exception {
     doTest(false, false);
@@ -93,7 +109,7 @@ public class TestWALCellCodecWithCompression {
 
     Configuration conf = new Configuration(false);
     WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class,
-      false, true, true, Compression.Algorithm.GZ));
+      false, true, true, compression));
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
     Encoder encoder = codec.getEncoder(bos);
     encoder.write(createKV(row_1, value_1, 0));
@@ -102,7 +118,6 @@ public class TestWALCellCodecWithCompression {
     encoder.write(createKV(row_4, value_4, 0));
     encoder.write(createKV(row_5, value_5, 0));
     encoder.flush();
-
     try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) {
       Decoder decoder = codec.getDecoder(is);
       decoder.advance();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java
similarity index 54%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java
index 8e660b6..540c6b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java
@@ -24,101 +24,53 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
 
-@RunWith(Parameterized.class)
-@Category({ RegionServerTests.class, MediumTests.class })
-public class TestCompressedWAL {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestCompressedWAL.class);
-
-  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  @Rule
-  public TestName name = new TestName();
-
-  @Parameter
-  public String walProvider;
-
-  @Parameters(name = "{index}: provider={0}")
-  public static Iterable<Object[]> data() {
-    return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
-    TEST_UTIL.startMiniDFSCluster(3);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Before
-  public void setUp() {
-    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
-    TEST_UTIL.getConfiguration()
-      .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
-  }
-
-  @Test
-  public void testCompressedWAL() throws Exception {
-    TEST_UTIL.getConfiguration()
-      .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
-    doTest();
-  }
-
-  @Test
-  public void testCompressedWALWithValueCompression() throws Exception {
-    TEST_UTIL.getConfiguration()
-      .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
-    doTest();
+@SuppressWarnings("checkstyle:innerassignment")
+public class CompressedWALTestBase {
+
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static final byte[] VALUE;
+  static {
+    // 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597
+    VALUE = new byte[1+1+2+3+5+8+13+21+34+55+89+144+233+377+610+987+1597];
+    int off = 0;
+    Arrays.fill(VALUE, off, (off+=1), (byte)'A');
+    Arrays.fill(VALUE, off, (off+=1), (byte)'B');
+    Arrays.fill(VALUE, off, (off+=2), (byte)'C');
+    Arrays.fill(VALUE, off, (off+=3), (byte)'D');
+    Arrays.fill(VALUE, off, (off+=5), (byte)'E');
+    Arrays.fill(VALUE, off, (off+=8), (byte)'F');
+    Arrays.fill(VALUE, off, (off+=13), (byte)'G');
+    Arrays.fill(VALUE, off, (off+=21), (byte)'H');
+    Arrays.fill(VALUE, off, (off+=34), (byte)'I');
+    Arrays.fill(VALUE, off, (off+=55), (byte)'J');
+    Arrays.fill(VALUE, off, (off+=89), (byte)'K');
+    Arrays.fill(VALUE, off, (off+=144), (byte)'L');
+    Arrays.fill(VALUE, off, (off+=233), (byte)'M');
+    Arrays.fill(VALUE, off, (off+=377), (byte)'N');
+    Arrays.fill(VALUE, off, (off+=610), (byte)'O');
+    Arrays.fill(VALUE, off, (off+=987), (byte)'P');
+    Arrays.fill(VALUE, off, (off+=1597), (byte)'Q');
   }
 
-  private void doTest() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
+  public void doTest(TableName tableName) throws Exception {
     NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     scopes.put(tableName.getName(), 0);
     RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
     final int total = 1000;
     final byte[] row = Bytes.toBytes("row");
     final byte[] family = Bytes.toBytes("family");
-    final byte[] value = Bytes.toBytes("Test value");
-    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    final byte[] value = VALUE;
     final WALFactory wals =
       new WALFactory(TEST_UTIL.getConfiguration(), tableName.getNameAsString());
 
@@ -131,7 +83,7 @@ public class TestCompressedWAL {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
       wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
-        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
+        System.currentTimeMillis(), mvcc, scopes), kvs);
     }
     wal.sync();
     final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
@@ -157,4 +109,5 @@ public class TestCompressedWAL {
     assertEquals("Should have read back as many KVs as written", total, count);
     reader.close();
   }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java
index 8e660b6..d59a860 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java
@@ -17,34 +17,14 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.util.Arrays;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -57,14 +37,12 @@ import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
 @Category({ RegionServerTests.class, MediumTests.class })
-public class TestCompressedWAL {
+public class TestCompressedWAL extends CompressedWALTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestCompressedWAL.class);
 
-  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
   @Rule
   public TestName name = new TestName();
 
@@ -76,85 +54,23 @@ public class TestCompressedWAL {
     return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" });
   }
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS());
-    TEST_UTIL.startMiniDFSCluster(3);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
   @Before
-  public void setUp() {
+  public void setUp() throws Exception {
     TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
     TEST_UTIL.getConfiguration()
       .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    TEST_UTIL.startMiniDFSCluster(3);
   }
 
-  @Test
-  public void testCompressedWAL() throws Exception {
-    TEST_UTIL.getConfiguration()
-      .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
-    doTest();
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
   }
 
   @Test
-  public void testCompressedWALWithValueCompression() throws Exception {
-    TEST_UTIL.getConfiguration()
-      .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
-    doTest();
-  }
-
-  private void doTest() throws Exception {
+  public void test() throws Exception {
     TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
-    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    scopes.put(tableName.getName(), 0);
-    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
-    final int total = 1000;
-    final byte[] row = Bytes.toBytes("row");
-    final byte[] family = Bytes.toBytes("family");
-    final byte[] value = Bytes.toBytes("Test value");
-    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
-    final WALFactory wals =
-      new WALFactory(TEST_UTIL.getConfiguration(), tableName.getNameAsString());
-
-    // Write the WAL
-    final WAL wal = wals.getWAL(regionInfo);
-
-    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-
-    for (int i = 0; i < total; i++) {
-      WALEdit kvs = new WALEdit();
-      kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
-      wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
-        EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
-    }
-    wal.sync();
-    final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
-    wals.shutdown();
-
-    // Confirm the WAL can be read back
-    WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
-    int count = 0;
-    WAL.Entry entry = new WAL.Entry();
-    while (reader.next(entry) != null) {
-      count++;
-      List<Cell> cells = entry.getEdit().getCells();
-      assertTrue("Should be one KV per WALEdit", cells.size() == 1);
-      for (Cell cell: cells) {
-        assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
-          cell.getRowLength(), row, 0, row.length));
-        assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
-          cell.getFamilyLength(), family, 0, family.length));
-        assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(),
-          cell.getValueLength(), value, 0, value.length));
-      }
-    }
-    assertEquals("Should have read back as many KVs as written", total, count);
-    reader.close();
+    doTest(tableName);
   }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWALValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWALValueCompression.java
new file mode 100644
index 0000000..364f548
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWALValueCompression.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestCompressedWALValueCompression extends CompressedWALTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestCompressedWALValueCompression.class);
+
+  @Parameters
+  public static List<Object[]> params() {
+    return HBaseTestingUtility.COMPRESSION_ALGORITHMS_PARAMETERIZED;
+  }
+
+  @Rule
+  public TestName name = new TestName();
+
+  private final Compression.Algorithm compression;
+
+  public TestCompressedWALValueCompression(Compression.Algorithm algo) {
+    this.compression = algo;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.getConfiguration()
+      .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    TEST_UTIL.getConfiguration()
+      .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    TEST_UTIL.getConfiguration()
+      .set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, compression.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
+    doTest(tableName);
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index f53c36e..2b92558 100755
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
     <module>hbase-hbtop</module>
     <module>hbase-asyncfs</module>
     <module>hbase-logging</module>
+    <module>hbase-compression</module>
   </modules>
   <scm>
     <connection>scm:git:git://gitbox.apache.org/repos/asf/hbase.git</connection>
@@ -1531,6 +1532,12 @@
     <surefire.version>3.0.0-M4</surefire.version>
     <wagon.ssh.version>2.12</wagon.ssh.version>
     <xml.maven.version>1.0.1</xml.maven.version>
+    <!-- compression -->
+    <aircompressor.version>0.21</aircompressor.version>
+    <lz4.version>1.8.0</lz4.version>
+    <snappy.version>1.1.8.4</snappy.version>
+    <xz.version>1.9</xz.version>
+    <zstd-jni.version>1.5.0-4</zstd-jni.version>
     <hbase-thirdparty.version>3.5.1</hbase-thirdparty.version>
     <!-- Intraproject jar naming properties -->
     <!-- TODO this is pretty ugly, but works for the moment.
@@ -1894,6 +1901,31 @@
         <type>test-jar</type>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-compression-aircompressor</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-compression-lz4</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-compression-snappy</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-compression-xz</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-compression-zstd</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       <!-- General dependencies -->
       <dependency>
         <groupId>com.github.stephenc.findbugs</groupId>
@@ -2281,6 +2313,33 @@
         <artifactId>audience-annotations</artifactId>
         <version>${audience-annotations.version}</version>
       </dependency>
+      <!-- compression -->
+      <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>aircompressor</artifactId>
+        <version>${aircompressor.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.lz4</groupId>
+        <artifactId>lz4-java</artifactId>
+        <version>${lz4.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.tukaani</groupId>
+        <artifactId>xz</artifactId>
+        <version>${xz.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.xerial.snappy</groupId>
+        <artifactId>snappy-java</artifactId>
+        <version>${snappy.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.github.luben</groupId>
+        <artifactId>zstd-jni</artifactId>
+        <version>${zstd-jni.version}</version>
+      </dependency>
+      <!-- shaded thirdparty -->
       <dependency>
         <groupId>org.apache.hbase.thirdparty</groupId>
         <artifactId>hbase-shaded-gson</artifactId>