You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/11/26 11:05:57 UTC
[tez] branch branch-0.9 updated: TEZ-4234: Compressor can cause IllegalArgumentException in Buffer.limit where limit exceeds capacity (László Bodor reviewed by Rajesh Balamohan, Jonathan Turner Eagles)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new bcea2d8 TEZ-4234: Compressor can cause IllegalArgumentException in Buffer.limit where limit exceeds capacity (László Bodor reviewed by Rajesh Balamohan, Jonathan Turner Eagles)
bcea2d8 is described below
commit bcea2d803bbdd90c12701f0e9d275409ecd6b796
Author: László Bodor <bo...@gmail.com>
AuthorDate: Thu Nov 26 11:39:56 2020 +0100
TEZ-4234: Compressor can cause IllegalArgumentException in Buffer.limit where limit exceeds capacity (László Bodor reviewed by Rajesh Balamohan, Jonathan Turner Eagles)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../hadoop/TestConfigTranslationMRToTez.java | 1 -
.../tez/runtime/library/common/ConfigUtils.java | 23 ----
.../runtime/library/common/TezRuntimeUtils.java | 16 ++-
.../common/shuffle/orderedgrouped/Shuffle.java | 15 +--
.../library/common/sort/impl/ExternalSorter.java | 29 +----
.../runtime/library/common/sort/impl/IFile.java | 25 +---
.../writers/BaseUnorderedPartitionedKVWriter.java | 15 +--
.../runtime/library/input/UnorderedKVInput.java | 14 +--
.../tez/runtime/library/utils/CodecUtils.java | 127 +++++++++++++++++++++
.../library/common/shuffle/TestShuffleUtils.java | 8 +-
.../library/common/sort/impl/TestIFile.java | 59 +++++++++-
11 files changed, 215 insertions(+), 117 deletions(-)
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
index deab64f..df68c8d 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
@@ -70,6 +70,5 @@ public class TestConfigTranslationMRToTez {
assertEquals(LongWritable.class.getName(), ConfigUtils
.getIntermediateInputValueClass(confVertex1).getName());
assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex1));
- assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex1));
}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
index 76d3dff..f83fdc9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
@@ -56,24 +56,6 @@ public class ConfigUtils {
}
return codecClass;
}
-
- public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
- Configuration conf, Class<DefaultCodec> defaultValue) {
- Class<? extends CompressionCodec> codecClass = defaultValue;
- String name = conf
- .get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
- if (name != null) {
- try {
- codecClass = conf.getClassByName(name).asSubclass(
- CompressionCodec.class);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("Compression codec " + name
- + " was not found.", e);
- }
- }
- return codecClass;
- }
-
// TODO Move defaults over to a constants file.
@@ -82,11 +64,6 @@ public class ConfigUtils {
TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
}
- public static boolean isIntermediateInputCompressed(Configuration conf) {
- return conf.getBoolean(
- TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
- }
-
public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
Class<V> retv = (Class<V>) conf.getClass(
TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, null,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 78460c9..c9ddb6c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -265,14 +265,20 @@ public class TezRuntimeUtils {
}
public static String getBufferSizeProperty(CompressionCodec codec) {
- switch (codec.getClass().getSimpleName().toString()) {
- case "DefaultCodec":
+ return getBufferSizeProperty(codec.getClass().getName());
+ }
+
+ public static String getBufferSizeProperty(String className) {
+ switch (className) {
+ case "org.apache.hadoop.io.compress.DefaultCodec":
return "io.file.buffer.size";
- case "SnappyCodec":
+ case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
- case "LzoCodec":
+ case "org.apache.hadoop.io.compress.LzoCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
+ case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
- case "Lz4Codec":
+ case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
default:
return null;
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 38f079a..db5ef73 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -39,8 +39,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
@@ -51,12 +49,11 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
-
+import org.apache.tez.runtime.library.utils.CodecUtils;
import org.apache.tez.common.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -114,16 +111,8 @@ public class Shuffle implements ExceptionReporter {
this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
+ this.codec = CodecUtils.getCodec(conf);
- if (ConfigUtils.isIntermediateInputCompressed(conf)) {
- Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
- // Work around needed for HADOOP-12191. Avoids the native initialization synchronization race
- codec.getDecompressorType();
- } else {
- codec = null;
- }
this.ifileReadAhead = conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 715d0e0..b26d609 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -39,8 +39,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progressable;
@@ -60,7 +58,7 @@ import org.apache.tez.runtime.library.common.serializer.SerializationContext;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-
+import org.apache.tez.runtime.library.utils.CodecUtils;
import org.apache.tez.common.Preconditions;
@SuppressWarnings({"rawtypes"})
@@ -220,30 +218,7 @@ public abstract class ExternalSorter {
numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
// compression
- if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
- Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, this.conf);
-
- if (codec != null) {
- Class<? extends Compressor> compressorType = null;
- Throwable cause = null;
- try {
- compressorType = codec.getCompressorType();
- } catch (RuntimeException e) {
- cause = e;
- }
- if (compressorType == null) {
- String errMsg =
- String.format("Unable to get CompressorType for codec (%s). This is most" +
- " likely due to missing native libraries for the codec.",
- conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC));
- throw new IOException(errMsg, cause);
- }
- }
- } else {
- codec = null;
- }
+ this.codec = CodecUtils.getCodec(conf);
this.ifileReadAhead = this.conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 69b524b..07fbd0c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -32,15 +32,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.utils.BufferUtils;
+import org.apache.tez.runtime.library.utils.CodecUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
@@ -623,7 +621,8 @@ public class IFile {
decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
decompressor.reset();
- in = getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor, compressedLength);
+ in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
+ compressedLength);
} else {
LOG.warn("Could not obtain decompressor from CodecPool");
in = checksumIn;
@@ -659,24 +658,6 @@ public class IFile {
}
}
- private static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
- IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
- throws IOException {
- String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
-
- if (bufferSizeProp != null) {
- Configurable configurableCodec = (Configurable) codec;
- Configuration conf = configurableCodec.getConf();
-
- int bufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
- LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
- DEFAULT_BUFFER_SIZE, bufferSizeProp, bufSize);
- conf.setInt(bufferSizeProp, bufSize);
- }
-
- return codec.createInputStream(checksumIn, decompressor);
- }
-
/**
* Read entire IFile content to disk.
*
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index 16291da..4487731 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -29,10 +29,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.Event;
@@ -43,6 +41,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.utils.CodecUtils;
@SuppressWarnings("rawtypes")
public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
@@ -137,14 +136,12 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
numAdditionalSpillsCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
// compression
- if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
- Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, this.conf);
- } else {
- codec = null;
+ try {
+ this.codec = CodecUtils.getCodec(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
-
+
this.ifileReadAhead = this.conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 401066d..0637ee2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -34,8 +34,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TaskCounter;
@@ -46,14 +44,13 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.readers.UnorderedKVReader;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandlerImpl;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
import org.apache.tez.runtime.library.common.shuffle.impl.SimpleFetchedInputAllocator;
-
+import org.apache.tez.runtime.library.utils.CodecUtils;
import org.apache.tez.common.Preconditions;
/**
@@ -114,14 +111,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
if (!isStarted.get()) {
////// Initial configuration
memoryUpdateCallbackHandler.validateUpdateReceived();
- CompressionCodec codec;
- if (ConfigUtils.isIntermediateInputCompressed(conf)) {
- Class<? extends CompressionCodec> codecClass = ConfigUtils
- .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, conf);
- } else {
- codec = null;
- }
+ CompressionCodec codec = CodecUtils.getCodec(conf);
boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
boolean ifileReadAhead = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
new file mode 100644
index 0000000..99d22c5
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helpers for the compression codec of intermediate data and for opening decompressing streams.
+ */
+public final class CodecUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CodecUtils.class);
+
+  /** Upper bound for the codec buffer size applied when opening a decompressing stream. */
+  private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
+
+  private CodecUtils() {
+  }
+
+  /**
+   * Creates the codec configured for intermediate data.
+   *
+   * <p>The same codec class is used by both the writing (compressing) and the reading
+   * (decompressing) side, so the compressor type is validated and the decompressor type is
+   * resolved eagerly here.
+   *
+   * @param conf configuration holding the compression settings; also handed to the new codec
+   * @return a new codec instance, or {@code null} when intermediate compression is disabled
+   *     (see {@link ConfigUtils#shouldCompressIntermediateOutput(Configuration)})
+   * @throws IOException if the codec cannot report its compressor type, which is most likely
+   *     caused by missing native libraries for the codec
+   */
+  public static CompressionCodec getCodec(Configuration conf) throws IOException {
+    if (!ConfigUtils.shouldCompressIntermediateOutput(conf)) {
+      return null;
+    }
+    Class<? extends CompressionCodec> codecClass =
+        ConfigUtils.getIntermediateOutputCompressorClass(conf, DefaultCodec.class);
+    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+    if (codec == null) {
+      return null;
+    }
+
+    Class<? extends Compressor> compressorType = null;
+    Throwable cause = null;
+    try {
+      compressorType = codec.getCompressorType();
+    } catch (RuntimeException e) {
+      cause = e;
+    }
+    if (compressorType == null) {
+      String errMsg = String.format(
+          "Unable to get CompressorType for codec (%s). This is most"
+              + " likely due to missing native libraries for the codec.",
+          conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC));
+      throw new IOException(errMsg, cause);
+    }
+    // Work around needed for HADOOP-12191 (formerly done in Shuffle). Avoids the native
+    // initialization synchronization race by resolving the decompressor type up front.
+    codec.getDecompressorType();
+    return codec;
+  }
+
+  /**
+   * Opens a decompressing stream on {@code checksumIn}, with the codec's buffer size temporarily
+   * lowered to {@code min(compressedLength, DEFAULT_BUFFER_SIZE)} so that small segments do not
+   * get oversized stream buffers.
+   *
+   * <p>Codecs without a known buffer size property (see
+   * {@link TezRuntimeUtils#getBufferSizeProperty(CompressionCodec)}), or that are not
+   * {@link Configurable}, get a stream created without touching any configuration. Otherwise the
+   * property is restored to its previous state (including "not set") once the stream exists.
+   *
+   * @param codec codec used to create the stream
+   * @param checksumIn stream supplying the compressed bytes
+   * @param decompressor decompressor handed to the codec
+   * @param compressedLength length of the compressed data; caps the stream buffer size
+   * @return the decompressing stream
+   * @throws IOException if the codec fails to create the stream
+   */
+  public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
+      IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
+      throws IOException {
+    String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
+    // Only touch the codec's conf when there is a property to tune: e.g. GzipCodec maps to no
+    // property (null), and a codec is not necessarily Configurable.
+    Configuration conf = (bufferSizeProp != null && codec instanceof Configurable)
+        ? ((Configurable) codec).getConf() : null;
+    if (conf == null) {
+      return codec.createInputStream(checksumIn, decompressor);
+    }
+
+    int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
+    LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
+        DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);
+
+    // Lock the Configuration rather than the codec: codecs created from the same Configuration
+    // (see getCodec) may share it, and the save/modify/restore sequence below must not
+    // interleave with another caller's.
+    synchronized (conf) {
+      // Capture the raw original value under the lock, so a concurrent caller can never pick up
+      // (and later "restore") the temporarily lowered size. null means the key was not set.
+      String originalValue = conf.getRaw(bufferSizeProp);
+      conf.setInt(bufferSizeProp, newBufSize);
+      try {
+        return codec.createInputStream(checksumIn, decompressor);
+      } finally {
+        /*
+         * Restore the original buffer size, even if stream creation failed. The buffer size is
+         * used at 2 places.
+         *
+         * 1. It tells the inputstream/outputstream buffer size (for streams created by
+         * codec.createInputStream/codec.createOutputStream). This is something which might and
+         * should be optimized in config, as streams instantiate and use their own buffer and
+         * won't reuse buffers from previous streams (TEZ-4135).
+         *
+         * 2. The same buffer size is used when a codec creates a new Compressor/Decompressor.
+         * Those instances are expensive and reused by hadoop's CodecPool, so one created while
+         * the config holds a small buffer size keeps that small buffer when it is reused later,
+         * even for large amounts of data. This can end up in large stream buffers + small
+         * compressor/decompressor buffers, which is suboptimal and can lead to strange errors
+         * when a compressed output exceeds the size of the buffer (TEZ-4234).
+         *
+         * As the codec buffer size config affects both the compressor (output) and decompressor
+         * (input) paths, a config left altered here would cause the issues above for Compressor
+         * instances as well, although the smaller size is only meant for decompression.
+         */
+        if (originalValue == null) {
+          conf.unset(bufferSizeProp);
+        } else {
+          conf.set(bufferSizeProp, originalValue);
+        }
+      }
+    }
+  }
+}
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 520dec7..446801a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -290,6 +290,7 @@ public class TestShuffleUtils {
.thenThrow(new InternalError(codecErrorMsg));
Decompressor mockDecoder = mock(Decompressor.class);
CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
+ when(((ConfigurableCodecForTest) mockCodec).getConf()).thenReturn(mock(Configuration.class));
when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
.thenReturn(mockCodecStream);
@@ -312,6 +313,7 @@ public class TestShuffleUtils {
.thenThrow(new IllegalArgumentException(codecErrorMsg));
Decompressor mockDecoder = mock(Decompressor.class);
CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
+ when(((ConfigurableCodecForTest) mockCodec).getConf()).thenReturn(mock(Configuration.class));
when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
.thenReturn(mockCodecStream);
@@ -327,7 +329,8 @@ public class TestShuffleUtils {
CompressionInputStream mockCodecStream1 = mock(CompressionInputStream.class);
when(mockCodecStream1.read(any(byte[].class), anyInt(), anyInt()))
.thenThrow(new SocketTimeoutException(codecErrorMsg));
- CompressionCodec mockCodec1 = mock(CompressionCodec.class);
+ CompressionCodec mockCodec1 = mock(ConfigurableCodecForTest.class);
+ when(((ConfigurableCodecForTest) mockCodec1).getConf()).thenReturn(mock(Configuration.class));
when(mockCodec1.createDecompressor()).thenReturn(mockDecoder);
when(mockCodec1.createInputStream(any(InputStream.class), any(Decompressor.class)))
.thenReturn(mockCodecStream1);
@@ -342,7 +345,8 @@ public class TestShuffleUtils {
CompressionInputStream mockCodecStream2 = mock(CompressionInputStream.class);
when(mockCodecStream2.read(any(byte[].class), anyInt(), anyInt()))
.thenThrow(new InternalError(codecErrorMsg));
- CompressionCodec mockCodec2 = mock(CompressionCodec.class);
+ CompressionCodec mockCodec2 = mock(ConfigurableCodecForTest.class);
+ when(((ConfigurableCodecForTest) mockCodec2).getConf()).thenReturn(mock(Configuration.class));
when(mockCodec2.createDecompressor()).thenReturn(mockDecoder);
when(mockCodec2.createInputStream(any(InputStream.class), any(Decompressor.class)))
.thenReturn(mockCodecStream2);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 0678d04..a17e971 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -53,9 +53,11 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -69,6 +71,7 @@ import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
import org.apache.tez.runtime.library.utils.BufferUtils;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -607,13 +610,16 @@ public class TestIFile {
@Test
public void testInMemoryBufferSize() throws IOException {
+ Configurable configurableCodec = (Configurable) codec;
+ int originalCodecBufferSize =
+ configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1);
+
// for smaller amount of data, codec buffer should be sized according to compressed data length
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
Writer writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
- Configurable configurableCodec = (Configurable) codec;
- Assert.assertEquals(writer.getCompressedLength(),
+ Assert.assertEquals(originalCodecBufferSize, // original size is repaired
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
// buffer size cannot grow infinitely with compressed data size
@@ -621,10 +627,57 @@ public class TestIFile {
writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
- Assert.assertEquals(128*1024,
+ Assert.assertEquals(originalCodecBufferSize, // original size is repaired
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testSmallDataCompression() throws IOException {
+ Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+
+ tryWriteFileWithBufferSize(17, "org.apache.hadoop.io.compress.Lz4Codec");
+ tryWriteFileWithBufferSize(32, "org.apache.hadoop.io.compress.Lz4Codec");
+ }
+
+  /**
+   * Writes test data from {@code KVDataGen.generateTestDataOfKeySize(false, 1, 0)} with the given
+   * codec, after setting that codec's buffer size property to {@code bufferSize}.
+   *
+   * @param bufferSize value for the codec's buffer size property
+   * @param codecClassName fully qualified codec class name
+   * @throws IOException if writing the test file fails
+   */
+  private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName)
+      throws IOException {
+    Configuration conf = new Configuration();
+
+    System.out.println("trying with buffer size: " + bufferSize);
+    String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codecClassName);
+    // Callers expect IllegalArgumentException from the codec itself; Configuration.set() with a
+    // null name can throw one as well and would make such a test pass for the wrong reason.
+    Assert.assertNotNull("no buffer size property known for codec " + codecClassName,
+        bufferSizeProp);
+    conf.set(bufferSizeProp, Integer.toString(bufferSize));
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
+    CompressionCodec codecToTest = codecFactory.getCodecByClassName(codecClassName);
+    // Without a codec the file would be written uncompressed, hiding what is being tested.
+    Assert.assertNotNull("codec not found: " + codecClassName, codecToTest);
+    List<KVPair> data = KVDataGen.generateTestDataOfKeySize(false, 1, 0);
+    writeTestFile(false, false, data, codecToTest);
+  }
+
+  /**
+   * Compresses two 32-byte inputs with Lz4Compressor; per the inline notes the first one
+   * succeeds and the second (one more leading value) fails with IllegalArgumentException.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testLz4CompressedDataIsLargerThanOriginal() throws IOException {
+    Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+
+    // this one succeeds
+    lz4CompressInPlace(24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48);
+
+    // adding 1 more element makes that fail
+    lz4CompressInPlace(24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48,
+        50);
+  }
+
+  /**
+   * Fills a fresh 32-byte buffer with {@code leadingValues} and LZ4-compresses it into itself,
+   * using a compressor whose direct buffer is also 32 bytes.
+   */
+  private void lz4CompressInPlace(int... leadingValues) throws IOException {
+    final int size = 32;
+    byte[] buf = new byte[size];
+    initBufWithNumbers(buf, leadingValues);
+    Lz4Compressor compressor = new Lz4Compressor(size, false);
+    compressor.setInput(buf, 0, size);
+    compressor.compress(buf, 0, size);
+  }
+
+  /** Copies {@code values} into the start of {@code buf}, narrowing each one to a byte. */
+  private void initBufWithNumbers(byte[] buf, int... values) {
+    int pos = 0;
+    for (int value : values) {
+      buf[pos++] = (byte) value;
+    }
+  }
+
/**
* Test different options (RLE, repeat keys, compression) on reader/writer
*