You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/07/16 19:01:12 UTC
[accumulo] branch master updated: Fix #483 Refactor
Compression.java removing duplicate code (#1271)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 2610d55 Fix #483 Refactor Compression.java removing duplicate code (#1271)
2610d55 is described below
commit 2610d553bd1402a2e2414fb9f8c0482958425dd1
Author: Laura Schanno <lb...@gmail.com>
AuthorDate: Tue Jul 16 15:01:07 2019 -0400
Fix #483 Refactor Compression.java removing duplicate code (#1271)
Refactor Compression.java in order to remove duplicate code and
restructure the class to make it easier to read overall.
---
.../core/file/rfile/bcfile/Compression.java | 616 ++++++++++-----------
1 file changed, 294 insertions(+), 322 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 6c13240..9b0b580 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -48,17 +48,19 @@ import com.google.common.collect.Maps;
* Compression related stuff.
*/
public final class Compression {
- static final Logger log = LoggerFactory.getLogger(Compression.class);
+
+ private static final Logger log = LoggerFactory.getLogger(Compression.class);
/**
- * Prevent the instantiation of class.
+ * Prevent the instantiation of this class.
*/
private Compression() {
- // nothing
+ throw new UnsupportedOperationException();
}
static class FinishOnFlushCompressionStream extends FilterOutputStream {
- public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
+
+ FinishOnFlushCompressionStream(CompressionOutputStream cout) {
super(cout);
}
@@ -76,73 +78,97 @@ public final class Compression {
}
}
- /** compression: zStandard */
+ /**
+ * Compression: zStandard
+ */
public static final String COMPRESSION_ZSTD = "zstd";
- /** snappy codec **/
+
+ /**
+ * Compression: snappy
+ */
public static final String COMPRESSION_SNAPPY = "snappy";
- /** compression: gzip */
+
+ /**
+ * Compression: gzip
+ */
public static final String COMPRESSION_GZ = "gz";
- /** compression: lzo */
+
+ /**
+ * Compression: lzo
+ */
public static final String COMPRESSION_LZO = "lzo";
- /** compression: none */
+
+ /**
+ * Compression: none
+ */
public static final String COMPRESSION_NONE = "none";
/**
* Compression algorithms. There is a static initializer, below the values defined in the
- * enumeration, that calls the initializer of all defined codecs within the Algorithm enum. This
+ * enumeration, that calls the initializer of all defined codecs within {@link Algorithm}. This
* promotes a model of the following call graph of initialization by the static initializer,
- * followed by calls to getCodec() and createCompressionStream/DecompressionStream. In some cases,
- * the compression and decompression call methods will include a different buffer size for the
- * stream. Note that if the compressed buffer size requested in these calls is zero, we will not
- * set the buffer size for that algorithm. Instead, we will use the default within the codec.
- *
- * The buffer size is configured in the Codec by way of a Hadoop Configuration reference. One
- * approach may be to use the same Configuration object, but when calls are made to
- * createCompressionStream and DecompressionStream, with non default buffer sizes, the
- * configuration object must be changed. In this case, concurrent calls to createCompressionStream
- * and DecompressionStream would mutate the configuration object beneath each other, requiring
- * synchronization to avoid undesirable activity via co-modification. To avoid synchronization
- * entirely, we will create Codecs with their own Configuration object and cache them for re-use.
- * A default codec will be statically created, as mentioned above to ensure we always have a codec
- * available at loader initialization.
- *
+ * followed by calls to {@link #getCodec()},
+ * {@link #createCompressionStream(OutputStream, Compressor, int)}, and
+ * {@link #createDecompressionStream(InputStream, Decompressor, int)}. In some cases, the
+ * compression and decompression call methods will include a different buffer size for the stream.
+ * Note that if the compressed buffer size requested in these calls is zero, we will not set the
+ * buffer size for that algorithm. Instead, we will use the default within the codec.
+ * <p>
+ * The buffer size is configured in the Codec by way of a Hadoop {@link Configuration} reference.
+ * One approach may be to use the same Configuration object, but when calls are made to
+ * {@code createCompressionStream} and {@code createDecompressionStream} with non default buffer
+ * sizes, the configuration object must be changed. In this case, concurrent calls to
+ * {@code createCompressionStream} and {@code createDecompressionStream} would mutate the
+ * configuration object beneath each other, requiring synchronization to avoid undesirable
+ * activity via co-modification. To avoid synchronization entirely, we will create Codecs with
+ * their own Configuration object and cache them for re-use. A default codec will be statically
+ * created, as mentioned above to ensure we always have a codec available at loader
+ * initialization.
+ * <p>
* There is a Guava cache defined within Algorithm that allows us to cache Codecs for re-use.
* Since they will have their own configuration object and thus do not need to be mutable, there
* is no concern for using them concurrently; however, the Guava cache exists to ensure a maximal
* size of the cache and efficient and concurrent read/write access to the cache itself.
- *
+ * <p>
* To provide Algorithm specific details and to describe what is in code:
- *
+ * <p>
* LZO will always have the default LZO codec because the buffer size is never overridden within
* it.
- *
+ * <p>
* GZ will use the default GZ codec for the compression stream, but can potentially use a
* different codec instance for the decompression stream if the requested buffer size does not
* match the default GZ buffer size of 32k.
- *
+ * <p>
* Snappy will use the default Snappy codec with the default buffer size of 64k for the
* compression stream, but will use a cached codec if the buffer size differs from the default.
*/
- public static enum Algorithm {
+ public enum Algorithm {
LZO(COMPRESSION_LZO) {
+
/**
- * determines if we've checked the codec status. ensures we don't recreate the default codec
+ * The default codec class.
*/
- private final AtomicBoolean checked = new AtomicBoolean(false);
- private static final String defaultClazz = "org.apache.hadoop.io.compress.LzoCodec";
- private transient CompressionCodec codec = null;
+ private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.LzoCodec";
/**
- * Configuration option for LZO buffer size
+ * Configuration option for LZO buffer size.
*/
private static final String BUFFER_SIZE_OPT = "io.compression.codec.lzo.buffersize";
/**
- * Default buffer size
+ * Default buffer size.
*/
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ /**
+ * Whether or not the codec status has been checked. Ensures the default codec is not
+ * recreated.
+ */
+ private final AtomicBoolean checked = new AtomicBoolean(false);
+
+ private transient CompressionCodec codec = null;
+
@Override
public boolean isSupported() {
return codec != null;
@@ -150,29 +176,12 @@ public final class Compression {
@Override
public void initializeDefaultCodec() {
- if (!checked.get()) {
- checked.set(true);
- codec = createNewCodec(DEFAULT_BUFFER_SIZE);
- }
+ codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
}
@Override
CompressionCodec createNewCodec(int bufferSize) {
- String extClazz =
- (conf.get(CONF_LZO_CLASS) == null ? System.getProperty(CONF_LZO_CLASS) : null);
- String clazz = (extClazz != null) ? extClazz : defaultClazz;
- try {
- log.info("Trying to load Lzo codec class: {}", clazz);
- Configuration myConf = new Configuration(conf);
- // only use the buffersize if > 0, otherwise we'll use
- // the default defined within the codec
- if (bufferSize > 0)
- myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
- return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
- } catch (ClassNotFoundException e) {
- // that is okay
- }
- return null;
+ return createNewCodec(CONF_LZO_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
}
@Override
@@ -187,13 +196,8 @@ public final class Compression {
throw new IOException("LZO codec class not specified. Did you forget to set property "
+ CONF_LZO_CLASS + "?");
}
- InputStream bis1 = null;
- if (downStreamBufferSize > 0) {
- bis1 = new BufferedInputStream(downStream, downStreamBufferSize);
- } else {
- bis1 = downStream;
- }
- CompressionInputStream cis = codec.createInputStream(bis1, decompressor);
+ InputStream bis = bufferStream(downStream, downStreamBufferSize);
+ CompressionInputStream cis = codec.createInputStream(bis, decompressor);
return new BufferedInputStream(cis, DATA_IBUF_SIZE);
}
@@ -204,14 +208,7 @@ public final class Compression {
throw new IOException("LZO codec class not specified. Did you forget to set property "
+ CONF_LZO_CLASS + "?");
}
- OutputStream bos1 = null;
- if (downStreamBufferSize > 0) {
- bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
- } else {
- bos1 = downStream;
- }
- CompressionOutputStream cos = codec.createOutputStream(bos1, compressor);
- return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+ return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
}
},
@@ -241,54 +238,28 @@ public final class Compression {
}
/**
- * Create a new GZ codec
- *
- * @param bufferSize
- * buffer size to for GZ
- * @return created codec
+ * Creates a new GZ codec
*/
@Override
protected CompressionCodec createNewCodec(final int bufferSize) {
- DefaultCodec myCodec = new DefaultCodec();
- Configuration myConf = new Configuration(conf);
- // only use the buffersize if > 0, otherwise we'll use
- // the default defined within the codec
- if (bufferSize > 0)
- myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
- myCodec.setConf(myConf);
- return myCodec;
+ Configuration newConfig = new Configuration(conf);
+ updateBuffer(newConfig, BUFFER_SIZE_OPT, bufferSize);
+ DefaultCodec newCodec = new DefaultCodec();
+ newCodec.setConf(newConfig);
+ return newCodec;
}
@Override
public InputStream createDecompressionStream(InputStream downStream,
Decompressor decompressor, int downStreamBufferSize) throws IOException {
- // Set the internal buffer size to read from down stream.
- CompressionCodec decomCodec = codec;
- // if we're not using the default, let's pull from the loading cache
- if (downStreamBufferSize != DEFAULT_BUFFER_SIZE) {
- Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(GZ, downStreamBufferSize);
- try {
- decomCodec = codecCache.get(sizeOpt);
- } catch (ExecutionException e) {
- throw new IOException(e);
- }
- }
- CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor);
- return new BufferedInputStream(cis, DATA_IBUF_SIZE);
+ return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
+ DEFAULT_BUFFER_SIZE, GZ, codec);
}
@Override
public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
int downStreamBufferSize) throws IOException {
- OutputStream bos1 = null;
- if (downStreamBufferSize > 0) {
- bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
- } else {
- bos1 = downStream;
- }
- // always uses the default buffer size
- CompressionOutputStream cos = codec.createOutputStream(bos1, compressor);
- return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+ return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
}
@Override
@@ -306,16 +277,11 @@ public final class Compression {
@Override
public InputStream createDecompressionStream(InputStream downStream,
Decompressor decompressor, int downStreamBufferSize) {
- if (downStreamBufferSize > 0) {
- return new BufferedInputStream(downStream, downStreamBufferSize);
- }
- return downStream;
+ return bufferStream(downStream, downStreamBufferSize);
}
@Override
- public void initializeDefaultCodec() {
-
- }
+ public void initializeDefaultCodec() {}
@Override
protected CompressionCodec createNewCodec(final int bufferSize) {
@@ -325,11 +291,7 @@ public final class Compression {
@Override
public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
int downStreamBufferSize) {
- if (downStreamBufferSize > 0) {
- return new BufferedOutputStream(downStream, downStreamBufferSize);
- }
-
- return downStream;
+ return bufferStream(downStream, downStreamBufferSize);
}
@Override
@@ -339,85 +301,56 @@ public final class Compression {
},
SNAPPY(COMPRESSION_SNAPPY) {
- // Use base type to avoid compile-time dependencies.
- private transient CompressionCodec snappyCodec = null;
+
/**
- * determines if we've checked the codec status. ensures we don't recreate the default codec
+ * The default codec class.
*/
- private final AtomicBoolean checked = new AtomicBoolean(false);
- private static final String defaultClazz = "org.apache.hadoop.io.compress.SnappyCodec";
+ private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.SnappyCodec";
/**
- * Buffer size option
+ * Configuration option for snappy buffer size.
*/
private static final String BUFFER_SIZE_OPT = "io.compression.codec.snappy.buffersize";
/**
- * Default buffer size value
+ * Default buffer size.
*/
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ /**
+ * Whether or not the codec status has been checked. Ensures the default codec is not
+ * recreated.
+ */
+ private final AtomicBoolean checked = new AtomicBoolean(false);
+
+ private transient CompressionCodec codec = null;
+
@Override
public CompressionCodec getCodec() {
- return snappyCodec;
+ return codec;
}
@Override
public void initializeDefaultCodec() {
- if (!checked.get()) {
- checked.set(true);
- snappyCodec = createNewCodec(DEFAULT_BUFFER_SIZE);
- }
+ codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
}
/**
* Creates a new snappy codec.
- *
- * @param bufferSize
- * incoming buffer size
- * @return new codec or null, depending on if installed
*/
@Override
protected CompressionCodec createNewCodec(final int bufferSize) {
-
- String extClazz =
- (conf.get(CONF_SNAPPY_CLASS) == null ? System.getProperty(CONF_SNAPPY_CLASS) : null);
- String clazz = (extClazz != null) ? extClazz : defaultClazz;
- try {
- log.info("Trying to load snappy codec class: {}", clazz);
-
- Configuration myConf = new Configuration(conf);
- // only use the buffersize if > 0, otherwise we'll use
- // the default defined within the codec
- if (bufferSize > 0)
- myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
-
- return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
-
- } catch (ClassNotFoundException e) {
- // that is okay
- }
-
- return null;
+ return createNewCodec(CONF_SNAPPY_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
}
@Override
public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
int downStreamBufferSize) throws IOException {
-
if (!isSupported()) {
throw new IOException("SNAPPY codec class not specified. Did you forget to set property "
+ CONF_SNAPPY_CLASS + "?");
}
- OutputStream bos1 = null;
- if (downStreamBufferSize > 0) {
- bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
- } else {
- bos1 = downStream;
- }
- // use the default codec
- CompressionOutputStream cos = snappyCodec.createOutputStream(bos1, compressor);
- return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+ return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
}
@Override
@@ -427,112 +360,68 @@ public final class Compression {
throw new IOException("SNAPPY codec class not specified. Did you forget to set property "
+ CONF_SNAPPY_CLASS + "?");
}
-
- CompressionCodec decomCodec = snappyCodec;
- // if we're not using the same buffer size, we'll pull the codec from the loading cache
- if (downStreamBufferSize != DEFAULT_BUFFER_SIZE) {
- Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(SNAPPY, downStreamBufferSize);
- try {
- decomCodec = codecCache.get(sizeOpt);
- } catch (ExecutionException e) {
- throw new IOException(e);
- }
- }
-
- CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor);
- return new BufferedInputStream(cis, DATA_IBUF_SIZE);
+ return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
+ DEFAULT_BUFFER_SIZE, SNAPPY, codec);
}
@Override
public boolean isSupported() {
-
- return snappyCodec != null;
+ return codec != null;
}
},
ZSTANDARD(COMPRESSION_ZSTD) {
- // Use base type to avoid compile-time dependencies.
- private transient CompressionCodec zstdCodec = null;
+
/**
- * determines if we've checked the codec status. ensures we don't recreate the default codec
+ * The default codec class.
*/
- private final AtomicBoolean checked = new AtomicBoolean(false);
- private static final String defaultClazz = "org.apache.hadoop.io.compress.ZStandardCodec";
+ private static final String DEFAULT_CLAZZ = "org.apache.hadoop.io.compress.ZStandardCodec";
/**
- * Buffer size option
+ * Configuration option for zstd buffer size.
*/
private static final String BUFFER_SIZE_OPT = "io.compression.codec.zstd.buffersize";
/**
- * Default buffer size value
+ * Default buffer size.
*/
private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ /**
+ * Whether or not the codec status has been checked. Ensures the default codec is not
+ * recreated.
+ */
+ private final AtomicBoolean checked = new AtomicBoolean(false);
+
+ private transient CompressionCodec codec = null;
+
@Override
public CompressionCodec getCodec() {
- return zstdCodec;
+ return codec;
}
@Override
public void initializeDefaultCodec() {
- if (!checked.get()) {
- checked.set(true);
- zstdCodec = createNewCodec(DEFAULT_BUFFER_SIZE);
- }
+ codec = initCodec(checked, DEFAULT_BUFFER_SIZE, codec);
}
/**
* Creates a new ZStandard codec.
- *
- * @param bufferSize
- * incoming buffer size
- * @return new codec or null, depending on if installed
*/
@Override
protected CompressionCodec createNewCodec(final int bufferSize) {
-
- String extClazz =
- (conf.get(CONF_ZSTD_CLASS) == null ? System.getProperty(CONF_ZSTD_CLASS) : null);
- String clazz = (extClazz != null) ? extClazz : defaultClazz;
- try {
- log.info("Trying to load ZStandard codec class: {}", clazz);
-
- Configuration myConf = new Configuration(conf);
- // only use the buffersize if > 0, otherwise we'll use
- // the default defined within the codec
- if (bufferSize > 0)
- myConf.setInt(BUFFER_SIZE_OPT, bufferSize);
-
- return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), myConf);
-
- } catch (ClassNotFoundException e) {
- // that is okay
- }
-
- return null;
+ return createNewCodec(CONF_ZSTD_CLASS, DEFAULT_CLAZZ, bufferSize, BUFFER_SIZE_OPT);
}
@Override
public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor,
int downStreamBufferSize) throws IOException {
-
if (!isSupported()) {
throw new IOException(
"ZStandard codec class not specified. Did you forget to set property "
+ CONF_ZSTD_CLASS + "?");
}
- OutputStream bos1;
- if (downStreamBufferSize > 0) {
- bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
- } else {
- bos1 = downStream;
- }
- // use the default codec
- CompressionOutputStream cos = zstdCodec.createOutputStream(bos1, compressor);
- BufferedOutputStream bos2 =
- new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
- return bos2;
+ return createFinishedOnFlushCompressionStream(downStream, compressor, downStreamBufferSize);
}
@Override
@@ -543,44 +432,48 @@ public final class Compression {
"ZStandard codec class not specified. Did you forget to set property "
+ CONF_ZSTD_CLASS + "?");
}
-
- CompressionCodec decomCodec = zstdCodec;
- // if we're not using the same buffer size, we'll pull the codec from the loading cache
- if (downStreamBufferSize != DEFAULT_BUFFER_SIZE) {
- Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(ZSTANDARD, downStreamBufferSize);
- try {
- decomCodec = codecCache.get(sizeOpt);
- } catch (ExecutionException e) {
- throw new IOException(e);
- }
- }
-
- CompressionInputStream cis = decomCodec.createInputStream(downStream, decompressor);
- BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
- return bis2;
+ return createDecompressionStream(downStream, decompressor, downStreamBufferSize,
+ DEFAULT_BUFFER_SIZE, ZSTANDARD, codec);
}
@Override
public boolean isSupported() {
- return zstdCodec != null;
+ return codec != null;
}
};
/**
- * The model defined by the static block, below, creates a singleton for each defined codec in
- * the Algorithm enumeration. By creating the codecs, each call to isSupported shall return
- * true/false depending on if the codec singleton is defined. The static initializer, below,
- * will ensure this occurs when the Enumeration is loaded. Furthermore, calls to getCodec will
- * return the singleton, whether it is null or not.
- *
- * Calls to createCompressionStream and createDecompressionStream may return a different codec
- * than getCodec, if the incoming downStreamBufferSize is different than the default. In such a
- * case, we will place the resulting codec into the codecCache, defined below, to ensure we have
- * cache codecs.
- *
- * Since codecs are immutable, there is no concern about concurrent access to the
- * CompressionCodec objects within the guava cache.
+ * Guava cache to have a limited factory pattern defined in the Algorithm enum.
*/
+ private static LoadingCache<Entry<Algorithm,Integer>,CompressionCodec> codecCache =
+ CacheBuilder.newBuilder().maximumSize(25).build(new CacheLoader<>() {
+ @Override
+ public CompressionCodec load(Entry<Algorithm,Integer> key) {
+ return key.getKey().createNewCodec(key.getValue());
+ }
+ });
+
+ public static final String CONF_LZO_CLASS = "io.compression.codec.lzo.class";
+ public static final String CONF_SNAPPY_CLASS = "io.compression.codec.snappy.class";
+ public static final String CONF_ZSTD_CLASS = "io.compression.codec.zstd.class";
+
+ // All compression-related settings are required to be configured statically in the
+ // Configuration object.
+ protected static final Configuration conf;
+
+ // The model defined by the static block below creates a singleton for each defined codec in the
+ // Algorithm enumeration. By creating the codecs, each call to isSupported shall return
+ // true/false depending on if the codec singleton is defined. The static initializer, below,
+ // will ensure this occurs when the Enumeration is loaded. Furthermore, calls to getCodec will
+ // return the singleton, whether it is null or not.
+ //
+ // Calls to createCompressionStream and createDecompressionStream may return a different codec
+ // than getCodec, if the incoming downStreamBufferSize is different than the default. In such a
+ // case, we will place the resulting codec into the codecCache, defined below, to ensure we have
+ // cache codecs.
+ //
+ // Since codecs are immutable, there is no concern about concurrent access to the
+ // CompressionCodec objects within the guava cache.
static {
conf = new Configuration();
for (final Algorithm al : Algorithm.values()) {
@@ -588,75 +481,54 @@ public final class Compression {
}
}
- /**
- * Guava cache to have a limited factory pattern defined in the Algorithm enum.
- */
- private static LoadingCache<Entry<Algorithm,Integer>,CompressionCodec> codecCache =
- CacheBuilder.newBuilder().maximumSize(25)
- .build(new CacheLoader<Entry<Algorithm,Integer>,CompressionCodec>() {
- @Override
- public CompressionCodec load(Entry<Algorithm,Integer> key) {
- return key.getKey().createNewCodec(key.getValue());
- }
- });
-
- // We require that all compression related settings are configured
- // statically in the Configuration object.
- protected static final Configuration conf;
- private final String compressName;
- // data input buffer size to absorb small reads from application.
- private static final int DATA_IBUF_SIZE = 1 * 1024;
- // data output buffer size to absorb small writes from application.
+ // Data input buffer size to absorb small reads from application.
+ private static final int DATA_IBUF_SIZE = 1024;
+
+ // Data output buffer size to absorb small writes from application.
private static final int DATA_OBUF_SIZE = 4 * 1024;
- public static final String CONF_LZO_CLASS = "io.compression.codec.lzo.class";
- public static final String CONF_SNAPPY_CLASS = "io.compression.codec.snappy.class";
- public static final String CONF_ZSTD_CLASS = "io.compression.codec.zstd.class";
+
+ // The name of the compression algorithm.
+ private final String name;
Algorithm(String name) {
- this.compressName = name;
+ this.name = name;
}
+ public abstract InputStream createDecompressionStream(InputStream downStream,
+ Decompressor decompressor, int downStreamBufferSize) throws IOException;
+
+ public abstract OutputStream createCompressionStream(OutputStream downStream,
+ Compressor compressor, int downStreamBufferSize) throws IOException;
+
+ public abstract boolean isSupported();
+
abstract CompressionCodec getCodec();
/**
- * function to create the default codec object.
+ * Create the default codec object.
*/
abstract void initializeDefaultCodec();
/**
* Shared function to create new codec objects. It is expected that if buffersize is invalid, a
- * codec will be created with the default buffer size
- *
- * @param bufferSize
- * configured buffer size.
- * @return new codec
+ * codec will be created with the default buffer size.
*/
abstract CompressionCodec createNewCodec(int bufferSize);
- public abstract InputStream createDecompressionStream(InputStream downStream,
- Decompressor decompressor, int downStreamBufferSize) throws IOException;
-
- public abstract OutputStream createCompressionStream(OutputStream downStream,
- Compressor compressor, int downStreamBufferSize) throws IOException;
-
- public abstract boolean isSupported();
-
public Compressor getCompressor() {
CompressionCodec codec = getCodec();
if (codec != null) {
Compressor compressor = CodecPool.getCompressor(codec);
if (compressor != null) {
if (compressor.finished()) {
- // Somebody returns the compressor to CodecPool but is still using
- // it.
+ // Somebody returns the compressor to CodecPool but is still using it.
log.warn("Compressor obtained from CodecPool already finished()");
} else {
log.debug("Got a compressor: {}", compressor.hashCode());
}
- /**
- * Following statement is necessary to get around bugs in 0.18 where a compressor is
- * referenced after returned back to the codec pool.
- */
+ // The following statement is necessary to get around bugs in 0.18 where a compressor is
+ // referenced after it is returned back to the codec pool.
compressor.reset();
}
return compressor;
@@ -664,7 +536,7 @@ public final class Compression {
return null;
}
- public void returnCompressor(Compressor compressor) {
+ public void returnCompressor(final Compressor compressor) {
if (compressor != null) {
log.debug("Return a compressor: {}", compressor.hashCode());
CodecPool.returnCompressor(compressor);
@@ -677,57 +549,157 @@ public final class Compression {
Decompressor decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
if (decompressor.finished()) {
- // Somebody returns the decompressor to CodecPool but is still using
- // it.
+ // Somebody returns the decompressor to CodecPool but is still using it.
log.warn("Decompressor obtained from CodecPool already finished()");
} else {
log.debug("Got a decompressor: {}", decompressor.hashCode());
}
- /**
- * Following statement is necessary to get around bugs in 0.18 where a decompressor is
- * referenced after returned back to the codec pool.
- */
+ // The following statement is necessary to get around bugs in 0.18 where a decompressor is
+ // referenced after it is returned back to the codec pool.
decompressor.reset();
}
return decompressor;
}
-
return null;
}
- public void returnDecompressor(Decompressor decompressor) {
+ /**
+ * Returns the specified {@link Decompressor} to the codec pool if it is not null.
+ */
+ public void returnDecompressor(final Decompressor decompressor) {
if (decompressor != null) {
log.debug("Returned a decompressor: {}", decompressor.hashCode());
CodecPool.returnDecompressor(decompressor);
}
}
+ /**
+ * Returns the name of the compression algorithm.
+ *
+ * @return the name
+ */
public String getName() {
- return compressName;
+ return name;
+ }
+
+ /**
+ * Initializes and returns a new codec with the specified buffer size if and only if the
+ * specified {@link AtomicBoolean} has a value of false, or returns the specified original codec
+ * otherwise.
+ */
+ CompressionCodec initCodec(final AtomicBoolean checked, final int bufferSize,
+ final CompressionCodec originalCodec) {
+ if (!checked.get()) {
+ checked.set(true);
+ return createNewCodec(bufferSize);
+ }
+ return originalCodec;
}
- }
- static Algorithm getCompressionAlgorithmByName(String compressName) {
- Algorithm[] algos = Algorithm.class.getEnumConstants();
+ /**
+ * Returns a new {@link CompressionCodec} of the specified type, or the default type if no
+ * primary type is specified. If the specified buffer size is greater than 0, the specified
+ * buffer size configuration option will be updated in the codec's configuration with the buffer
+ * size. If neither the specified codec type nor the default codec type can be found, null
+ * will be returned.
+ */
+ CompressionCodec createNewCodec(final String codecClazzProp, final String defaultClazz,
+ final int bufferSize, final String bufferSizeConfigOpt) {
+ String extClazz = (conf.get(codecClazzProp) == null ? System.getProperty(codecClazzProp) : null);
+ String clazz = (extClazz != null) ? extClazz : defaultClazz;
+ try {
+ log.info("Trying to load codec class {} for {}", clazz, codecClazzProp);
+ Configuration config = new Configuration(conf);
+ updateBuffer(config, bufferSizeConfigOpt, bufferSize);
+ return (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz), config);
+ } catch (ClassNotFoundException e) {
+ // This is okay.
+ }
+ return null;
+ }
- for (Algorithm a : algos) {
- if (a.getName().equals(compressName)) {
- return a;
+ InputStream createDecompressionStream(final InputStream stream, final Decompressor decompressor,
+ final int bufferSize, final int defaultBufferSize, final Algorithm algorithm,
+ CompressionCodec codec) throws IOException {
+ // If the default buffer size is not being used, pull from the loading cache.
+ if (bufferSize != defaultBufferSize) {
+ Entry<Algorithm,Integer> sizeOpt = Maps.immutableEntry(algorithm, bufferSize);
+ try {
+ codec = codecCache.get(sizeOpt);
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
}
+ CompressionInputStream cis = codec.createInputStream(stream, decompressor);
+ return new BufferedInputStream(cis, DATA_IBUF_SIZE);
}
- throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
+ /**
+ * Returns a new {@link FinishOnFlushCompressionStream} initialized for the specified output
+ * stream and compressor.
+ */
+ OutputStream createFinishedOnFlushCompressionStream(final OutputStream downStream,
+ final Compressor compressor, final int downStreamBufferSize) throws IOException {
+ OutputStream out = bufferStream(downStream, downStreamBufferSize);
+ CompressionOutputStream cos = getCodec().createOutputStream(out, compressor);
+ return new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), DATA_OBUF_SIZE);
+ }
+
+ /**
+ * Return the given stream wrapped as a {@link BufferedOutputStream} with the given buffer size
+ * if the buffer size is greater than 0, or return the original stream otherwise.
+ */
+ OutputStream bufferStream(final OutputStream stream, final int bufferSize) {
+ if (bufferSize > 0) {
+ return new BufferedOutputStream(stream, bufferSize);
+ }
+ return stream;
+ }
+
+ /**
+ * Return the given stream wrapped as a {@link BufferedInputStream} with the given buffer size
+ * if the buffer size is greater than 0, or return the original stream otherwise.
+ */
+ InputStream bufferStream(final InputStream stream, final int bufferSize) {
+ if (bufferSize > 0) {
+ return new BufferedInputStream(stream, bufferSize);
+ }
+ return stream;
+ }
+
+ /**
+ * Updates the value of the specified buffer size opt in the given {@link Configuration} if the
+ * new buffer size is greater than 0.
+ */
+ void updateBuffer(final Configuration config, final String bufferSizeOpt,
+ final int bufferSize) {
+ // Use the buffersize only if it is greater than 0, otherwise use the default defined within
+ // the codec.
+ if (bufferSize > 0) {
+ config.setInt(bufferSizeOpt, bufferSize);
+ }
+ }
}
public static String[] getSupportedAlgorithms() {
- Algorithm[] algos = Algorithm.class.getEnumConstants();
+ Algorithm[] algorithms = Algorithm.class.getEnumConstants();
+ ArrayList<String> supportedAlgorithms = new ArrayList<>();
+ for (Algorithm algorithm : algorithms) {
+ if (algorithm.isSupported()) {
+ supportedAlgorithms.add(algorithm.getName());
+ }
+ }
+ return supportedAlgorithms.toArray(new String[0]);
+ }
- ArrayList<String> ret = new ArrayList<>();
- for (Algorithm a : algos) {
- if (a.isSupported()) {
- ret.add(a.getName());
+ static Algorithm getCompressionAlgorithmByName(final String name) {
+ Algorithm[] algorithms = Algorithm.class.getEnumConstants();
+ for (Algorithm algorithm : algorithms) {
+ if (algorithm.getName().equals(name)) {
+ return algorithm;
}
}
- return ret.toArray(new String[ret.size()]);
+ throw new IllegalArgumentException("Unsupported compression algorithm name: " + name);
}
}