You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/10/27 15:17:18 UTC

[hbase] branch branch-2 updated: HBASE-26353 Support loadable dictionaries in hbase-compression-zstd (#3787)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new f2c58fc  HBASE-26353 Support loadable dictionaries in hbase-compression-zstd (#3787)
f2c58fc is described below

commit f2c58fcf686b9529dd9928f151ac31807322d4d8
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Wed Oct 27 07:39:55 2021 -0700

    HBASE-26353 Support loadable dictionaries in hbase-compression-zstd (#3787)
    
    ZStandard supports initialization of compressors and decompressors with a
    precomputed dictionary, which can dramatically improve and speed up compression
    of tables with small values. For more details, please see
    
      The Case For Small Data Compression
      https://github.com/facebook/zstd#the-case-for-small-data-compression
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    
    Conflicts:
    	hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
---
 .../hadoop/hbase/io/compress/CompressionUtil.java  |   4 +-
 .../hadoop/hbase/io/compress/DictionaryCache.java  | 164 +++++++++++++++++++++
 .../hbase/io/compress/CompressionTestBase.java     |  65 +++++---
 .../hadoop/hbase/io/compress/zstd/ZstdCodec.java   |  36 ++++-
 .../hbase/io/compress/zstd/ZstdCompressor.java     |  44 +++++-
 .../hbase/io/compress/zstd/ZstdDecompressor.java   |  36 ++++-
 .../hbase/io/compress/zstd/TestZstdCodec.java      |   9 +-
 .../hbase/io/compress/zstd/TestZstdDictionary.java |  98 ++++++++++++
 .../zstd/TestZstdDictionarySplitMerge.java         | 148 +++++++++++++++++++
 .../src/test/resources/zstd.test.data              | Bin 0 -> 1024000 bytes
 .../src/test/resources/zstd.test.dict              | Bin 0 -> 112640 bytes
 .../assignment/MergeTableRegionsProcedure.java     |   9 ++
 .../assignment/SplitTableRegionProcedure.java      |  12 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   3 +
 .../hadoop/hbase/regionserver/StoreFileInfo.java   |  20 ++-
 15 files changed, 600 insertions(+), 48 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
index becff76..70b959a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.io.compress;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public class CompressionUtil {
+public final class CompressionUtil {
+
+  private CompressionUtil() { }
 
   /**
    * Round up to the next power of two, unless the value would become negative (ints
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java
new file mode 100644
index 0000000..3d3fb2a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/DictionaryCache.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.io.compress;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
+/**
+ * A utility class for managing compressor/decompressor dictionary loading and caching of load
+ * results. Useful for any codec that can support changing dictionaries at runtime,
+ * such as ZStandard.
+ */
+@InterfaceAudience.Private
+public final class DictionaryCache {
+
+  public static final String DICTIONARY_MAX_SIZE_KEY = "hbase.io.compress.dictionary.max.size";
+  public static final int DEFAULT_DICTIONARY_MAX_SIZE = 10 * 1024 * 1024;
+  public static final String RESOURCE_SCHEME = "resource://";
+
+  private static final Logger LOG = LoggerFactory.getLogger(DictionaryCache.class);
+  // Volatile so the unsynchronized null check in getDictionary sees a fully built cache
+  private static volatile LoadingCache<String, byte[]> CACHE;
+
+  private DictionaryCache() { }
+
+  /**
+   * Load a dictionary or return a previously cached load.
+   * The cache is created on first use and is keyed by path. The configuration and the
+   * {@link #DICTIONARY_MAX_SIZE_KEY} limit seen by that first call are the ones used for
+   * every later load. Entries expire ten minutes after last access; the cap is 100 entries.
+   * @param conf configuration
+   * @param path the hadoop Path where the dictionary is located, as a String; a value
+   *   starting with {@link #RESOURCE_SCHEME} is loaded from the classpath instead
+   * @return the dictionary bytes if successful, null if path is null or empty
+   * @throws IOException if the dictionary cannot be read or exceeds the size limit
+   */
+  public static byte[] getDictionary(final Configuration conf, final String path)
+      throws IOException {
+    if (path == null || path.isEmpty()) {
+      return null;
+    }
+    // Create the dictionary loading cache if we haven't already
+    if (CACHE == null) {
+      synchronized (DictionaryCache.class) {
+        if (CACHE == null) {
+          final int maxSize = conf.getInt(DICTIONARY_MAX_SIZE_KEY, DEFAULT_DICTIONARY_MAX_SIZE);
+          CACHE = CacheBuilder.newBuilder()
+            .maximumSize(100)
+            .expireAfterAccess(10, TimeUnit.MINUTES)
+            .build(
+              new CacheLoader<String, byte[]>() {
+                @Override
+                public byte[] load(String s) throws Exception {
+                  // Load by the requested key 's'. The 'path' argument captured here
+                  // belongs to whichever call created the cache, not to this request.
+                  final byte[] bytes = s.startsWith(RESOURCE_SCHEME) ?
+                    loadFromResource(conf, s, maxSize) : loadFromHadoopFs(conf, s, maxSize);
+                  LOG.info("Loaded dictionary from {} (size {})", s, bytes.length);
+                  return bytes;
+                }
+              });
+        }
+      }
+    }
+    // Get or load the dictionary for the given path
+    try {
+      return CACHE.get(path);
+    } catch (ExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Load a dictionary from the classpath. Visible for testing.
+   * @param conf configuration (not used by this loader)
+   * @param s resource name prefixed with {@link #RESOURCE_SCHEME}
+   * @param maxSize maximum number of bytes to accept
+   * @return the resource content
+   * @throws IOException if the prefix is wrong, the resource is missing, or it is too large
+   */
+  public static byte[] loadFromResource(final Configuration conf, final String s,
+      final int maxSize) throws IOException {
+    if (!s.startsWith(RESOURCE_SCHEME)) {
+      throw new IOException("Path does not start with " + RESOURCE_SCHEME);
+    }
+    final String path = s.substring(RESOURCE_SCHEME.length(), s.length());
+    LOG.info("Loading resource {}", path);
+    try (InputStream in = DictionaryCache.class.getClassLoader().getResourceAsStream(path)) {
+      if (in == null) {
+        throw new FileNotFoundException("Resource " + path + " not found");
+      }
+      return readFully(in, s, maxSize);
+    }
+  }
+
+  /** Load a dictionary file through the Hadoop FileSystem resolved from the path's URI. */
+  private static byte[] loadFromHadoopFs(final Configuration conf, final String s,
+      final int maxSize) throws IOException {
+    final Path path = new Path(s);
+    final FileSystem fs = FileSystem.get(path.toUri(), conf);
+    LOG.info("Loading file {}", path);
+    try (FSDataInputStream in = fs.open(path)) {
+      return readFully(in, s, maxSize);
+    }
+  }
+
+  /**
+   * Read a stream until it reports no more data, refusing to buffer more than maxSize bytes.
+   * @param in the stream to drain; closing it is left to the caller
+   * @param name the dictionary name to report if the limit is exceeded
+   * @param maxSize maximum number of bytes to accept
+   * @return everything that was read
+   * @throws IOException on read failure or if the content is larger than maxSize
+   */
+  private static byte[] readFully(final InputStream in, final String name, final int maxSize)
+      throws IOException {
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final byte[] buffer = new byte[8192];
+    int len = 0;
+    for (int n = in.read(buffer); n > 0; n = in.read(buffer)) {
+      len += n;
+      if (len > maxSize) {
+        throw new IOException("Dictionary " + name + " is too large, limit=" + maxSize);
+      }
+      baos.write(buffer, 0, n);
+    }
+    return baos.toByteArray();
+  }
+
+  // Visible for testing
+  public static boolean contains(String dictionaryPath) {
+    final LoadingCache<String, byte[]> cache = CACHE; // read the volatile field once
+    return cache != null && cache.asMap().containsKey(dictionaryPath);
+  }
+
+}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java
index 616bf0b..fddff46 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/compress/CompressionTestBase.java
@@ -17,12 +17,10 @@
 package org.apache.hadoop.hbase.io.compress;
 
 import static org.junit.Assert.assertTrue;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.Arrays;
 import java.util.Random;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -31,6 +29,8 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,11 +39,11 @@ public class CompressionTestBase {
 
   protected static final Logger LOG = LoggerFactory.getLogger(CompressionTestBase.class);
 
-  static final int LARGE_SIZE = 10 * 1024 * 1024;
-  static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
-  static final int BLOCK_SIZE = 4096;
+  protected static final int LARGE_SIZE = 10 * 1024 * 1024;
+  protected static final int VERY_LARGE_SIZE = 100 * 1024 * 1024;
+  protected static final int BLOCK_SIZE = 4096;
 
-  static final byte[] SMALL_INPUT;
+  protected static final byte[] SMALL_INPUT;
   static {
     // 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597
     SMALL_INPUT = new byte[1+1+2+3+5+8+13+21+34+55+89+144+233+377+610+987+1597];
@@ -67,15 +67,20 @@ public class CompressionTestBase {
     Arrays.fill(SMALL_INPUT, off, (off+=1597), (byte)'Q');
   }
 
-  protected void codecTest(final CompressionCodec codec, final byte[][] input)
-      throws Exception {
+  protected void codecTest(final CompressionCodec codec, final byte[][] input) throws Exception {
+    codecTest(codec, input, null); // null: no expected compressed size is asserted
+  }
+
+  protected void codecTest(final CompressionCodec codec, final byte[][] input,
+      final Integer expectedCompressedSize) throws Exception {
     // We do this in Compression.java
     ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
     // Compress
+    long start = EnvironmentEdgeManager.currentTime();
+    Compressor compressor = codec.createCompressor();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    CompressionOutputStream out = codec.createOutputStream(baos);
+    CompressionOutputStream out = codec.createOutputStream(baos, compressor);
     int inLen = 0;
-    long start = EnvironmentEdgeManager.currentTime();
     for (int i = 0; i < input.length; i++) {
       out.write(input[i]);
       inLen += input[i].length;
@@ -85,9 +90,15 @@ public class CompressionTestBase {
     final byte[] compressed = baos.toByteArray();
     LOG.info("{} compressed {} bytes to {} bytes in {} ms", codec.getClass().getSimpleName(),
       inLen, compressed.length, end - start);
+    if (expectedCompressedSize != null) {
+      assertTrue("Expected compressed size does not match: (expected=" + expectedCompressedSize +
+        ", actual=" + compressed.length + ")", expectedCompressedSize == compressed.length);
+    }
     // Decompress
     final byte[] plain = new byte[inLen];
-    CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed));
+    Decompressor decompressor = codec.createDecompressor();
+    CompressionInputStream in = codec.createInputStream(new ByteArrayInputStream(compressed),
+      decompressor);
     start = EnvironmentEdgeManager.currentTime();
     IOUtils.readFully(in, plain, 0, plain.length);
     in.close();
@@ -113,29 +124,37 @@ public class CompressionTestBase {
   /**
    * Test with a large input (1MB) divided into blocks of 4KB.
    */
-  protected void codecLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
-    RandomDistribution.DiscreteRNG zipf =
+  protected void codecLargeTest(final CompressionCodec codec, final double sigma)
+      throws Exception {
+    RandomDistribution.DiscreteRNG rng =
       new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
     final byte[][] input = new byte[LARGE_SIZE/BLOCK_SIZE][BLOCK_SIZE];
-    for (int i = 0; i < input.length; i++) {
-      for (int j = 0; j < input[i].length; j++) {
-        input[i][j] = (byte)zipf.nextInt();
-      }
-    }
+    fill(rng, input);
     codecTest(codec, input);
   }
 
   /**
    * Test with a very large input (100MB) as a single input buffer.
    */
-  protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma) throws Exception {
-    RandomDistribution.DiscreteRNG zipf =
+  protected void codecVeryLargeTest(final CompressionCodec codec, final double sigma)
+      throws Exception {
+    RandomDistribution.DiscreteRNG rng =
         new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, sigma);
     final byte[][] input = new byte[1][VERY_LARGE_SIZE];
-    for (int i = 0; i < VERY_LARGE_SIZE; i++) {
-      input[0][i] = (byte)zipf.nextInt();
-    }
+    fill(rng, input);
     codecTest(codec, input);
   }
 
+  protected static void fill(RandomDistribution.DiscreteRNG rng, byte[][] input) {
+    for (byte[] row : input) { // rows are filled in index order, one at a time
+      fill(rng, row);
+    }
+  }
+
+  protected static void fill(RandomDistribution.DiscreteRNG rng, byte[] input) {
+    for (int pos = 0; pos < input.length; pos++) { // front to back, one draw per byte
+      input[pos] = (byte) rng.nextInt();
+    }
+  }
+
 }
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
index f933896..07b26d0 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.io.compress.zstd;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.DictionaryCache;
 import org.apache.hadoop.io.compress.BlockCompressorStream;
 import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -42,6 +44,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {
 
   public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
   public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
+  public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
 
   private Configuration conf;
 
@@ -61,12 +64,12 @@ public class ZstdCodec implements Configurable, CompressionCodec {
 
   @Override
   public Compressor createCompressor() {
-    return new ZstdCompressor(getLevel(conf), getBufferSize(conf));
+    return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
   }
 
   @Override
   public Decompressor createDecompressor() {
-    return new ZstdDecompressor(getBufferSize(conf));
+    return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
   }
 
   @Override
@@ -124,4 +127,31 @@ public class ZstdCodec implements Configurable, CompressionCodec {
     return size > 0 ? size : 256 * 1024; // Don't change this default
   }
 
+  static byte[] getDictionary(final Configuration conf) {
+    String path = conf.get(ZSTD_DICTIONARY_KEY); // configured location of the dictionary
+    try {
+      return DictionaryCache.getDictionary(conf, path);
+    } catch (IOException e) { // rethrow unchecked; createCompressor() declares no IOException
+      throw new RuntimeException("Unable to load dictionary at " + path, e);
+    }
+  }
+
+  // Zstandard dictionaries begin with a 32-bit magic number, 0xEC30A437 in little-endian
+  // format, followed by a 32-bit identifier also in little-endian format.
+  // Reference: https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md
+
+  // Answers false, rather than throwing, for null or for fewer than four bytes of input
+  static boolean isDictionary(byte[] dictionary) {
+    return dictionary != null && dictionary.length >= 4 &&
+            dictionary[0] == (byte)0x37 && dictionary[1] == (byte)0xA4 &&
+            dictionary[2] == (byte)0x30 && dictionary[3] == (byte)0xEC;
+  }
+
+  static int getDictionaryId(byte[] dictionary) { // the id is header bytes 4..7
+    if (dictionary == null || dictionary.length < 8 || !isDictionary(dictionary)) {
+      throw new IllegalArgumentException("Not a ZStandard dictionary");
+    }
+    return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+  }
+
 }
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
index 16ec438..deaf7e1 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdDictCompress;
 
 /**
  * Hadoop compressor glue for zstd-jni.
@@ -40,13 +41,23 @@ public class ZstdCompressor implements CanReinit, Compressor {
   protected ByteBuffer inBuf, outBuf;
   protected boolean finish, finished;
   protected long bytesRead, bytesWritten;
+  protected int dictId;
+  protected ZstdDictCompress dict;
 
-  ZstdCompressor(final int level, final int bufferSize) {
+  ZstdCompressor(final int level, final int bufferSize, final byte[] dictionary) {
     this.level = level;
     this.bufferSize = bufferSize;
     this.inBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf.position(bufferSize);
+    if (dictionary != null) {
+      this.dictId = ZstdCodec.getDictionaryId(dictionary);
+      this.dict = new ZstdDictCompress(dictionary, level);
+    }
+  }
+
+  ZstdCompressor(final int level, final int bufferSize) {
+    this(level, bufferSize, null);
   }
 
   @Override
@@ -74,7 +85,12 @@ public class ZstdCompressor implements CanReinit, Compressor {
         } else {
           outBuf.clear();
         }
-        int written = Zstd.compress(outBuf, inBuf, level);
+        int written;
+        if (dict != null) {
+          written = Zstd.compress(outBuf, inBuf, dict);
+        } else {
+          written = Zstd.compress(outBuf, inBuf, level);
+        }
         bytesWritten += written;
         inBuf.clear();
         LOG.trace("compress: compressed {} -> {} (level {})", uncompressed, written, level);
@@ -132,13 +148,33 @@ public class ZstdCompressor implements CanReinit, Compressor {
     LOG.trace("reinit");
     if (conf != null) {
       // Level might have changed
-      level = ZstdCodec.getLevel(conf);
+      boolean levelChanged = false;
+      int newLevel = ZstdCodec.getLevel(conf);
+      if (level != newLevel) {
+        LOG.trace("Level changed, was {} now {}", level, newLevel);
+        level = newLevel;
+        levelChanged = true;
+      }
+      // Dictionary may have changed
+      byte[] b = ZstdCodec.getDictionary(conf);
+      if (b != null) {
+        // Don't casually create dictionary objects; they consume native memory
+        int thisDictId = ZstdCodec.getDictionaryId(b);
+        if (dict == null || dictId != thisDictId || levelChanged) {
+          dictId = thisDictId;
+          dict = new ZstdDictCompress(b, level);
+          LOG.trace("Reloaded dictionary, new id is {}", dictId);
+        }
+      } else {
+        dict = null;
+      }
       // Buffer size might have changed
       int newBufferSize = ZstdCodec.getBufferSize(conf);
       if (bufferSize != newBufferSize) {
         bufferSize = newBufferSize;
         this.inBuf = ByteBuffer.allocateDirect(bufferSize);
         this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+        LOG.trace("Resized buffers, new size is {}", bufferSize);
       }
     }
     reset();
@@ -182,7 +218,7 @@ public class ZstdCompressor implements CanReinit, Compressor {
 
   // Package private
 
-  int maxCompressedLength(final int len) {
+  static int maxCompressedLength(final int len) {
     return (int) Zstd.compressBound(len);
   }
 
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
index a3d77f5..dfa37db 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
@@ -26,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdDictDecompress;
 
 /**
  * Hadoop decompressor glue for zstd-java.
@@ -38,12 +39,22 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
   protected int bufferSize;
   protected int inLen;
   protected boolean finished;
+  protected int dictId;
+  protected ZstdDictDecompress dict;
 
-  ZstdDecompressor(final int bufferSize) {
+  ZstdDecompressor(final int bufferSize, final byte[] dictionary) {
     this.bufferSize = bufferSize;
     this.inBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf = ByteBuffer.allocateDirect(bufferSize);
     this.outBuf.position(bufferSize);
+    if (dictionary != null) {
+      this.dictId = ZstdCodec.getDictionaryId(dictionary);
+      this.dict = new ZstdDictDecompress(dictionary);
+    }
+  }
+
+  ZstdDecompressor(final int bufferSize) {
+    this(bufferSize, null);
   }
 
   @Override
@@ -60,7 +71,11 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
       inLen -= remaining;
       outBuf.clear();
       int written;
-      written = Zstd.decompress(outBuf, inBuf);
+      if (dict != null) {
+        written = Zstd.decompress(outBuf, inBuf, dict);
+      } else {
+        written = Zstd.decompress(outBuf, inBuf);
+      }
       inBuf.clear();
       LOG.trace("decompress: decompressed {} -> {}", remaining, written);
       outBuf.flip();
@@ -116,8 +131,7 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
 
   @Override
   public void setDictionary(final byte[] b, final int off, final int len) {
-    LOG.trace("setDictionary: off={} len={}", off, len);
-    throw new UnsupportedOperationException("setDictionary not supported");
+    throw new UnsupportedOperationException("setDictionary is not supported");
   }
 
   @Override
@@ -143,12 +157,26 @@ public class ZstdDecompressor implements CanReinit, Decompressor {
   public void reinit(final Configuration conf) {
     LOG.trace("reinit");
     if (conf != null) {
+      // Dictionary may have changed
+      byte[] b = ZstdCodec.getDictionary(conf);
+      if (b != null) {
+        // Don't casually create dictionary objects; they consume native memory
+        int thisDictId = ZstdCodec.getDictionaryId(b);
+        if (dict == null || dictId != thisDictId) {
+          dictId = thisDictId;
+          dict = new ZstdDictDecompress(b);
+          LOG.trace("Reloaded dictionary, new id is {}", dictId);
+        }
+      } else {
+        dict = null;
+      }
       // Buffer size might have changed
       int newBufferSize = ZstdCodec.getBufferSize(conf);
       if (bufferSize != newBufferSize) {
         bufferSize = newBufferSize;
         this.inBuf = ByteBuffer.allocateDirect(bufferSize);
         this.outBuf = ByteBuffer.allocateDirect(bufferSize);
+        LOG.trace("Resized buffers, new size is {}", bufferSize);
       }
     }
     reset();
diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java
index 6bcb2aa..bf1c78c 100644
--- a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java
+++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdCodec.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.compress.zstd;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.ClassRule;
@@ -33,20 +34,20 @@ public class TestZstdCodec extends CompressionTestBase {
       HBaseClassTestRule.forClass(TestZstdCodec.class);
 
   @Test
-  public void testzstdCodecSmall() throws Exception {
+  public void testZstdCodecSmall() throws Exception {
     codecSmallTest(new ZstdCodec());
   }
 
   @Test
-  public void testzstdCodecLarge() throws Exception {
+  public void testZstdCodecLarge() throws Exception {
     codecLargeTest(new ZstdCodec(), 1.1); // poor compressability
     codecLargeTest(new ZstdCodec(),   2);
     codecLargeTest(new ZstdCodec(),  10); // very high compressability
   }
 
   @Test
-  public void testzstdCodecVeryLarge() throws Exception {
-    Configuration conf = new Configuration();
+  public void testZstdCodecVeryLarge() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
     // ZStandard levels range from 1 to 22.
     // Level 22 might take up to a minute to complete. 3 is the Hadoop default, and will be fast.
     conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3);
diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java
new file mode 100644
index 0000000..0a17ef9
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.io.compress.DictionaryCache;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.RandomDistribution;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestZstdDictionary extends CompressionTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestZstdDictionary.class);
+
+  private static final String DICTIONARY_PATH = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
+  // zstd.test.data compressed with zstd.test.dict at level 3 produces 358555 bytes
+  private static final int EXPECTED_COMPRESSED_SIZE = 358555;
+
+  private static byte[] TEST_DATA;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    TEST_DATA = DictionaryCache.loadFromResource(conf,
+      DictionaryCache.RESOURCE_SCHEME + "zstd.test.data", /* maxSize */ 1024*1024);
+    assertNotNull("Failed to load test data", TEST_DATA);
+  }
+
+  // Runs codecTest over TEST_DATA with the dictionary configured, pinning the compressed size
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, 3);
+    conf.set(ZstdCodec.ZSTD_DICTIONARY_KEY, DICTIONARY_PATH);
+    ZstdCodec codec = new ZstdCodec();
+    codec.setConf(conf);
+    codecTest(codec, new byte[][] { TEST_DATA }, EXPECTED_COMPRESSED_SIZE);
+    // Assert that the dictionary was actually loaded
+    assertTrue("Dictionary was not loaded by codec", DictionaryCache.contains(DICTIONARY_PATH));
+  }
+
+  /**
+   * Generates the training input behind the test data in src/test/resources/.
+   * @param args the output file to write, as the first and only required argument
+   */
+  public static void main(String[] args) throws IOException {
+    // Write 1000 1k blocks for training to the specified file
+    // Train with:
+    //   zstd --train -B1024 -o <dictionary_file> <input_file>
+    if (args.length < 1) {
+      System.err.println("Usage: TestZstdDictionary <outFile>");
+      System.exit(-1);
+    }
+    final RandomDistribution.DiscreteRNG rng =
+      new RandomDistribution.Zipf(new Random(), 0, Byte.MAX_VALUE, 2);
+    final File outFile = new File(args[0]);
+    final byte[] buffer = new byte[1024];
+    System.out.println("Generating " + outFile);
+    try (FileOutputStream os = new FileOutputStream(outFile)) {
+      for (int i = 0; i < 1000; i++) {
+        fill(rng, buffer);
+        os.write(buffer);
+      }
+    }
+    System.out.println("Done");
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionarySplitMerge.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionarySplitMerge.java
new file mode 100644
index 0000000..dff3848
--- /dev/null
+++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionarySplitMerge.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.zstd;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.DictionaryCache;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestZstdDictionarySplitMerge {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestZstdDictionarySplitMerge.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // NOTE: Don't put configuration settings in global site schema. We are testing if per
+    // CF or per table schema settings are applied correctly.
+    conf = TEST_UTIL.getConfiguration();
+    conf.set(Compression.ZSTD_CODEC_CLASS_KEY, ZstdCodec.class.getCanonicalName());
+    Compression.Algorithm.ZSTD.reload(conf);
+    conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 1000);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    // Create the table
+
+    final TableName tableName = TableName.valueOf("TestZstdDictionarySplitMerge");
+    final byte[] cfName = Bytes.toBytes("info");
+    final String dictionaryPath = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
+    final TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfName)
+        .setCompressionType(Compression.Algorithm.ZSTD)
+        .setConfiguration(ZstdCodec.ZSTD_DICTIONARY_KEY, dictionaryPath)
+        .build())
+      .build();
+    final Admin admin = TEST_UTIL.getAdmin();
+    admin.createTable(td, new byte[][] { Bytes.toBytes(1) });
+    TEST_UTIL.waitTableAvailable(tableName);
+
+    // Load some data
+
+    Table t = ConnectionFactory.createConnection(conf).getTable(tableName);
+    TEST_UTIL.loadNumericRows(t, cfName, 0, 100_000);
+    admin.flush(tableName);
+    assertTrue("Dictionary was not loaded", DictionaryCache.contains(dictionaryPath));
+    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
+
+    // Test split procedure
+
+    admin.split(tableName, Bytes.toBytes(50_000));
+    TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return TEST_UTIL.getMiniHBaseCluster().getRegions(tableName).size() == 3;
+      }
+      @Override
+      public String explainFailure() throws Exception {
+        return "Split has not finished yet";
+      }
+    });
+    TEST_UTIL.waitUntilNoRegionsInTransition();
+    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
+
+    // Test merge procedure
+
+    RegionInfo regionA = null;
+    RegionInfo regionB = null;
+    for (RegionInfo region: admin.getRegions(tableName)) {
+      if (region.getStartKey().length == 0) {
+        regionA = region;
+      } else if (Bytes.equals(region.getStartKey(), Bytes.toBytes(1))) {
+        regionB = region;
+      }
+    }
+    assertNotNull(regionA);
+    assertNotNull(regionB);
+    admin.mergeRegionsAsync(new byte[][] {
+        regionA.getRegionName(),
+        regionB.getRegionName()
+      }, false).get(30, TimeUnit.SECONDS);
+    assertEquals(2, admin.getRegions(tableName).size());
+    ServerName expected = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName();
+    assertEquals(expected, TEST_UTIL.getConnection().getRegionLocator(tableName)
+      .getRegionLocation(Bytes.toBytes(1), true).getServerName());
+    try (AsyncConnection asyncConn =
+      ConnectionFactory.createAsyncConnection(conf).get()) {
+      assertEquals(expected, asyncConn.getRegionLocator(tableName)
+        .getRegionLocation(Bytes.toBytes(1), true).get().getServerName());
+    }
+    TEST_UTIL.verifyNumericRows(t, cfName, 0, 100_000, 0);
+  }
+
+}
diff --git a/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.data b/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.data
new file mode 100644
index 0000000..a497af5
Binary files /dev/null and b/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.data differ
diff --git a/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.dict b/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.dict
new file mode 100644
index 0000000..8d9ec65
Binary files /dev/null and b/hbase-compression/hbase-compression-zstd/src/test/resources/zstd.test.dict differ
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 6658e5c..347c7d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.MetaMutationAnnotation;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hbase.quotas.QuotaExceededException;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
@@ -610,10 +613,16 @@ public class MergeTableRegionsProcedure
       String family = hcd.getNameAsString();
       final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
       if (storeFiles != null && storeFiles.size() > 0) {
+        final Configuration storeConfiguration =
+          StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd);
         for (StoreFileInfo storeFileInfo : storeFiles) {
           // Create reference file(s) to parent region file here in mergedDir.
           // As this procedure is running on master, use CacheConfig.DISABLED means
           // don't cache any block.
+          // We also need to pass through a suitable CompoundConfiguration as if this
+          // is running in a regionserver's Store context, or we might not be able
+          // to read the hfiles.
+          storeFileInfo.setConf(storeConfiguration);
           mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
             new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index 0a15e36..26d0a4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.RegionSplitRestriction;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -711,12 +712,17 @@ public class SplitTableRegionProcedure
       final ColumnFamilyDescriptor hcd = htd.getColumnFamily(familyName);
       final Collection<StoreFileInfo> storeFiles = e.getValue();
       if (storeFiles != null && storeFiles.size() > 0) {
+        final Configuration storeConfiguration =
+          StoreUtils.createStoreConfiguration(env.getMasterConfiguration(), htd, hcd);
         for (StoreFileInfo storeFileInfo : storeFiles) {
           // As this procedure is running on master, use CacheConfig.DISABLED means
           // don't cache any block.
-          StoreFileSplitter sfs =
-              new StoreFileSplitter(regionFs, familyName, new HStoreFile(
-                  storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
+          // We also need to pass through a suitable CompoundConfiguration as if this
+          // is running in a regionserver's Store context, or we might not be able
+          // to read the hfiles.
+          storeFileInfo.setConf(storeConfiguration);
+          StoreFileSplitter sfs = new StoreFileSplitter(regionFs, familyName,
+            new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED));
           futures.add(threadPool.submit(sfs));
         }
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 2d53276..bd06300 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -531,6 +531,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
 
     int totalValidStoreFile = 0;
     for (StoreFileInfo storeFileInfo : files) {
+      // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
+      // our store's CompoundConfiguration here.
+      storeFileInfo.setConf(conf);
       // open each store file in parallel
       completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
       totalValidStoreFile++;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index abfb44f..608a18b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * Describe a StoreFile (hfile, reference, link)
  */
 @InterfaceAudience.Private
-public class StoreFileInfo {
+public class StoreFileInfo implements Configurable {
   private static final Logger LOG = LoggerFactory.getLogger(StoreFileInfo.class);
 
   /**
@@ -87,7 +88,7 @@ public class StoreFileInfo {
   public static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = false;
 
   // Configuration
-  private final Configuration conf;
+  private Configuration conf;
 
   // FileSystem handle
   private final FileSystem fs;
@@ -234,6 +235,16 @@ public class StoreFileInfo {
         DEFAULT_STORE_FILE_READER_NO_READAHEAD);
   }
 
+  @Override
+  public Configuration getConf() {
+    return conf; // the configuration most recently assigned to this instance
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf; // replaces the current configuration; no null check or copy is made
+  }
+
   /**
    * Size of the Hfile
    * @return size
@@ -632,10 +643,6 @@ public class StoreFileInfo {
     return this.fs;
   }
 
-  Configuration getConf() {
-    return this.conf;
-  }
-
   boolean isNoReadahead() {
     return this.noReadahead;
   }
@@ -673,4 +680,5 @@ public class StoreFileInfo {
   public void initHFileInfo(ReaderContext context) throws IOException {
     this.hfileInfo = new HFileInfo(context, conf);
   }
+
 }