You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@tez.apache.org by ab...@apache.org on 2021/06/16 08:37:18 UTC
[tez] branch master updated: TEZ-4295: Could not decompress data. Buffer length is too small. (#130) (Laszlo Bodor reviewed by Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new b6c7fed TEZ-4295: Could not decompress data. Buffer length is too small. (#130) (Laszlo Bodor reviewed by Ashutosh Chauhan)
b6c7fed is described below
commit b6c7fedfb74e8222f5b12bf2f53967a04f99860c
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Wed Jun 16 10:37:09 2021 +0200
TEZ-4295: Could not decompress data. Buffer length is too small. (#130) (Laszlo Bodor reviewed by Ashutosh Chauhan)
---
.../runtime/library/common/TezRuntimeUtils.java | 28 ---
.../runtime/library/common/sort/impl/IFile.java | 10 +-
.../tez/runtime/library/utils/CodecUtils.java | 109 ++++++++--
.../orderedgrouped/DummyCompressionCodec.java | 20 +-
.../shuffle/orderedgrouped/TestMergeManager.java | 7 +-
.../library/common/sort/impl/TestIFile.java | 10 +-
.../tez/runtime/library/utils/TestCodecUtils.java | 224 +++++++++++++++++++++
7 files changed, 351 insertions(+), 57 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index a1df131..9d9b8c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -26,7 +26,6 @@ import java.net.URL;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnection;
@@ -37,8 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.TaskContext;
@@ -262,29 +259,4 @@ public class TezRuntimeUtils {
in.close();
}
}
-
- public static String getBufferSizeProperty(CompressionCodec codec) {
- return getBufferSizeProperty(codec.getClass().getName());
- }
-
- public static String getBufferSizeProperty(String className) {
- switch (className) {
- case "org.apache.hadoop.io.compress.DefaultCodec":
- case "org.apache.hadoop.io.compress.BZip2Codec":
- case "org.apache.hadoop.io.compress.GzipCodec":
- return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
- case "org.apache.hadoop.io.compress.SnappyCodec":
- return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
- case "org.apache.hadoop.io.compress.ZStandardCodec":
- return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
- case "org.apache.hadoop.io.compress.LzoCodec":
- return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
- case "com.hadoop.compression.lzo.LzoCodec":
- return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
- case "org.apache.hadoop.io.compress.Lz4Codec":
- return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
- default:
- return null;
- }
- }
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index a4bbf5a..8f67318 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -363,10 +363,10 @@ public class IFile {
void setupOutputStream(CompressionCodec codec) throws IOException {
this.checksumOut = new IFileOutputStream(this.rawOut);
if (codec != null) {
- this.compressor = CodecPool.getCompressor(codec);
+ this.compressor = CodecUtils.getCompressor(codec);
if (this.compressor != null) {
this.compressor.reset();
- this.compressedOut = codec.createOutputStream(checksumOut, compressor);
+ this.compressedOut = CodecUtils.createOutputStream(codec, checksumOut, compressor);
this.out = new FSDataOutputStream(this.compressedOut, null);
this.compressOutput = true;
} else {
@@ -773,9 +773,9 @@ public class IFile {
checksumIn = new IFileInputStream(in, length, readAhead,
readAheadLength/* , isCompressed */);
if (isCompressed && codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
+ decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
- this.in = codec.createInputStream(checksumIn, decompressor);
+ this.in = CodecUtils.createInputStream(codec, checksumIn, decompressor);
} else {
LOG.warn("Could not obtain decompressor from CodecPool");
this.in = checksumIn;
@@ -818,7 +818,7 @@ public class IFile {
in = checksumIn;
Decompressor decompressor = null;
if (isCompressed && codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
+ decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
decompressor.reset();
in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
index 8e5154f..340ecce 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
@@ -20,27 +20,33 @@ package org.apache.tez.runtime.library.utils;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
-import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
public final class CodecUtils {
- private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
- private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
+ private static final Logger LOG = LoggerFactory.getLogger(CodecUtils.class);
+ @VisibleForTesting
+ static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
private CodecUtils() {
}
@@ -76,20 +82,21 @@ public final class CodecUtils {
public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
throws IOException {
- String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
- Configurable configurableCodec = (Configurable) codec;
- int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE :
- configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);
-
+ String bufferSizeProp = getBufferSizeProperty(codec);
CompressionInputStream in = null;
if (bufferSizeProp != null) {
+ Configurable configurableCodec = (Configurable) codec;
Configuration conf = configurableCodec.getConf();
- int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
- LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
- DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);
- synchronized (codec) {
+ synchronized (conf) {
+ int defaultBufferSize = getDefaultBufferSize(conf, codec);
+ int originalSize = conf.getInt(bufferSizeProp, defaultBufferSize);
+
+ int newBufSize = Math.min(compressedLength, defaultBufferSize);
+ LOG.debug("buffer size was set according to min({}, {}) => {}={}", compressedLength,
+ defaultBufferSize, bufferSizeProp, newBufSize);
+
conf.setInt(bufferSizeProp, newBufSize);
in = codec.createInputStream(checksumIn, decompressor);
@@ -117,7 +124,7 @@ public final class CodecUtils {
* issues above for Compressor instances as well, even when we tried to leverage from
* smaller buffer size only on decompression paths.
*/
- configurableCodec.getConf().setInt(bufferSizeProp, originalSize);
+ conf.setInt(bufferSizeProp, originalSize);
}
} else {
in = codec.createInputStream(checksumIn, decompressor);
@@ -125,4 +132,78 @@ public final class CodecUtils {
return in;
}
+
+ public static Compressor getCompressor(CompressionCodec codec) {
+ synchronized (((Configurable) codec).getConf()) {
+ return CodecPool.getCompressor(codec);
+ }
+ }
+
+ public static Decompressor getDecompressor(CompressionCodec codec) {
+ synchronized (((Configurable) codec).getConf()) {
+ return CodecPool.getDecompressor(codec);
+ }
+ }
+
+ public static CompressionInputStream createInputStream(CompressionCodec codec,
+ InputStream checksumIn, Decompressor decompressor) throws IOException {
+ synchronized (((Configurable) codec).getConf()) {
+ return codec.createInputStream(checksumIn, decompressor);
+ }
+ }
+
+ public static CompressionOutputStream createOutputStream(CompressionCodec codec,
+ OutputStream checksumOut, Compressor compressor) throws IOException {
+ synchronized (((Configurable) codec).getConf()) {
+ return codec.createOutputStream(checksumOut, compressor);
+ }
+ }
+
+ public static String getBufferSizeProperty(CompressionCodec codec) {
+ return getBufferSizeProperty(codec.getClass().getName());
+ }
+
+ public static String getBufferSizeProperty(String codecClassName) {
+ switch (codecClassName) {
+ case "org.apache.hadoop.io.compress.DefaultCodec":
+ case "org.apache.hadoop.io.compress.BZip2Codec":
+ case "org.apache.hadoop.io.compress.GzipCodec":
+ return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+ case "org.apache.hadoop.io.compress.SnappyCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
+ case "org.apache.hadoop.io.compress.ZStandardCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
+ case "org.apache.hadoop.io.compress.LzoCodec":
+ case "com.hadoop.compression.lzo.LzoCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
+ case "org.apache.hadoop.io.compress.Lz4Codec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
+ default:
+ return null;
+ }
+ }
+
+ public static int getDefaultBufferSize(Configuration conf, CompressionCodec codec) {
+ return getDefaultBufferSize(conf, codec.getClass().getName());
+ }
+
+ public static int getDefaultBufferSize(Configuration conf, String codecClassName) {
+ switch (codecClassName) {
+ case "org.apache.hadoop.io.compress.DefaultCodec":
+ case "org.apache.hadoop.io.compress.BZip2Codec":
+ case "org.apache.hadoop.io.compress.GzipCodec":
+ return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+ case "org.apache.hadoop.io.compress.SnappyCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT;
+ case "org.apache.hadoop.io.compress.ZStandardCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT;
+ case "org.apache.hadoop.io.compress.LzoCodec":
+ case "com.hadoop.compression.lzo.LzoCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT;
+ case "org.apache.hadoop.io.compress.Lz4Codec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT;
+ default:
+ return DEFAULT_BUFFER_SIZE;
+ }
+ }
}
\ No newline at end of file
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
index 962a9e0..530b9a3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
@@ -18,12 +18,16 @@
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -33,7 +37,10 @@ import static org.mockito.Mockito.mock;
/**
* A dummy codec. It passes everything to underlying stream
*/
-public class DummyCompressionCodec implements CompressionCodec {
+public class DummyCompressionCodec implements CompressionCodec, Configurable {
+ @VisibleForTesting
+ int createInputStreamCalled = 0;
+ private Configuration conf;
@Override
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
@@ -62,6 +69,7 @@ public class DummyCompressionCodec implements CompressionCodec {
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException {
+ createInputStreamCalled += 1;
return new DummyCompressionInputStream(in);
}
@@ -128,4 +136,14 @@ public class DummyCompressionCodec implements CompressionCodec {
//no-op
}
}
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
}
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 13f090c..dde067b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -21,7 +21,6 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -37,7 +36,6 @@ import java.util.UUID;
import com.google.common.collect.Sets;
-import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -273,7 +271,8 @@ public class TestMergeManager {
InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
// Create a mock compressor. We will check if it is used.
- CompressionCodec dummyCodec = spy(new DummyCompressionCodec());
+ DummyCompressionCodec dummyCodec = new DummyCompressionCodec();
+ dummyCodec.setConf(conf);
MergeManager mergeManager =
new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
@@ -312,7 +311,7 @@ public class TestMergeManager {
mo4.commit();
mergeManager.close(true);
- verify(dummyCodec, atLeastOnce()).createOutputStream(any(), any());
+ Assert.assertTrue(dummyCodec.createInputStreamCalled > 0);
}
@Test(timeout = 60000l)
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index bf35955..960aee3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
@@ -66,6 +65,7 @@ import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFile
import org.apache.tez.runtime.library.testutils.KVDataGen;
import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
import org.apache.tez.runtime.library.utils.BufferUtils;
+import org.apache.tez.runtime.library.utils.CodecUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -734,7 +734,7 @@ public class TestIFile {
public void testInMemoryBufferSize() throws IOException {
Configurable configurableCodec = (Configurable) codec;
int originalCodecBufferSize =
- configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1);
+ configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), -1);
// for smaller amount of data, codec buffer should be sized according to compressed data length
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
@@ -742,7 +742,7 @@ public class TestIFile {
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
Assert.assertEquals(originalCodecBufferSize, // original size is repaired
- configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
+ configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));
// buffer size cannot grow infinitely with compressed data size
data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100));
@@ -750,7 +750,7 @@ public class TestIFile {
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
Assert.assertEquals(originalCodecBufferSize, // original size is repaired
- configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
+ configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));
}
@Test(expected = IllegalArgumentException.class)
@@ -766,7 +766,7 @@ public class TestIFile {
Configuration conf = new Configuration();
System.out.println("trying with buffer size: " + bufferSize);
- conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
+ conf.set(CodecUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codecToTest =
codecFactory.getCodecByClassName(codecClassName);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java
new file mode 100644
index 0000000..afa6459
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DecompressorStream;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.ZStandardCodec;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.DummyCompressionCodec;
+import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestCodecUtils {
+
+ @Test
+ public void testConcurrentDecompressorCreationWithModifiedBuffersize() throws Exception {
+ testConcurrentDecompressorCreationWithModifiedBuffersizeOnCodec(new DefaultCodec());
+ }
+
+ private void testConcurrentDecompressorCreationWithModifiedBuffersizeOnCodec(
+ CompressionCodec codec) throws InterruptedException, ExecutionException {
+ int modifiedBufferSize = 1000;
+ int numberOfThreads = 1000;
+
+ ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
+ ((Configurable) codec).setConf(conf);
+
+ Future<?>[] futures = new Future[numberOfThreads];
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ for (int i = 0; i < numberOfThreads; i++) {
+ futures[i] = service.submit(() -> {
+ try {
+ waitForLatch(latch);
+
+ Decompressor decompressor = CodecUtils.getDecompressor(codec);
+ DecompressorStream stream =
+ (DecompressorStream) CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
+ Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);
+
+ Assert.assertEquals("stream buffer size is incorrect", modifiedBufferSize,
+ getBufferSize(stream));
+
+ CodecPool.returnDecompressor(decompressor);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ latch.countDown();
+
+ for (Future<?> f : futures) {
+ f.get();
+ }
+ }
+
+ @Test
+ public void testConcurrentCompressorDecompressorCreation() throws Exception {
+ testConcurrentCompressorDecompressorCreationOnCodec(new DefaultCodec());
+ }
+
+ private void testConcurrentCompressorDecompressorCreationOnCodec(CompressionCodec codec)
+ throws IOException, InterruptedException, ExecutionException {
+ int modifiedBufferSize = 1000;
+ int numberOfThreads = 1000;
+
+ ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
+ ((Configurable) codec).setConf(conf);
+
+ Future<?>[] futures = new Future[numberOfThreads];
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ for (int i = 0; i < numberOfThreads; i++) {
+ // let's "randomly" choose from scenarios and test them concurrently
+ // 1. getDecompressedInputStreamWithBufferSize
+ if (i % 3 == 0) {
+ futures[i] = service.submit(() -> {
+ try {
+ waitForLatch(latch);
+
+ Decompressor decompressor = CodecUtils.getDecompressor(codec);
+ CompressionInputStream stream =
+ (CompressionInputStream) CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
+ Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);
+
+ Assert.assertEquals("stream buffer size is incorrect", modifiedBufferSize,
+ getBufferSize(stream));
+
+ CodecPool.returnDecompressor(decompressor);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // 2. getCompressor
+ } else if (i % 3 == 1) {
+ futures[i] = service.submit(() -> {
+ try {
+ waitForLatch(latch);
+
+ Compressor compressor = CodecUtils.getCompressor(codec);
+ CompressionOutputStream stream =
+ CodecUtils.createOutputStream(codec, Mockito.mock(OutputStream.class), compressor);
+
+ Assert.assertEquals("stream buffer size is incorrect",
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, getBufferSize(stream));
+
+ CodecPool.returnCompressor(compressor);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // 3. getDecompressor
+ } else if (i % 3 == 2) {
+ futures[i] = service.submit(() -> {
+ try {
+ waitForLatch(latch);
+
+ Decompressor decompressor = CodecUtils.getDecompressor(codec);
+ CompressionInputStream stream =
+ CodecUtils.createInputStream(codec, Mockito.mock(InputStream.class), decompressor);
+
+ Assert.assertEquals("stream buffer size is incorrect",
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, getBufferSize(stream));
+
+ CodecPool.returnDecompressor(decompressor);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+ latch.countDown();
+
+ for (Future<?> f : futures) {
+ f.get();
+ }
+ }
+
+ @Test
+ public void testDefaultBufferSize() {
+ Configuration conf = new Configuration(); // config with no buffersize set
+
+ Assert.assertEquals(CodecUtils.DEFAULT_BUFFER_SIZE,
+ CodecUtils.getDefaultBufferSize(conf, new DummyCompressionCodec()));
+ Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT,
+ CodecUtils.getDefaultBufferSize(conf, new DefaultCodec()));
+ Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT,
+ CodecUtils.getDefaultBufferSize(conf, new BZip2Codec()));
+ Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT,
+ CodecUtils.getDefaultBufferSize(conf, new GzipCodec()));
+ Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT,
+ CodecUtils.getDefaultBufferSize(conf, new SnappyCodec()));
+ Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT,
+ CodecUtils.getDefaultBufferSize(conf, new ZStandardCodec()));
+ Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT,
+ CodecUtils.getDefaultBufferSize(conf, new Lz4Codec()));
+ }
+
+ private void waitForLatch(CountDownLatch latch) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private int getBufferSize(Object stream) {
+ try {
+ Field field = stream.getClass().getDeclaredField("buffer");
+ field.setAccessible(true);
+ byte[] buffer = (byte[]) field.get(stream);
+ return buffer.length;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}