Posted to commits@avro.apache.org by ie...@apache.org on 2021/03/01 23:40:25 UTC

[avro] branch master updated: AVRO-3060: Support ZSTD level and BufferPool options

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new cff3bda  AVRO-3060: Support ZSTD level and BufferPool options
cff3bda is described below

commit cff3bdad6e32faf74cae7321e0bcef83846d5351
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Sun Feb 28 17:22:55 2021 -0800

    AVRO-3060: Support ZSTD level and BufferPool options
    
    This PR aims to support additional ZSTD options.
    - avro.mapred.zstd.level
    - avro.mapred.zstd.bufferpool
---
 .../java/org/apache/avro/file/CodecFactory.java    | 23 ++++++++++++++++++----
 .../java/org/apache/avro/file/ZstandardCodec.java  | 20 ++++++++++++-------
 .../java/org/apache/avro/file/ZstandardLoader.java | 14 +++++++++----
 .../test/java/org/apache/avro/TestDataFile.java    |  2 ++
 .../org/apache/avro/mapred/AvroOutputFormat.java   | 13 ++++++++++++
 .../avro/mapreduce/AvroOutputFormatBase.java       |  9 +++++++++
 6 files changed, 66 insertions(+), 15 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
index 13833ca..351c036 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java
@@ -87,7 +87,7 @@ public abstract class CodecFactory {
    *              use a lot of memory.
    */
   public static CodecFactory zstandardCodec(int level) {
-    return new ZstandardCodec.Option(level, false);
+    return new ZstandardCodec.Option(level, false, false);
   }
 
   /**
@@ -100,7 +100,21 @@ public abstract class CodecFactory {
    * @param useChecksum if true, will include a checksum with each data block
    */
   public static CodecFactory zstandardCodec(int level, boolean useChecksum) {
-    return new ZstandardCodec.Option(level, useChecksum);
+    return new ZstandardCodec.Option(level, useChecksum, false);
+  }
+
+  /**
+   * zstandard codec, with specific compression level, checksum, and bufferPool
+   *
+   * @param level         The compression level should be between -5 and 22,
+   *                      inclusive. Negative levels are 'fast' modes akin to lz4
+   *                      or snappy, levels above 9 are generally for archival
+   *                      purposes, and levels above 18 use a lot of memory.
+   * @param useChecksum   if true, will include a checksum with each data block
+   * @param useBufferPool if true, will use recycling buffer pool
+   */
+  public static CodecFactory zstandardCodec(int level, boolean useChecksum, boolean useBufferPool) {
+    return new ZstandardCodec.Option(level, useChecksum, useBufferPool);
   }
 
   /** Creates internal Codec. */
@@ -114,14 +128,15 @@ public abstract class CodecFactory {
 
   public static final int DEFAULT_DEFLATE_LEVEL = Deflater.DEFAULT_COMPRESSION;
   public static final int DEFAULT_XZ_LEVEL = XZCodec.DEFAULT_COMPRESSION;
-  public static final int DEFAULT_ZSTANDARD_LEVEL = 3;
+  public static final int DEFAULT_ZSTANDARD_LEVEL = ZstandardCodec.DEFAULT_COMPRESSION;
+  public static final boolean DEFAULT_ZSTANDARD_BUFFERPOOL = ZstandardCodec.DEFAULT_USE_BUFFERPOOL;
 
   static {
     addCodec(DataFileConstants.NULL_CODEC, nullCodec());
     addCodec(DataFileConstants.DEFLATE_CODEC, deflateCodec(DEFAULT_DEFLATE_LEVEL));
     addCodec(DataFileConstants.BZIP2_CODEC, bzip2Codec());
     addCodec(DataFileConstants.XZ_CODEC, xzCodec(DEFAULT_XZ_LEVEL));
-    addCodec(DataFileConstants.ZSTANDARD_CODEC, zstandardCodec(DEFAULT_ZSTANDARD_LEVEL));
+    addCodec(DataFileConstants.ZSTANDARD_CODEC, zstandardCodec(DEFAULT_ZSTANDARD_LEVEL, DEFAULT_ZSTANDARD_BUFFERPOOL));
     addCodec(DataFileConstants.SNAPPY_CODEC, snappyCodec());
   }
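
With the three-argument overload above, callers can opt into the recycling buffer pool directly from CodecFactory. A minimal sketch, not part of this commit; "schema" and the output file name are placeholders for whatever the caller already has:

    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    // level 3 (the default), no per-block checksum, recycling buffer pool enabled
    CodecFactory codec = CodecFactory.zstandardCodec(3, false, true);
    try (DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
      writer.setCodec(codec);                        // must be set before create()
      writer.create(schema, new java.io.File("users.avro"));
      // writer.append(...) as usual
    }

The existing one- and two-argument overloads remain source- and binary-compatible: they now delegate with useBufferPool set to false, so current callers see no behavior change.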
 
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
index b3d5327..7773d1e 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java
@@ -27,33 +27,39 @@ import java.nio.ByteBuffer;
 import org.apache.commons.compress.utils.IOUtils;
 
 public class ZstandardCodec extends Codec {
+  public final static int DEFAULT_COMPRESSION = 3;
+  public final static boolean DEFAULT_USE_BUFFERPOOL = false;
 
   static class Option extends CodecFactory {
     private final int compressionLevel;
     private final boolean useChecksum;
+    private final boolean useBufferPool;
 
-    Option(int compressionLevel, boolean useChecksum) {
+    Option(int compressionLevel, boolean useChecksum, boolean useBufferPool) {
       this.compressionLevel = compressionLevel;
       this.useChecksum = useChecksum;
+      this.useBufferPool = useBufferPool;
     }
 
     @Override
     protected Codec createInstance() {
-      return new ZstandardCodec(compressionLevel, useChecksum);
+      return new ZstandardCodec(compressionLevel, useChecksum, useBufferPool);
     }
   }
 
   private final int compressionLevel;
   private final boolean useChecksum;
+  private final boolean useBufferPool;
   private ByteArrayOutputStream outputBuffer;
 
   /**
-   * Create a ZstandardCodec instance with the given compressionLevel and checksum
-   * option
+   * Create a ZstandardCodec instance with the given compressionLevel, checksum,
+   * and bufferPool option
    **/
-  public ZstandardCodec(int compressionLevel, boolean useChecksum) {
+  public ZstandardCodec(int compressionLevel, boolean useChecksum, boolean useBufferPool) {
     this.compressionLevel = compressionLevel;
     this.useChecksum = useChecksum;
+    this.useBufferPool = useBufferPool;
   }
 
   @Override
@@ -64,7 +70,7 @@ public class ZstandardCodec extends Codec {
   @Override
   public ByteBuffer compress(ByteBuffer data) throws IOException {
     ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
-    try (OutputStream outputStream = ZstandardLoader.output(baos, compressionLevel, useChecksum)) {
+    try (OutputStream outputStream = ZstandardLoader.output(baos, compressionLevel, useChecksum, useBufferPool)) {
       outputStream.write(data.array(), computeOffset(data), data.remaining());
     }
     return ByteBuffer.wrap(baos.toByteArray());
@@ -75,7 +81,7 @@ public class ZstandardCodec extends Codec {
     ByteArrayOutputStream baos = getOutputBuffer(compressedData.remaining());
     InputStream bytesIn = new ByteArrayInputStream(compressedData.array(), computeOffset(compressedData),
         compressedData.remaining());
-    try (InputStream ios = ZstandardLoader.input(bytesIn)) {
+    try (InputStream ios = ZstandardLoader.input(bytesIn, useBufferPool)) {
       IOUtils.copy(ios, baos);
     }
     return ByteBuffer.wrap(baos.toByteArray());
diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardLoader.java b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardLoader.java
index d6f2a4a..1819a65 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardLoader.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardLoader.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import com.github.luben.zstd.BufferPool;
+import com.github.luben.zstd.NoPool;
+import com.github.luben.zstd.RecyclingBufferPool;
 import com.github.luben.zstd.Zstd;
 import com.github.luben.zstd.ZstdInputStream;
 import com.github.luben.zstd.ZstdOutputStream;
@@ -30,13 +33,16 @@ import com.github.luben.zstd.ZstdOutputStream;
  * or decompress methods rather than when we instantiate it */
 final class ZstandardLoader {
 
-  static InputStream input(InputStream compressed) throws IOException {
-    return new ZstdInputStream(compressed);
+  static InputStream input(InputStream compressed, boolean useBufferPool) throws IOException {
+    BufferPool pool = useBufferPool ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE;
+    return new ZstdInputStream(compressed, pool);
   }
 
-  static OutputStream output(OutputStream compressed, int level, boolean checksum) throws IOException {
+  static OutputStream output(OutputStream compressed, int level, boolean checksum, boolean useBufferPool)
+      throws IOException {
     int bounded = Math.max(Math.min(level, Zstd.maxCompressionLevel()), Zstd.minCompressionLevel());
-    ZstdOutputStream zstdOutputStream = new ZstdOutputStream(compressed, bounded);
+    BufferPool pool = useBufferPool ? RecyclingBufferPool.INSTANCE : NoPool.INSTANCE;
+    ZstdOutputStream zstdOutputStream = new ZstdOutputStream(compressed, pool).setLevel(bounded);
     zstdOutputStream.setCloseFrameOnFlush(false);
     zstdOutputStream.setChecksum(checksum);
     return zstdOutputStream;
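
The loader now selects RecyclingBufferPool.INSTANCE (zstd-jni reuses its internal buffers across streams) or NoPool.INSTANCE (a fresh buffer per stream) for both the input and the output path. A round-trip sketch that exercises both pooled paths through the public codec API; illustrative only, not part of the commit:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.avro.file.ZstandardCodec;

    // level 3, no checksum, pooled buffers on both compress and decompress
    ZstandardCodec codec = new ZstandardCodec(3, false, true);
    ByteBuffer compressed = codec.compress(
        ByteBuffer.wrap("hello avro".getBytes(StandardCharsets.UTF_8)));  // throws IOException
    ByteBuffer restored = codec.decompress(compressed);                   // throws IOException
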
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
index 81eb012..9340c5e 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
@@ -74,6 +74,8 @@ public class TestDataFile {
     r.add(new Object[] { CodecFactory.zstandardCodec(0, true) });
     r.add(new Object[] { CodecFactory.zstandardCodec(5, false) });
     r.add(new Object[] { CodecFactory.zstandardCodec(18, true) });
+    r.add(new Object[] { CodecFactory.zstandardCodec(0, false, false) });
+    r.add(new Object[] { CodecFactory.zstandardCodec(0, false, true) });
     return r;
   }
 
diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
index 52007c0..88ac1cd 100644
--- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
+++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
@@ -42,8 +42,11 @@ import org.apache.avro.hadoop.file.HadoopCodecFactory;
 import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
 import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
 import static org.apache.avro.file.DataFileConstants.XZ_CODEC;
+import static org.apache.avro.file.DataFileConstants.ZSTANDARD_CODEC;
 import static org.apache.avro.file.CodecFactory.DEFAULT_DEFLATE_LEVEL;
 import static org.apache.avro.file.CodecFactory.DEFAULT_XZ_LEVEL;
+import static org.apache.avro.file.CodecFactory.DEFAULT_ZSTANDARD_LEVEL;
+import static org.apache.avro.file.CodecFactory.DEFAULT_ZSTANDARD_BUFFERPOOL;
 
 /**
  * An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files.
@@ -63,6 +66,12 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
   /** The configuration key for Avro XZ level. */
   public static final String XZ_LEVEL_KEY = "avro.mapred.xz.level";
 
+  /** The configuration key for Avro ZSTD level. */
+  public static final String ZSTD_LEVEL_KEY = "avro.mapred.zstd.level";
+
+  /** The configuration key for Avro ZSTD buffer pool. */
+  public static final String ZSTD_BUFFERPOOL_KEY = "avro.mapred.zstd.bufferpool";
+
   /** The configuration key for Avro sync interval. */
   public static final String SYNC_INTERVAL_KEY = "avro.mapred.sync.interval";
 
@@ -116,6 +125,8 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
     if (FileOutputFormat.getCompressOutput(job)) {
       int deflateLevel = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
       int xzLevel = job.getInt(XZ_LEVEL_KEY, DEFAULT_XZ_LEVEL);
+      int zstdLevel = job.getInt(ZSTD_LEVEL_KEY, DEFAULT_ZSTANDARD_LEVEL);
+      boolean zstdBufferPool = job.getBoolean(ZSTD_BUFFERPOOL_KEY, DEFAULT_ZSTANDARD_BUFFERPOOL);
       String codecName = job.get(AvroJob.OUTPUT_CODEC);
 
       if (codecName == null) {
@@ -133,6 +144,8 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
           factory = CodecFactory.deflateCodec(deflateLevel);
         } else if (codecName.equals(XZ_CODEC)) {
           factory = CodecFactory.xzCodec(xzLevel);
+        } else if (codecName.equals(ZSTANDARD_CODEC)) {
+          factory = CodecFactory.zstandardCodec(zstdLevel, false, zstdBufferPool);
         } else {
           factory = CodecFactory.fromString(codecName);
         }
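
For the old mapred API, the new keys are read from the JobConf once output compression is enabled. A minimal job-setup sketch using the constants added or referenced in this commit (illustrative, not part of the commit; level 7 is an arbitrary choice):

    import org.apache.avro.file.DataFileConstants;
    import org.apache.avro.mapred.AvroJob;
    import org.apache.avro.mapred.AvroOutputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    JobConf job = new JobConf();
    FileOutputFormat.setCompressOutput(job, true);
    job.set(AvroJob.OUTPUT_CODEC, DataFileConstants.ZSTANDARD_CODEC);
    job.setInt(AvroOutputFormat.ZSTD_LEVEL_KEY, 7);               // avro.mapred.zstd.level
    job.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, true);   // avro.mapred.zstd.bufferpool

Note that the checksum flag is not exposed through job configuration: the factory call above passes useChecksum as false for the mapred path.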
diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
index 0c3594f..587d787 100644
--- a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
+++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroOutputFormatBase.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
+import static org.apache.avro.file.CodecFactory.DEFAULT_ZSTANDARD_BUFFERPOOL;
+import static org.apache.avro.file.CodecFactory.DEFAULT_ZSTANDARD_LEVEL;
+
 /**
  * Abstract base class for output formats that write Avro container files.
  *
@@ -50,6 +53,10 @@ public abstract class AvroOutputFormatBase<K, V> extends FileOutputFormat<K, V>
           CodecFactory.DEFAULT_DEFLATE_LEVEL);
       int xzLevel = context.getConfiguration().getInt(org.apache.avro.mapred.AvroOutputFormat.XZ_LEVEL_KEY,
           CodecFactory.DEFAULT_XZ_LEVEL);
+      int zstdLevel = context.getConfiguration().getInt(org.apache.avro.mapred.AvroOutputFormat.ZSTD_LEVEL_KEY,
+          DEFAULT_ZSTANDARD_LEVEL);
+      boolean zstdBufferPool = context.getConfiguration()
+          .getBoolean(org.apache.avro.mapred.AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, DEFAULT_ZSTANDARD_BUFFERPOOL);
 
       String outputCodec = context.getConfiguration().get(AvroJob.CONF_OUTPUT_CODEC);
 
@@ -66,6 +73,8 @@ public abstract class AvroOutputFormatBase<K, V> extends FileOutputFormat<K, V>
         return CodecFactory.deflateCodec(deflateLevel);
       } else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
         return CodecFactory.xzCodec(xzLevel);
+      } else if (DataFileConstants.ZSTANDARD_CODEC.equals(outputCodec)) {
+        return CodecFactory.zstandardCodec(zstdLevel, false, zstdBufferPool);
       } else {
         return CodecFactory.fromString(outputCodec);
       }
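
The mapreduce-side AvroOutputFormatBase reads the same org.apache.avro.mapred.AvroOutputFormat keys, so a new-API job is configured analogously. A sketch under the assumption that the output codec itself is selected the same way as in the mapred example above:

    import org.apache.avro.mapred.AvroOutputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    Job job = Job.getInstance(new Configuration(), "zstd-avro-output");
    FileOutputFormat.setCompressOutput(job, true);
    // select the zstandard output codec via the job's Avro output codec setting,
    // then tune the new knobs on the underlying Configuration:
    job.getConfiguration().setInt(AvroOutputFormat.ZSTD_LEVEL_KEY, 7);
    job.getConfiguration().setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, true);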