You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in plain text).
Posted to commits@tez.apache.org by ab...@apache.org on 2021/06/16 08:44:29 UTC

[tez] branch branch-0.9 updated: TEZ-4295: Could not decompress data. Buffer length is too small. (#130) (Laszlo Bodor reviewed by Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 46e2d98  TEZ-4295: Could not decompress data. Buffer length is too small. (#130) (Laszlo Bodor reviewed by Ashutosh Chauhan)
46e2d98 is described below

commit 46e2d9885cc09f672abf363a1c547010a7394534
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Wed Jun 16 10:37:09 2021 +0200

    TEZ-4295: Could not decompress data. Buffer length is too small. (#130) (Laszlo Bodor reviewed by Ashutosh Chauhan)
---
 .../runtime/library/common/TezRuntimeUtils.java    |  26 ---
 .../runtime/library/common/sort/impl/IFile.java    |  10 +-
 .../tez/runtime/library/utils/CodecUtils.java      | 105 ++++++++--
 .../orderedgrouped/DummyCompressionCodec.java      |  20 +-
 .../shuffle/orderedgrouped/TestMergeManager.java   |   7 +-
 .../library/common/sort/impl/TestIFile.java        |  10 +-
 .../tez/runtime/library/utils/TestCodecUtils.java  | 221 +++++++++++++++++++++
 7 files changed, 344 insertions(+), 55 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 815018c..9d9b8c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -26,7 +26,6 @@ import java.net.URL;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.http.BaseHttpConnection;
 import org.apache.tez.http.HttpConnection;
@@ -37,8 +36,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.TaskContext;
@@ -262,27 +259,4 @@ public class TezRuntimeUtils {
       in.close();
     }
   }
-
-  public static String getBufferSizeProperty(CompressionCodec codec) {
-    return getBufferSizeProperty(codec.getClass().getName());
-  }
-
-  public static String getBufferSizeProperty(String className) {
-    switch (className) {
-    case "org.apache.hadoop.io.compress.DefaultCodec":
-    case "org.apache.hadoop.io.compress.BZip2Codec":
-    case "org.apache.hadoop.io.compress.GzipCodec":
-      return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-    case "org.apache.hadoop.io.compress.SnappyCodec":
-      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
-    case "org.apache.hadoop.io.compress.LzoCodec":
-      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
-    case "com.hadoop.compression.lzo.LzoCodec":
-      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
-    case "org.apache.hadoop.io.compress.Lz4Codec":
-      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
-    default:
-      return null;
-    }
-  }
 }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index f424ff0..26e860e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -152,10 +152,10 @@ public class IFile {
       this.start = this.rawOut.getPos();
       this.rle = rle;
       if (codec != null) {
-        this.compressor = CodecPool.getCompressor(codec);
+        this.compressor = CodecUtils.getCompressor(codec);
         if (this.compressor != null) {
           this.compressor.reset();
-          this.compressedOut = codec.createOutputStream(checksumOut, compressor);
+          this.compressedOut = CodecUtils.createOutputStream(codec, checksumOut, compressor);
           this.out = new FSDataOutputStream(this.compressedOut,  null);
           this.compressOutput = true;
         } else {
@@ -573,9 +573,9 @@ public class IFile {
         checksumIn = new IFileInputStream(in, length, readAhead,
             readAheadLength/* , isCompressed */);
         if (isCompressed && codec != null) {
-          decompressor = CodecPool.getDecompressor(codec);
+          decompressor = CodecUtils.getDecompressor(codec);
           if (decompressor != null) {
-            this.in = codec.createInputStream(checksumIn, decompressor);
+            this.in = CodecUtils.createInputStream(codec, checksumIn, decompressor);
           } else {
             LOG.warn("Could not obtain decompressor from CodecPool");
             this.in = checksumIn;
@@ -618,7 +618,7 @@ public class IFile {
       in = checksumIn;
       Decompressor decompressor = null;
       if (isCompressed && codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
+        decompressor = CodecUtils.getDecompressor(codec);
         if (decompressor != null) {
           decompressor.reset();
           in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
index 8e5154f..8857cb9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
@@ -20,27 +20,33 @@ package org.apache.tez.runtime.library.utils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public final class CodecUtils {
 
-  private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
-  private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
+  private static final Logger LOG = LoggerFactory.getLogger(CodecUtils.class);
+  @VisibleForTesting
+  static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
 
   private CodecUtils() {
   }
@@ -76,20 +82,21 @@ public final class CodecUtils {
   public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
       IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
       throws IOException {
-    String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
-    Configurable configurableCodec = (Configurable) codec;
-    int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE :
-            configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);
-
+    String bufferSizeProp = getBufferSizeProperty(codec);
     CompressionInputStream in = null;
 
     if (bufferSizeProp != null) {
+      Configurable configurableCodec = (Configurable) codec;
       Configuration conf = configurableCodec.getConf();
-      int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
-      LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
-          DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);
 
-      synchronized (codec) {
+      synchronized (conf) {
+        int defaultBufferSize = getDefaultBufferSize(conf, codec);
+        int originalSize = conf.getInt(bufferSizeProp, defaultBufferSize);
+
+        int newBufSize = Math.min(compressedLength, defaultBufferSize);
+        LOG.debug("buffer size was set according to min({}, {}) => {}={}", compressedLength,
+            defaultBufferSize, bufferSizeProp, newBufSize);
+
         conf.setInt(bufferSizeProp, newBufSize);
 
         in = codec.createInputStream(checksumIn, decompressor);
@@ -117,7 +124,7 @@ public final class CodecUtils {
          * issues above for Compressor instances as well, even when we tried to leverage from
          * smaller buffer size only on decompression paths.
          */
-        configurableCodec.getConf().setInt(bufferSizeProp, originalSize);
+        conf.setInt(bufferSizeProp, originalSize);
       }
     } else {
       in = codec.createInputStream(checksumIn, decompressor);
@@ -125,4 +132,74 @@ public final class CodecUtils {
 
     return in;
   }
+
+  public static Compressor getCompressor(CompressionCodec codec) {
+    synchronized (((Configurable) codec).getConf()) {
+      return CodecPool.getCompressor(codec);
+    }
+  }
+
+  public static Decompressor getDecompressor(CompressionCodec codec) {
+    synchronized (((Configurable) codec).getConf()) {
+      return CodecPool.getDecompressor(codec);
+    }
+  }
+
+  public static CompressionInputStream createInputStream(CompressionCodec codec,
+      InputStream checksumIn, Decompressor decompressor) throws IOException {
+    synchronized (((Configurable) codec).getConf()) {
+      return codec.createInputStream(checksumIn, decompressor);
+    }
+  }
+
+  public static CompressionOutputStream createOutputStream(CompressionCodec codec,
+      OutputStream checksumOut, Compressor compressor) throws IOException {
+    synchronized (((Configurable) codec).getConf()) {
+      return codec.createOutputStream(checksumOut, compressor);
+    }
+  }
+
+  public static String getBufferSizeProperty(CompressionCodec codec) {
+    return getBufferSizeProperty(codec.getClass().getName());
+  }
+
+  public static String getBufferSizeProperty(String codecClassName) {
+    switch (codecClassName) {
+    case "org.apache.hadoop.io.compress.DefaultCodec":
+    case "org.apache.hadoop.io.compress.BZip2Codec":
+    case "org.apache.hadoop.io.compress.GzipCodec":
+      return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+    case "org.apache.hadoop.io.compress.SnappyCodec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
+    case "org.apache.hadoop.io.compress.LzoCodec":
+    case "com.hadoop.compression.lzo.LzoCodec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
+    case "org.apache.hadoop.io.compress.Lz4Codec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
+    default:
+      return null;
+    }
+  }
+
+  public static int getDefaultBufferSize(Configuration conf, CompressionCodec codec) {
+    return getDefaultBufferSize(conf, codec.getClass().getName());
+  }
+
+  public static int getDefaultBufferSize(Configuration conf, String codecClassName) {
+    switch (codecClassName) {
+    case "org.apache.hadoop.io.compress.DefaultCodec":
+    case "org.apache.hadoop.io.compress.BZip2Codec":
+    case "org.apache.hadoop.io.compress.GzipCodec":
+      return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+    case "org.apache.hadoop.io.compress.SnappyCodec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT;
+    case "org.apache.hadoop.io.compress.LzoCodec":
+    case "com.hadoop.compression.lzo.LzoCodec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT;
+    case "org.apache.hadoop.io.compress.Lz4Codec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT;
+    default:
+      return DEFAULT_BUFFER_SIZE;
+    }
+  }
 }
\ No newline at end of file
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
index 962a9e0..530b9a3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
@@ -18,12 +18,16 @@
 
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -33,7 +37,10 @@ import static org.mockito.Mockito.mock;
 /**
  * A dummy codec. It passes everything to underlying stream
  */
-public class DummyCompressionCodec implements CompressionCodec {
+public class DummyCompressionCodec implements CompressionCodec, Configurable {
+  @VisibleForTesting
+  int createInputStreamCalled = 0;
+  private Configuration conf;
 
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
@@ -62,6 +69,7 @@ public class DummyCompressionCodec implements CompressionCodec {
 
   @Override
   public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException {
+    createInputStreamCalled += 1;
     return new DummyCompressionInputStream(in);
   }
 
@@ -128,4 +136,14 @@ public class DummyCompressionCodec implements CompressionCodec {
       //no-op
     }
   }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
 }
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 13f090c..dde067b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -21,7 +21,6 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -37,7 +36,6 @@ import java.util.UUID;
 
 import com.google.common.collect.Sets;
 
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -273,7 +271,8 @@ public class TestMergeManager {
     InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
 
     // Create a mock compressor. We will check if it is used.
-    CompressionCodec dummyCodec = spy(new DummyCompressionCodec());
+    DummyCompressionCodec dummyCodec = new DummyCompressionCodec();
+    dummyCodec.setConf(conf);
 
     MergeManager mergeManager =
             new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
@@ -312,7 +311,7 @@ public class TestMergeManager {
     mo4.commit();
 
     mergeManager.close(true);
-    verify(dummyCodec, atLeastOnce()).createOutputStream(any(), any());
+    Assert.assertTrue(dummyCodec.createInputStreamCalled > 0);
   }
 
   @Test(timeout = 60000l)
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index a17e971..bce5d3d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -60,7 +60,6 @@ import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
@@ -69,6 +68,7 @@ import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFile
 import org.apache.tez.runtime.library.testutils.KVDataGen;
 import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
 import org.apache.tez.runtime.library.utils.BufferUtils;
+import org.apache.tez.runtime.library.utils.CodecUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -612,7 +612,7 @@ public class TestIFile {
   public void testInMemoryBufferSize() throws IOException {
     Configurable configurableCodec = (Configurable) codec;
     int originalCodecBufferSize =
-        configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1);
+        configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), -1);
 
     // for smaller amount of data, codec buffer should be sized according to compressed data length
     List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
@@ -620,7 +620,7 @@ public class TestIFile {
     readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
 
     Assert.assertEquals(originalCodecBufferSize, // original size is repaired
-        configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
+        configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));
 
     // buffer size cannot grow infinitely with compressed data size
     data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100));
@@ -628,7 +628,7 @@ public class TestIFile {
     readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
 
     Assert.assertEquals(originalCodecBufferSize, // original size is repaired
-        configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
+        configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));
   }
 
   @Test(expected = IllegalArgumentException.class)
@@ -644,7 +644,7 @@ public class TestIFile {
     Configuration conf = new Configuration();
 
     System.out.println("trying with buffer size: " + bufferSize);
-    conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
+    conf.set(CodecUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
     CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
     CompressionCodec codecToTest =
         codecFactory.getCodecByClassName(codecClassName);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java
new file mode 100644
index 0000000..c18620f
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DecompressorStream;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.DummyCompressionCodec;
+import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestCodecUtils {
+
+  @Test
+  public void testConcurrentDecompressorCreationWithModifiedBuffersize() throws Exception {
+    testConcurrentDecompressorCreationWithModifiedBuffersizeOnCodec(new DefaultCodec());
+  }
+
+  private void testConcurrentDecompressorCreationWithModifiedBuffersizeOnCodec(
+      CompressionCodec codec) throws InterruptedException, ExecutionException {
+    int modifiedBufferSize = 1000;
+    int numberOfThreads = 1000;
+
+    ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
+    ((Configurable) codec).setConf(conf);
+
+    Future<?>[] futures = new Future[numberOfThreads];
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    for (int i = 0; i < numberOfThreads; i++) {
+      futures[i] = service.submit(() -> {
+        try {
+          waitForLatch(latch);
+
+          Decompressor decompressor = CodecUtils.getDecompressor(codec);
+          DecompressorStream stream =
+              (DecompressorStream) CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
+                  Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);
+
+          Assert.assertEquals("stream buffer size is incorrect", modifiedBufferSize,
+              getBufferSize(stream));
+
+          CodecPool.returnDecompressor(decompressor);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      });
+    }
+    latch.countDown();
+
+    for (Future<?> f : futures) {
+      f.get();
+    }
+  }
+
+  @Test
+  public void testConcurrentCompressorDecompressorCreation() throws Exception {
+    testConcurrentCompressorDecompressorCreationOnCodec(new DefaultCodec());
+  }
+
+  private void testConcurrentCompressorDecompressorCreationOnCodec(CompressionCodec codec)
+      throws IOException, InterruptedException, ExecutionException {
+    int modifiedBufferSize = 1000;
+    int numberOfThreads = 1000;
+
+    ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
+    ((Configurable) codec).setConf(conf);
+
+    Future<?>[] futures = new Future[numberOfThreads];
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    for (int i = 0; i < numberOfThreads; i++) {
+      // let's "randomly" choose from scenarios and test them concurrently
+      // 1. getDecompressedInputStreamWithBufferSize
+      if (i % 3 == 0) {
+        futures[i] = service.submit(() -> {
+          try {
+            waitForLatch(latch);
+
+            Decompressor decompressor = CodecUtils.getDecompressor(codec);
+            CompressionInputStream stream =
+                (CompressionInputStream) CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
+                    Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);
+
+            Assert.assertEquals("stream buffer size is incorrect", modifiedBufferSize,
+                getBufferSize(stream));
+
+            CodecPool.returnDecompressor(decompressor);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+        // 2. getCompressor
+      } else if (i % 3 == 1) {
+        futures[i] = service.submit(() -> {
+          try {
+            waitForLatch(latch);
+
+            Compressor compressor = CodecUtils.getCompressor(codec);
+            CompressionOutputStream stream =
+                CodecUtils.createOutputStream(codec, Mockito.mock(OutputStream.class), compressor);
+
+            Assert.assertEquals("stream buffer size is incorrect",
+                CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, getBufferSize(stream));
+
+            CodecPool.returnCompressor(compressor);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+        // 3. getDecompressor
+      } else if (i % 3 == 2) {
+        futures[i] = service.submit(() -> {
+          try {
+            waitForLatch(latch);
+
+            Decompressor decompressor = CodecUtils.getDecompressor(codec);
+            CompressionInputStream stream =
+                CodecUtils.createInputStream(codec, Mockito.mock(InputStream.class), decompressor);
+
+            Assert.assertEquals("stream buffer size is incorrect",
+                CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, getBufferSize(stream));
+
+            CodecPool.returnDecompressor(decompressor);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+      }
+    }
+    latch.countDown();
+
+    for (Future<?> f : futures) {
+      f.get();
+    }
+  }
+
+  @Test
+  public void testDefaultBufferSize() {
+    Configuration conf = new Configuration(); // config with no buffersize set
+
+    Assert.assertEquals(CodecUtils.DEFAULT_BUFFER_SIZE,
+        CodecUtils.getDefaultBufferSize(conf, new DummyCompressionCodec()));
+    Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT,
+        CodecUtils.getDefaultBufferSize(conf, new DefaultCodec()));
+    Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT,
+        CodecUtils.getDefaultBufferSize(conf, new BZip2Codec()));
+    Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT,
+        CodecUtils.getDefaultBufferSize(conf, new GzipCodec()));
+    Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT,
+        CodecUtils.getDefaultBufferSize(conf, new SnappyCodec()));
+    Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT,
+        CodecUtils.getDefaultBufferSize(conf, new Lz4Codec()));
+  }
+
+  private void waitForLatch(CountDownLatch latch) {
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private int getBufferSize(Object stream) {
+    try {
+      Field field = stream.getClass().getDeclaredField("buffer");
+      field.setAccessible(true);
+      byte[] buffer = (byte[]) field.get(stream);
+      return buffer.length;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}