You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ha...@apache.org on 2023/03/06 04:53:55 UTC

[hbase] branch branch-2.5 updated: HBASE-27672 Read RPC threads may be BLOCKED at the Configuration.get when using java compression (#5075) (#5084)

This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 6d463f7aab0 HBASE-27672 Read RPC threads may be BLOCKED at the Configuration.get when using java compression (#5075) (#5084)
6d463f7aab0 is described below

commit 6d463f7aab01ec8e82ca8d8cc1986e611aaac7bc
Author: Xiaolin Ha <ha...@apache.org>
AuthorDate: Mon Mar 6 12:53:47 2023 +0800

    HBASE-27672 Read RPC threads may be BLOCKED at the Configuration.get when using java compression (#5075) (#5084)
    
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
 .../hbase/io/compress/aircompressor/Lz4Codec.java       |  6 ++++--
 .../hbase/io/compress/aircompressor/LzoCodec.java       |  6 ++++--
 .../hbase/io/compress/aircompressor/SnappyCodec.java    |  6 ++++--
 .../hbase/io/compress/aircompressor/ZstdCodec.java      |  6 ++++--
 .../hadoop/hbase/io/compress/brotli/BrotliCodec.java    | 16 ++++++++++++----
 .../apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java   | 10 ++++++----
 .../hadoop/hbase/io/compress/xerial/SnappyCodec.java    | 10 ++++++----
 .../apache/hadoop/hbase/io/compress/xz/LzmaCodec.java   | 13 +++++++++----
 .../apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java | 17 +++++++++++++----
 9 files changed, 62 insertions(+), 28 deletions(-)

diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java
index 70ea7943e8d..03c73d7e465 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java
@@ -46,9 +46,11 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";
 
   private Configuration conf;
+  private int bufferSize;
 
   public Lz4Codec() {
     conf = new Configuration();
+    bufferSize = getBufferSize(conf);
   }
 
   @Override
@@ -59,6 +61,7 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+    this.bufferSize = getBufferSize(conf);
   }
 
   @Override
@@ -79,7 +82,7 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor d)
     throws IOException {
-    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+    return new BlockDecompressorStream(in, d, bufferSize);
   }
 
   @Override
@@ -90,7 +93,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
     throws IOException {
-    int bufferSize = getBufferSize(conf);
     return new BlockCompressorStream(out, c, bufferSize,
       CompressionUtil.compressionOverhead(bufferSize));
   }
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java
index 5395dba5d0d..0a9a5cc85e3 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java
@@ -46,9 +46,11 @@ public class LzoCodec implements Configurable, CompressionCodec {
   public static final String LZO_BUFFER_SIZE_KEY = "hbase.io.compress.lzo.buffersize";
 
   private Configuration conf;
+  private int bufferSize;
 
   public LzoCodec() {
     conf = new Configuration();
+    bufferSize = getBufferSize(conf);
   }
 
   @Override
@@ -59,6 +61,7 @@ public class LzoCodec implements Configurable, CompressionCodec {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+    this.bufferSize = getBufferSize(conf);
   }
 
   @Override
@@ -79,7 +82,7 @@ public class LzoCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor d)
     throws IOException {
-    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+    return new BlockDecompressorStream(in, d, bufferSize);
   }
 
   @Override
@@ -90,7 +93,6 @@ public class LzoCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
     throws IOException {
-    int bufferSize = getBufferSize(conf);
     return new BlockCompressorStream(out, c, bufferSize,
       CompressionUtil.compressionOverhead(bufferSize));
   }
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java
index 2448404191f..7fb0e7b5381 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java
@@ -46,9 +46,11 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";
 
   private Configuration conf;
+  private int bufferSize;
 
   public SnappyCodec() {
     conf = new Configuration();
+    bufferSize = getBufferSize(conf);
   }
 
   @Override
@@ -59,6 +61,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+    this.bufferSize = getBufferSize(conf);
   }
 
   @Override
@@ -79,7 +82,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor d)
     throws IOException {
-    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+    return new BlockDecompressorStream(in, d, bufferSize);
   }
 
   @Override
@@ -90,7 +93,6 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
     throws IOException {
-    int bufferSize = getBufferSize(conf);
     return new BlockCompressorStream(out, c, bufferSize,
       CompressionUtil.compressionOverhead(bufferSize));
   }
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
index 3e8d345c660..ba7119d8336 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
@@ -54,14 +54,17 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
 
   private Configuration conf;
+  private int bufferSize;
 
   public ZstdCodec() {
     conf = new Configuration();
+    bufferSize = getBufferSize(conf);
   }
 
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+    this.bufferSize = getBufferSize(conf);
   }
 
   @Override
@@ -87,7 +90,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor d)
     throws IOException {
-    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+    return new BlockDecompressorStream(in, d, bufferSize);
   }
 
   @Override
@@ -98,7 +101,6 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
     throws IOException {
-    int bufferSize = getBufferSize(conf);
     return new BlockCompressorStream(out, c, bufferSize,
       CompressionUtil.compressionOverhead(bufferSize));
   }
diff --git a/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java
index 16aa764ba3b..d77ea35ccc5 100644
--- a/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java
+++ b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java
@@ -47,9 +47,15 @@ public class BrotliCodec implements Configurable, CompressionCodec {
   public static final int BROTLI_BUFFERSIZE_DEFAULT = 256 * 1024;
 
   private Configuration conf;
+  private int bufferSize;
+  private int level;
+  private int window;
 
   public BrotliCodec() {
     conf = new Configuration();
+    bufferSize = getBufferSize(conf);
+    level = getLevel(conf);
+    window = getWindow(conf);
   }
 
   @Override
@@ -60,16 +66,19 @@ public class BrotliCodec implements Configurable, CompressionCodec {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+    this.bufferSize = getBufferSize(conf);
+    this.level = getLevel(conf);
+    this.window = getWindow(conf);
   }
 
   @Override
   public Compressor createCompressor() {
-    return new BrotliCompressor(getLevel(conf), getWindow(conf), getBufferSize(conf));
+    return new BrotliCompressor(level, window, bufferSize);
   }
 
   @Override
   public Decompressor createDecompressor() {
-    return new BrotliDecompressor(getBufferSize(conf));
+    return new BrotliDecompressor(bufferSize);
   }
 
   @Override
@@ -80,7 +89,7 @@ public class BrotliCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor d)
     throws IOException {
-    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+    return new BlockDecompressorStream(in, d, bufferSize);
   }
 
   @Override
@@ -91,7 +100,6 @@ public class BrotliCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
     throws IOException {
-    int bufferSize = getBufferSize(conf);
     return new BlockCompressorStream(out, c, bufferSize,
       CompressionUtil.compressionOverhead(bufferSize));
   }
diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java
index 8f0f5dee672..72c9ef5e6a2 100644
--- a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java
+++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java
@@ -44,9 +44,11 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";
 
   private Configuration conf;
+  private int bufferSize;
 
   public Lz4Codec() {
     conf = new Configuration();
+    this.bufferSize = getBufferSize(conf);
   }
 
   @Override
@@ -57,16 +59,17 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+    this.bufferSize = getBufferSize(conf);
   }
 
   @Override
   public Compressor createCompressor() {
-    return new Lz4Compressor(getBufferSize(conf));
+    return new Lz4Compressor(bufferSize);
   }
 
   @Override
   public Decompressor createDecompressor() {
-    return new Lz4Decompressor(getBufferSize(conf));
+    return new Lz4Decompressor(bufferSize);
   }
 
   @Override
@@ -77,7 +80,7 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor d)
     throws IOException {
-    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+    return new BlockDecompressorStream(in, d, bufferSize);
   }
 
   @Override
@@ -88,7 +91,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
     throws IOException {
-    int bufferSize = getBufferSize(conf);
     return new BlockCompressorStream(out, c, bufferSize,
       CompressionUtil.compressionOverhead(bufferSize));
   }
diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java
index b8048ac0406..b6806dcbeef 100644
--- a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java
+++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java
@@ -44,9 +44,11 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";
 
   private Configuration conf;
+  private int bufferSize;
 
   public SnappyCodec() {
     conf = new Configuration();
+    bufferSize = getBufferSize(conf);
   }
 
   @Override
@@ -57,16 +59,17 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+    this.bufferSize = getBufferSize(conf);
   }
 
   @Override
   public Compressor createCompressor() {
-    return new SnappyCompressor(getBufferSize(conf));
+    return new SnappyCompressor(bufferSize);
   }
 
   @Override
   public Decompressor createDecompressor() {
-    return new SnappyDecompressor(getBufferSize(conf));
+    return new SnappyDecompressor(bufferSize);
   }
 
   @Override
@@ -77,7 +80,7 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor d)
     throws IOException {
-    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+    return new BlockDecompressorStream(in, d, bufferSize);
   }
 
   @Override
@@ -88,7 +91,6 @@ public class SnappyCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
     throws IOException {
-    int bufferSize = getBufferSize(conf);
     return new BlockCompressorStream(out, c, bufferSize,
       Snappy.maxCompressedLength(bufferSize) - bufferSize); // overhead only
   }
diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java
index d4b8ce01148..a5d583d770c 100644
--- a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java
+++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java
@@ -44,9 +44,13 @@ public class LzmaCodec implements Configurable, CompressionCodec {
   public static final int LZMA_BUFFERSIZE_DEFAULT = 256 * 1024;
 
   private Configuration conf;
+  private int bufferSize;
+  private int level;
 
   public LzmaCodec() {
     conf = new Configuration();
+    bufferSize = getBufferSize(conf);
+    level = getLevel(conf);
   }
 
   @Override
@@ -57,16 +61,18 @@ public class LzmaCodec implements Configurable, CompressionCodec {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+    this.bufferSize = getBufferSize(conf);
+    this.level = getLevel(conf);
   }
 
   @Override
   public Compressor createCompressor() {
-    return new LzmaCompressor(getLevel(conf), getBufferSize(conf));
+    return new LzmaCompressor(level, bufferSize);
   }
 
   @Override
   public Decompressor createDecompressor() {
-    return new LzmaDecompressor(getBufferSize(conf));
+    return new LzmaDecompressor(bufferSize);
   }
 
   @Override
@@ -77,7 +83,7 @@ public class LzmaCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor d)
     throws IOException {
-    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+    return new BlockDecompressorStream(in, d, bufferSize);
   }
 
   @Override
@@ -88,7 +94,6 @@ public class LzmaCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
     throws IOException {
-    int bufferSize = getBufferSize(conf);
     return new BlockCompressorStream(out, c, bufferSize,
       CompressionUtil.compressionOverhead(bufferSize));
   }
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
index 6848f0dfc48..1c851a61c20 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
@@ -50,9 +50,13 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
 
   private Configuration conf;
+  private int bufferSize;
+  private int level;
+  private byte[] dictionary;
 
   public ZstdCodec() {
     conf = new Configuration();
+    init();
   }
 
   @Override
@@ -63,16 +67,17 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
+    init();
   }
 
   @Override
   public Compressor createCompressor() {
-    return new ZstdCompressor(getLevel(conf), getBufferSize(conf), getDictionary(conf));
+    return new ZstdCompressor(level, bufferSize, dictionary);
   }
 
   @Override
   public Decompressor createDecompressor() {
-    return new ZstdDecompressor(getBufferSize(conf), getDictionary(conf));
+    return new ZstdDecompressor(bufferSize, dictionary);
   }
 
   @Override
@@ -83,7 +88,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor d)
     throws IOException {
-    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+    return new BlockDecompressorStream(in, d, bufferSize);
   }
 
   @Override
@@ -94,7 +99,6 @@ public class ZstdCodec implements Configurable, CompressionCodec {
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
     throws IOException {
-    int bufferSize = getBufferSize(conf);
     return new BlockCompressorStream(out, c, bufferSize,
       (int) Zstd.compressBound(bufferSize) - bufferSize); // overhead only
   }
@@ -154,4 +158,9 @@ public class ZstdCodec implements Configurable, CompressionCodec {
     return ByteBuffer.wrap(dictionary, 4, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
   }
 
+  private void init() {
+    this.bufferSize = getBufferSize(conf);
+    this.level = getLevel(conf);
+    this.dictionary = getDictionary(conf);
+  }
 }