Posted to commits@tez.apache.org by ab...@apache.org on 2020/10/06 07:01:36 UTC

[tez] branch branch-0.10.0 updated (2358bd8 -> 6c2028a)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a change to branch branch-0.10.0
in repository https://gitbox.apache.org/repos/asf/tez.git.


    from 2358bd8  TEZ-4225: CHANGES.txt for 0.10.0 Release (László Bodor reviewed by Jonathan Turner Eagles) - addendum
     new 40c8662  TEZ-4234: Compressor can cause IllegalArgumentException in Buffer.limit where limit exceeds capacity (László Bodor reviewed by Rajesh Balamohan, Jonathan Turner Eagles)
     new 6c2028a  CHANGES.txt updated with TEZ-4234

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   3 +-
 .../hadoop/TestConfigTranslationMRToTez.java       |   1 -
 .../tez/runtime/library/common/ConfigUtils.java    |  23 ----
 .../runtime/library/common/TezRuntimeUtils.java    |  18 ++-
 .../common/shuffle/orderedgrouped/Shuffle.java     |  15 +--
 .../library/common/sort/impl/ExternalSorter.java   |  29 +----
 .../runtime/library/common/sort/impl/IFile.java    |  25 +---
 .../writers/BaseUnorderedPartitionedKVWriter.java  |  17 ++-
 .../runtime/library/input/UnorderedKVInput.java    |  14 +--
 .../tez/runtime/library/utils/CodecUtils.java      | 127 +++++++++++++++++++++
 .../library/common/shuffle/TestShuffleUtils.java   |   8 +-
 .../library/common/sort/impl/TestIFile.java        |  59 +++++++++-
 12 files changed, 219 insertions(+), 120 deletions(-)
 create mode 100644 tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java


[tez] 02/02: CHANGES.txt updated with TEZ-4234

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch branch-0.10.0
in repository https://gitbox.apache.org/repos/asf/tez.git

commit 6c2028aa0571a78e4d4aae74d5b4c1efe0881f73
Author: Laszlo Bodor <bo...@gmail.com>
AuthorDate: Tue Oct 6 09:01:04 2020 +0200

    CHANGES.txt updated with TEZ-4234
---
 CHANGES.txt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 296de23..6e69a6e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,10 +1,11 @@
 Apache Tez Change Log
 =====================
-Release 0.10.0: 2020-09-01
+Release 0.10.0: 2020-10-15
 
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+TEZ-4234: Compressor can cause IllegalArgumentException in Buffer.limit where limit exceeds capacity (László Bodor reviewed by Rajesh Balamohan, Jonathan Turner Eagles)
 TEZ-4230: LocalContainerLauncher can kill task future too early, causing app hang (László Bodor reviewed by Jonathan Turner Eagles)
 TEZ-4228: TezClassLoader should be used in TezChild and for Configuration objects
 TEZ-3645: Reuse SerializationFactory while sorting, merging, and writing IFiles (Jonathan Turner Eagles reviewed by Rajesh Balamohan, Laszlo Bodor)


[tez] 01/02: TEZ-4234: Compressor can cause IllegalArgumentException in Buffer.limit where limit exceeds capacity (László Bodor reviewed by Rajesh Balamohan, Jonathan Turner Eagles)

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch branch-0.10.0
in repository https://gitbox.apache.org/repos/asf/tez.git

commit 40c86627143a51e8da869615f1bf54ed4a22a641
Author: László Bodor <bo...@gmail.com>
AuthorDate: Tue Oct 6 08:56:57 2020 +0200

    TEZ-4234: Compressor can cause IllegalArgumentException in Buffer.limit where limit exceeds capacity (László Bodor reviewed by Rajesh Balamohan, Jonathan Turner Eagles)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../hadoop/TestConfigTranslationMRToTez.java       |   1 -
 .../tez/runtime/library/common/ConfigUtils.java    |  23 ----
 .../runtime/library/common/TezRuntimeUtils.java    |  18 ++-
 .../common/shuffle/orderedgrouped/Shuffle.java     |  15 +--
 .../library/common/sort/impl/ExternalSorter.java   |  29 +----
 .../runtime/library/common/sort/impl/IFile.java    |  25 +---
 .../writers/BaseUnorderedPartitionedKVWriter.java  |  17 ++-
 .../runtime/library/input/UnorderedKVInput.java    |  14 +--
 .../tez/runtime/library/utils/CodecUtils.java      | 127 +++++++++++++++++++++
 .../library/common/shuffle/TestShuffleUtils.java   |   8 +-
 .../library/common/sort/impl/TestIFile.java        |  59 +++++++++-
 11 files changed, 217 insertions(+), 119 deletions(-)

diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
index deab64f..df68c8d 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
@@ -70,6 +70,5 @@ public class TestConfigTranslationMRToTez {
     assertEquals(LongWritable.class.getName(), ConfigUtils
         .getIntermediateInputValueClass(confVertex1).getName());
     assertTrue(ConfigUtils.shouldCompressIntermediateOutput(confVertex1));
-    assertTrue(ConfigUtils.isIntermediateInputCompressed(confVertex1));
   }
 }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
index 76d3dff..f83fdc9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
@@ -56,24 +56,6 @@ public class ConfigUtils {
     }
     return codecClass;
   }
-  
-  public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
-      Configuration conf, Class<DefaultCodec> defaultValue) {
-    Class<? extends CompressionCodec> codecClass = defaultValue;
-    String name = conf
-        .get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
-    if (name != null) {
-      try {
-        codecClass = conf.getClassByName(name).asSubclass(
-            CompressionCodec.class);
-      } catch (ClassNotFoundException e) {
-        throw new IllegalArgumentException("Compression codec " + name
-            + " was not found.", e);
-      }
-    }
-    return codecClass;
-  }
-
 
   // TODO Move defaults over to a constants file.
   
@@ -82,11 +64,6 @@ public class ConfigUtils {
         TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
   }
 
-  public static boolean isIntermediateInputCompressed(Configuration conf) {
-    return conf.getBoolean(
-        TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
-  }
-
   public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
     Class<V> retv = (Class<V>) conf.getClass(
         TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, null,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 8be8fa2..daeafbc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -265,16 +265,22 @@ public class TezRuntimeUtils {
   }
 
   public static String getBufferSizeProperty(CompressionCodec codec) {
-    switch (codec.getClass().getSimpleName().toString()) {
-    case "DefaultCodec":
+    return getBufferSizeProperty(codec.getClass().getName());
+  }
+
+  public static String getBufferSizeProperty(String className) {
+    switch (className) {
+    case "org.apache.hadoop.io.compress.DefaultCodec":
       return "io.file.buffer.size";
-    case "SnappyCodec":
+    case "org.apache.hadoop.io.compress.SnappyCodec":
       return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
-    case "ZStandardCodec":
+    case "org.apache.hadoop.io.compress.ZStandardCodec":
       return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
-    case "LzoCodec":
+    case "org.apache.hadoop.io.compress.LzoCodec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
+    case "com.hadoop.compression.lzo.LzoCodec":
       return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
-    case "Lz4Codec":
+    case "org.apache.hadoop.io.compress.Lz4Codec":
       return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
     default:
       return null;
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 38f079a..db5ef73 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -39,8 +39,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.GuavaShim;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
@@ -51,12 +49,11 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
-
+import org.apache.tez.runtime.library.utils.CodecUtils;
 import org.apache.tez.common.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -114,16 +111,8 @@ public class Shuffle implements ExceptionReporter {
 
     this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
     
+    this.codec = CodecUtils.getCodec(conf);
 
-    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-      // Work around needed for HADOOP-12191. Avoids the native initialization synchronization race
-      codec.getDecompressorType();
-    } else {
-      codec = null;
-    }
     this.ifileReadAhead = conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 194e899..3ff74f7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -42,8 +42,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progressable;
@@ -63,7 +61,7 @@ import org.apache.tez.runtime.library.common.serializer.SerializationContext;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
-
+import org.apache.tez.runtime.library.utils.CodecUtils;
 import org.apache.tez.common.Preconditions;
 
 @SuppressWarnings({"rawtypes"})
@@ -224,30 +222,7 @@ public abstract class ExternalSorter {
     numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT);
 
     // compression
-    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, this.conf);
-
-      if (codec != null) {
-        Class<? extends Compressor> compressorType = null;
-        Throwable cause = null;
-        try {
-          compressorType = codec.getCompressorType();
-        } catch (RuntimeException e) {
-          cause = e;
-        }
-        if (compressorType == null) {
-          String errMsg =
-              String.format("Unable to get CompressorType for codec (%s). This is most" +
-                      " likely due to missing native libraries for the codec.",
-                  conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC));
-          throw new IOException(errMsg, cause);
-        }
-      }
-    } else {
-      codec = null;
-    }
+    this.codec = CodecUtils.getCodec(conf);
 
     this.ifileReadAhead = this.conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 6aa44e2..1b2aeff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -30,20 +30,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.runtime.library.utils.BufferUtils;
+import org.apache.tez.runtime.library.utils.CodecUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -823,7 +821,8 @@ public class IFile {
         decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {
           decompressor.reset();
-          in = getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor, compressedLength);
+          in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
+              compressedLength);
         } else {
           LOG.warn("Could not obtain decompressor from CodecPool");
           in = checksumIn;
@@ -859,24 +858,6 @@ public class IFile {
       }
     }
 
-    private static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
-        IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
-        throws IOException {
-      String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
-
-      if (bufferSizeProp != null) {
-        Configurable configurableCodec = (Configurable) codec;
-        Configuration conf = configurableCodec.getConf();
-
-        int bufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
-        LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
-            DEFAULT_BUFFER_SIZE, bufferSizeProp, bufSize);
-        conf.setInt(bufferSizeProp, bufSize);
-      }
-
-      return codec.createInputStream(checksumIn, decompressor);
-    }
-
     /**
      * Read entire IFile content to disk.
      *
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index ecc9e03..adea49f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -29,10 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
@@ -43,6 +41,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.utils.CodecUtils;
 
 @SuppressWarnings("rawtypes")
 public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
@@ -141,16 +140,14 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
     additionalSpillBytesReadCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
     numAdditionalSpillsCounter = outputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
     dataViaEventSize = outputContext.getCounters().findCounter(TaskCounter.DATA_BYTES_VIA_EVENT);
-    
+
     // compression
-    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, this.conf);
-    } else {
-      codec = null;
+    try {
+      this.codec = CodecUtils.getCodec(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
-    
+
     this.ifileReadAhead = this.conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 1db7869..c67c405 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -46,14 +44,13 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
 import org.apache.tez.runtime.library.common.readers.UnorderedKVReader;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandlerImpl;
 import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
 import org.apache.tez.runtime.library.common.shuffle.impl.SimpleFetchedInputAllocator;
-
+import org.apache.tez.runtime.library.utils.CodecUtils;
 import org.apache.tez.common.Preconditions;
 
 /**
@@ -114,14 +111,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     if (!isStarted.get()) {
       ////// Initial configuration
       memoryUpdateCallbackHandler.validateUpdateReceived();
-      CompressionCodec codec;
-      if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-        Class<? extends CompressionCodec> codecClass = ConfigUtils
-            .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-        codec = ReflectionUtils.newInstance(codecClass, conf);
-      } else {
-        codec = null;
-      }
+      CompressionCodec codec = CodecUtils.getCodec(conf);
 
       boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
       boolean ifileReadAhead = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
new file mode 100644
index 0000000..99d22c5
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class CodecUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
+  private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
+
+  private CodecUtils() {
+  }
+
+  public static CompressionCodec getCodec(Configuration conf) throws IOException {
+    if (ConfigUtils.shouldCompressIntermediateOutput(conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getIntermediateOutputCompressorClass(conf, DefaultCodec.class);
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+
+      if (codec != null) {
+        Class<? extends Compressor> compressorType = null;
+        Throwable cause = null;
+        try {
+          compressorType = codec.getCompressorType();
+        } catch (RuntimeException e) {
+          cause = e;
+        }
+        if (compressorType == null) {
+          String errMsg = String.format(
+              "Unable to get CompressorType for codec (%s). This is most"
+                  + " likely due to missing native libraries for the codec.",
+              conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC));
+          throw new IOException(errMsg, cause);
+        }
+      }
+      return codec;
+    } else {
+      return null;
+    }
+  }
+
+  public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
+      IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
+      throws IOException {
+    String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
+    Configurable configurableCodec = (Configurable) codec;
+    int originalSize = configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);
+
+    CompressionInputStream in = null;
+
+    if (bufferSizeProp != null) {
+      Configuration conf = configurableCodec.getConf();
+      int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
+      LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
+          DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);
+
+      synchronized (codec) {
+        conf.setInt(bufferSizeProp, newBufSize);
+
+        in = codec.createInputStream(checksumIn, decompressor);
+        /*
+         * We would better reset the original buffer size into the codec. Basically the buffer size
+         * is used at 2 places.
+         *
+         * 1. It can tell the inputstream/outputstream buffersize (which is created by
+         * codec.createInputStream/codec.createOutputStream). This is something which might and
+         * should be optimized in config, as inputstreams instantiate and use their own buffer and
+         * won't reuse buffers from previous streams (TEZ-4135).
+         *
+         * 2. The same buffersize is used when a codec creates a new Compressor/Decompressor. The
+         * fundamental difference is that Compressor/Decompressor instances are expensive and reused
+         * by hadoop's CodecPool. Here is a hidden mismatch, which can happen when a codec is
+         * created with a small buffersize config. Once it creates a Compressor/Decompressor
+         * instance from its config field, the reused Compressor/Decompressor instance will be
+         * reused later, even when application handles large amount of data. This way we can end up
+         * in large stream buffers + small compressor/decompressor buffers, which can be suboptimal,
+         * moreover, it can lead to strange errors, when a compressed output exceeds the size of the
+         * buffer (TEZ-4234).
+         *
+         * An interesting outcome is that - as the codec buffersize config affects both
+         * compressor(output) and decompressor(input) paths - an altered codec config can cause the
+         * issues above for Compressor instances as well, even when we tried to leverage from
+         * smaller buffer size only on decompression paths.
+         */
+        configurableCodec.getConf().setInt(bufferSizeProp, originalSize);
+      }
+    } else {
+      in = codec.createInputStream(checksumIn, decompressor);
+    }
+
+    return in;
+  }
+}
\ No newline at end of file
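The long comment in CodecUtils.getDecompressedInputStreamWithBufferSize above explains why the original buffer-size value is written back after the stream is created. A minimal sketch of that save-and-restore pattern against the plain Hadoop codec API (illustrative only, not part of this commit; DefaultCodec and its io.file.buffer.size key stand in for whatever codec is configured):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.compress.DefaultCodec;

    public class BufferSizeRestoreSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        DefaultCodec codec = new DefaultCodec();
        codec.setConf(conf);

        String prop = "io.file.buffer.size"; // DefaultCodec's buffer-size key
        int originalSize = codec.getConf().getInt(prop, 128 * 1024);

        // Decompressors come from the pool before the buffer size is touched.
        Decompressor decompressor = CodecPool.getDecompressor(codec);

        // Shrink the buffer only for this one small compressed segment ...
        codec.getConf().setInt(prop, 1024);
        // ... codec.createInputStream(someInputStream, decompressor) would be
        // called here; afterwards the original value is restored so that future
        // Compressor/Decompressor instances built from this codec's shared conf
        // do not keep the shrunken buffer (the mismatch described in TEZ-4234).
        codec.getConf().setInt(prop, originalSize);
        CodecPool.returnDecompressor(decompressor);

        System.out.println("restored " + prop + "=" + codec.getConf().getInt(prop, -1));
      }
    }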
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 520dec7..446801a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -290,6 +290,7 @@ public class TestShuffleUtils {
         .thenThrow(new InternalError(codecErrorMsg));
     Decompressor mockDecoder = mock(Decompressor.class);
     CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
+    when(((ConfigurableCodecForTest) mockCodec).getConf()).thenReturn(mock(Configuration.class));
     when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream);
@@ -312,6 +313,7 @@ public class TestShuffleUtils {
         .thenThrow(new IllegalArgumentException(codecErrorMsg));
     Decompressor mockDecoder = mock(Decompressor.class);
     CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
+    when(((ConfigurableCodecForTest) mockCodec).getConf()).thenReturn(mock(Configuration.class));
     when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream);
@@ -327,7 +329,8 @@ public class TestShuffleUtils {
     CompressionInputStream mockCodecStream1 = mock(CompressionInputStream.class);
     when(mockCodecStream1.read(any(byte[].class), anyInt(), anyInt()))
         .thenThrow(new SocketTimeoutException(codecErrorMsg));
-    CompressionCodec mockCodec1 = mock(CompressionCodec.class);
+    CompressionCodec mockCodec1 = mock(ConfigurableCodecForTest.class);
+    when(((ConfigurableCodecForTest) mockCodec1).getConf()).thenReturn(mock(Configuration.class));
     when(mockCodec1.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec1.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream1);
@@ -342,7 +345,8 @@ public class TestShuffleUtils {
     CompressionInputStream mockCodecStream2 = mock(CompressionInputStream.class);
     when(mockCodecStream2.read(any(byte[].class), anyInt(), anyInt()))
         .thenThrow(new InternalError(codecErrorMsg));
-    CompressionCodec mockCodec2 = mock(CompressionCodec.class);
+    CompressionCodec mockCodec2 = mock(ConfigurableCodecForTest.class);
+    when(((ConfigurableCodecForTest) mockCodec2).getConf()).thenReturn(mock(Configuration.class));
     when(mockCodec2.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec2.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream2);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index c74496e..bf35955 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -50,9 +50,11 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -66,6 +68,7 @@ import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
 import org.apache.tez.runtime.library.utils.BufferUtils;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -729,13 +732,16 @@ public class TestIFile {
 
   @Test
   public void testInMemoryBufferSize() throws IOException {
+    Configurable configurableCodec = (Configurable) codec;
+    int originalCodecBufferSize =
+        configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1);
+
     // for smaller amount of data, codec buffer should be sized according to compressed data length
     List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
     Writer writer = writeTestFile(false, false, data, codec);
     readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
 
-    Configurable configurableCodec = (Configurable) codec;
-    Assert.assertEquals(writer.getCompressedLength(),
+    Assert.assertEquals(originalCodecBufferSize, // original size is repaired
         configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
 
     // buffer size cannot grow infinitely with compressed data size
@@ -743,10 +749,57 @@ public class TestIFile {
     writer = writeTestFile(false, false, data, codec);
     readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
 
-    Assert.assertEquals(128*1024,
+    Assert.assertEquals(originalCodecBufferSize, // original size is repaired
         configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testSmallDataCompression() throws IOException {
+    Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+
+    tryWriteFileWithBufferSize(17, "org.apache.hadoop.io.compress.Lz4Codec");
+    tryWriteFileWithBufferSize(32, "org.apache.hadoop.io.compress.Lz4Codec");
+  }
+
+  private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName)
+      throws IOException {
+    Configuration conf = new Configuration();
+
+    System.out.println("trying with buffer size: " + bufferSize);
+    conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
+    CompressionCodec codecToTest =
+        codecFactory.getCodecByClassName(codecClassName);
+    List<KVPair> data = KVDataGen.generateTestDataOfKeySize(false, 1, 0);
+    writeTestFile(false, false, data, codecToTest);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testLz4CompressedDataIsLargerThanOriginal() throws IOException {
+    Assume.assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+
+    // this one succeeds
+    byte[] buf = new byte[32];
+    initBufWithNumbers(buf, 24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48);
+    Lz4Compressor comp = new Lz4Compressor(32, false);
+    comp.setInput(buf, 0, 32);
+    comp.compress(buf, 0, 32);
+
+    // adding 1 more element makes that fail
+    buf = new byte[32];
+    initBufWithNumbers(buf, 24, 45, 55, 49, 54, 55, 55, 54, 49, 48, 50, 55, 49, 56, 54, 48, 57, 48,
+        50);
+    comp = new Lz4Compressor(32, false);
+    comp.setInput(buf, 0, 32);
+    comp.compress(buf, 0, 32);
+  }
+
+  private void initBufWithNumbers(byte[] buf, int... args) {
+    for (int i = 0; i < args.length; i++) {
+      buf[i] = (byte) args[i];
+    }
+  }
+
   /**
    * Test different options (RLE, repeat keys, compression) on reader/writer
    *