You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ie...@apache.org on 2021/03/01 23:40:25 UTC
[avro] branch master updated: AVRO-3060: Support ZSTD level and
BufferPool options
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new cff3bda AVRO-3060: Support ZSTD level and BufferPool options
cff3bda is described below
commit cff3bdad6e32faf74cae7321e0bcef83846d5351
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Sun Feb 28 17:22:55 2021 -0800
AVRO-3060: Support ZSTD level and BufferPool options
This PR adds the following job configuration keys for the ZSTD codec in the Avro MapReduce output formats:
- avro.mapred.zstd.level
- avro.mapred.zstd.bufferpool
---
.../java/org/apache/avro/file/CodecFactory.java | 23 ++++++++++++++++++----
.../java/org/apache/avro/file/ZstandardCodec.java | 20 ++++++++++++-------
.../java/org/apache/avro/file/ZstandardLoader.java | 14 +++++++++----
.../test/java/org/apache/avro/TestDataFile.java | 2 ++
.../org/apache/avro/mapred/AvroOutputFormat.java | 13 ++++++++++++
.../avro/mapreduce/AvroOutputFormatBase.java | 9 +++++++++
6 files changed, 66 insertions(+), 15 deletions(-)
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
index 13833ca..351c036 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
@@ -87,7 +87,7 @@ public abstract class CodecFactory {
* use a lot of memory.
*/
public static CodecFactory zstandardCodec(int level) {
- return new ZstandardCodec.Option(level, false);
+ return new ZstandardCodec.Option(level, false, ZstandardCodec.DEFAULT_USE_BUFFERPOOL);
}
/**
@@ -100,7 +100,21 @@ public abstract class CodecFactory {
* @param useChecksum if true, will include a checksum with each data block
*/
public static CodecFactory zstandardCodec(int level, boolean useChecksum) {
- return new ZstandardCodec.Option(level, useChecksum);
+ return new ZstandardCodec.Option(level, useChecksum, ZstandardCodec.DEFAULT_USE_BUFFERPOOL);
+ }
+
+ /**
+ * zstandard codec, with specific compression level, checksum, and bufferPool.
+ *
+ * @param level The compression level should be between -5 and 22,
+ * inclusive. Negative levels are 'fast' modes akin to lz4
+ * or snappy, levels above 9 are generally for archival
+ * purposes, and levels above 18 use a lot of memory.
+ * @param useChecksum if true, will include a checksum with each data block
+ * @param useBufferPool if true, zstd streams use zstd-jni's RecyclingBufferPool; if false, NoPool
+ */
+ public static CodecFactory zstandardCodec(int level, boolean useChecksum, boolean useBufferPool) {
+ return new ZstandardCodec.Option(level, useChecksum, useBufferPool);
}
/** Creates internal Codec. */
@@ -114,14 +128,15 @@ public abstract class CodecFactory {
public static final int DEFAULT_DEFLATE_LEVEL = Deflater.DEFAULT_COMPRESSION;
public static final int DEFAULT_XZ_LEVEL = XZCodec.DEFAULT_COMPRESSION;
- public static final int DEFAULT_ZSTANDARD_LEVEL = 3;
+ public static final int DEFAULT_ZSTANDARD_LEVEL = ZstandardCodec.DEFAULT_COMPRESSION;
+ public static final boolean DEFAULT_ZSTANDARD_BUFFERPOOL = ZstandardCodec.DEFAULT_USE_BUFFERPOOL;
static {
addCodec(DataFileConstants.NULL_CODEC, nullCodec());
addCodec(DataFileConstants.DEFLATE_CODEC, deflateCodec(DEFAULT_DEFLATE_LEVEL));
addCodec(DataFileConstants.BZIP2_CODEC, bzip2Codec());
addCodec(DataFileConstants.XZ_CODEC, xzCodec(DEFAULT_XZ_LEVEL));
- addCodec(DataFileConstants.ZSTANDARD_CODEC, zstandardCodec(DEFAULT_ZSTANDARD_LEVEL));
+ addCodec(DataFileConstants.ZSTANDARD_CODEC, zstandardCodec(DEFAULT_ZSTANDARD_LEVEL, false, DEFAULT_ZSTANDARD_BUFFERPOOL));
addCodec(DataFileConstants.SNAPPY_CODEC, snappyCodec());
}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
index b3d5327..7773d1e 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
@@ -27,33 +27,49 @@ import java.nio.ByteBuffer;
import org.apache.commons.compress.utils.IOUtils;
public class ZstandardCodec extends Codec {
+ /** Default zstd compression level; CodecFactory.DEFAULT_ZSTANDARD_LEVEL is derived from it. */
+ public static final int DEFAULT_COMPRESSION = 3;
+ /** Buffer pooling is off by default (zstd streams are created with NoPool). */
+ public static final boolean DEFAULT_USE_BUFFERPOOL = false;
static class Option extends CodecFactory {
private final int compressionLevel;
private final boolean useChecksum;
+ private final boolean useBufferPool;
- Option(int compressionLevel, boolean useChecksum) {
+ Option(int compressionLevel, boolean useChecksum, boolean useBufferPool) {
this.compressionLevel = compressionLevel;
this.useChecksum = useChecksum;
+ this.useBufferPool = useBufferPool;
}
@Override
protected Codec createInstance() {
- return new ZstandardCodec(compressionLevel, useChecksum);
+ return new ZstandardCodec(compressionLevel, useChecksum, useBufferPool);
}
}
private final int compressionLevel;
private final boolean useChecksum;
+ private final boolean useBufferPool;
private ByteArrayOutputStream outputBuffer;
/**
- * Create a ZstandardCodec instance with the given compressionLevel and checksum
- * option
+ * Create a ZstandardCodec instance with the given compressionLevel and checksum
+ * option, using the default bufferPool setting. Kept for backward compatibility.
**/
public ZstandardCodec(int compressionLevel, boolean useChecksum) {
+ this(compressionLevel, useChecksum, DEFAULT_USE_BUFFERPOOL);
+ }
+
+ /**
+ * Create a ZstandardCodec instance with the given compressionLevel, checksum,
+ * and bufferPool option
+ **/
+ public ZstandardCodec(int compressionLevel, boolean useChecksum, boolean useBufferPool) {
this.compressionLevel = compressionLevel;
this.useChecksum = useChecksum;
+ this.useBufferPool = useBufferPool;
}
@Override
@@ -64,7 +80,7 @@ public class ZstandardCodec extends Codec {
@Override
public ByteBuffer compress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
- try (OutputStream outputStream = ZstandardLoader.output(baos, compressionLevel, useChecksum)) {
+ try (OutputStream outputStream = ZstandardLoader.output(baos, compressionLevel, useChecksum, useBufferPool)) {
outputStream.write(data.array(), computeOffset(data), data.remaining());
}
return ByteBuffer.wrap(baos.toByteArray());
@@ -75,7 +91,7 @@ public class ZstandardCodec extends Codec {
ByteArrayOutputStream baos = getOutputBuffer(compressedData.remaining());
InputStream bytesIn = new ByteArrayInputStream(compressedData.array(), computeOffset(compressedData),
compressedData.remaining());
- try (InputStream ios = ZstandardLoader.input(bytesIn)) {
+ try (InputStream ios = ZstandardLoader.input(bytesIn, useBufferPool)) {
IOUtils.copy(ios, baos);
}
return ByteBuffer.wrap(baos.toByteArray());
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardLoader.java b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardLoader.java
index d6f2a4a..1819a65 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardLoader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardLoader.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import com.github.luben.zstd.BufferPool;
+import com.github.luben.zstd.NoPool;
+import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
@@ -30,13 +33,16 @@ import com.github.luben.zstd.ZstdOutputStream;
* or decompress methods rather than when we instantiate it */
final class ZstandardLoader {
- static InputStream input(InputStream compressed) throws IOException {
- return new ZstdInputStream(compressed);
+ static InputStream input(InputStream compressed, boolean useBufferPool) throws IOException {
+ BufferPool pool = useBufferPool ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE;
+ return new ZstdInputStream(compressed, pool);
}
- static OutputStream output(OutputStream compressed, int level, boolean checksum) throws IOException {
+ static OutputStream output(OutputStream compressed, int level, boolean checksum, boolean useBufferPool)
+ throws IOException {
int bounded = Math.max(Math.min(level, Zstd.maxCompressionLevel()), Zstd.minCompressionLevel());
- ZstdOutputStream zstdOutputStream = new ZstdOutputStream(compressed, bounded);
+ BufferPool pool = useBufferPool ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE;
+ ZstdOutputStream zstdOutputStream = new ZstdOutputStream(compressed, pool).setLevel(bounded);
zstdOutputStream.setCloseFrameOnFlush(false);
zstdOutputStream.setChecksum(checksum);
return zstdOutputStream;
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index 81eb012..9340c5e 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -74,6 +74,8 @@ public class TestDataFile {
r.add(new Object[] { CodecFactory.zstandardCodec(0, true) });
r.add(new Object[] { CodecFactory.zstandardCodec(5, false) });
r.add(new Object[] { CodecFactory.zstandardCodec(18, true) });
+ r.add(new Object[] { CodecFactory.zstandardCodec(0, false, false) });
+ r.add(new Object[] { CodecFactory.zstandardCodec(0, false, true) });
return r;
}
diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
index 52007c0..88ac1cd 100644
--- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
+++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
@@ -42,8 +42,11 @@ import org.apache.avro.hadoop.file.HadoopCodecFactory;
import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
import static org.apache.avro.file.DataFileConstants.XZ_CODEC;
+import static org.apache.avro.file.DataFileConstants.ZSTANDARD_CODEC;
import static org.apache.avro.file.CodecFactory.DEFAULT_DEFLATE_LEVEL;
import static org.apache.avro.file.CodecFactory.DEFAULT_XZ_LEVEL;
+import static org.apache.avro.file.CodecFactory.DEFAULT_ZSTANDARD_LEVEL;
+import static org.apache.avro.file.CodecFactory.DEFAULT_ZSTANDARD_BUFFERPOOL;
/**
* An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files.
@@ -63,6 +66,12 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
/** The configuration key for Avro XZ level. */
public static final String XZ_LEVEL_KEY = "avro.mapred.xz.level";
+ /** The configuration key for the Avro ZSTD compression level (int). */
+ public static final String ZSTD_LEVEL_KEY = "avro.mapred.zstd.level";
+
+ /** The configuration key (boolean) enabling the Avro ZSTD recycling buffer pool. */
+ public static final String ZSTD_BUFFERPOOL_KEY = "avro.mapred.zstd.bufferpool";
+
/** The configuration key for Avro sync interval. */
public static final String SYNC_INTERVAL_KEY = "avro.mapred.sync.interval";
@@ -116,6 +125,8 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
if (FileOutputFormat.getCompressOutput(job)) {
int deflateLevel = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
int xzLevel = job.getInt(XZ_LEVEL_KEY, DEFAULT_XZ_LEVEL);
+ int zstdLevel = job.getInt(ZSTD_LEVEL_KEY, DEFAULT_ZSTANDARD_LEVEL);
+ boolean zstdBufferPool = job.getBoolean(ZSTD_BUFFERPOOL_KEY, DEFAULT_ZSTANDARD_BUFFERPOOL);
String codecName = job.get(AvroJob.OUTPUT_CODEC);
if (codecName == null) {
@@ -133,6 +144,8 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
factory = CodecFactory.deflateCodec(deflateLevel);
} else if (codecName.equals(XZ_CODEC)) {
factory = CodecFactory.xzCodec(xzLevel);
+ } else if (codecName.equals(ZSTANDARD_CODEC)) {
+ factory = CodecFactory.zstandardCodec(zstdLevel, false, zstdBufferPool);
} else {
factory = CodecFactory.fromString(codecName);
}
diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
index 0c3594f..587d787 100644
--- a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
+++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import static org.apache.avro.file.CodecFactory.DEFAULT_ZSTANDARD_BUFFERPOOL;
+import static org.apache.avro.file.CodecFactory.DEFAULT_ZSTANDARD_LEVEL;
+
/**
* Abstract base class for output formats that write Avro container files.
*
@@ -50,6 +53,10 @@ public abstract class AvroOutputFormatBase<K, V> extends FileOutputFormat<K, V>
CodecFactory.DEFAULT_DEFLATE_LEVEL);
int xzLevel = context.getConfiguration().getInt(org.apache.avro.mapred.AvroOutputFormat.XZ_LEVEL_KEY,
CodecFactory.DEFAULT_XZ_LEVEL);
+ int zstdLevel = context.getConfiguration().getInt(org.apache.avro.mapred.AvroOutputFormat.ZSTD_LEVEL_KEY,
+ DEFAULT_ZSTANDARD_LEVEL);
+ boolean zstdBufferPool = context.getConfiguration()
+ .getBoolean(org.apache.avro.mapred.AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, DEFAULT_ZSTANDARD_BUFFERPOOL);
String outputCodec = context.getConfiguration().get(AvroJob.CONF_OUTPUT_CODEC);
@@ -66,6 +73,8 @@ public abstract class AvroOutputFormatBase<K, V> extends FileOutputFormat<K, V>
return CodecFactory.deflateCodec(deflateLevel);
} else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
return CodecFactory.xzCodec(xzLevel);
+ } else if (DataFileConstants.ZSTANDARD_CODEC.equals(outputCodec)) {
+ return CodecFactory.zstandardCodec(zstdLevel, false, zstdBufferPool);
} else {
return CodecFactory.fromString(outputCodec);
}