You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/05/28 15:26:17 UTC

[tez] branch branch-0.9 updated: TEZ-4135: Improve memory allocation when executing in-memory reads (László Bodor reviewed by Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new fb7a816  TEZ-4135: Improve memory allocation when executing in-memory reads (László Bodor reviewed by Ashutosh Chauhan)
fb7a816 is described below

commit fb7a8160e895848767fee25f857ba68c06de670b
Author: László Bodor <bo...@gmail.com>
AuthorDate: Thu May 28 17:16:40 2020 +0200

    TEZ-4135: Improve memory allocation when executing in-memory reads (László Bodor reviewed by Ashutosh Chauhan)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../runtime/library/common/TezRuntimeUtils.java    | 19 ++++++
 .../runtime/library/common/sort/impl/IFile.java    | 22 ++++++-
 .../library/common/shuffle/TestShuffleUtils.java   | 69 +++++++++++++++++++++-
 .../library/common/sort/impl/TestIFile.java        | 38 +++++++++++-
 .../tez/runtime/library/testutils/KVDataGen.java   | 20 +++++--
 5 files changed, 159 insertions(+), 9 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 8e13c13..8be8fa2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -26,6 +26,7 @@ import java.net.URL;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.http.BaseHttpConnection;
 import org.apache.tez.http.HttpConnection;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.TaskContext;
@@ -261,4 +263,29 @@ public class TezRuntimeUtils {
       in.close();
     }
   }
+
+  /**
+   * Returns the configuration key that holds the buffer size of the given codec.
+   * The codec is recognised by the simple name of its runtime class, so a subclass with a
+   * different simple name is treated as unknown.
+   *
+   * @param codec the codec to look up; must not be null
+   * @return the buffer size configuration key, or null if the codec is not a known one
+   */
+  public static String getBufferSizeProperty(CompressionCodec codec) {
+    switch (codec.getClass().getSimpleName()) {
+    case "DefaultCodec":
+      return CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY;
+    case "SnappyCodec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
+    case "ZStandardCodec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
+    case "LzoCodec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
+    case "Lz4Codec":
+      return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
+    default:
+      return null;
+    }
+  }
 }
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index e460859..52a9202 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -32,12 +32,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.utils.BufferUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
@@ -623,7 +625,7 @@ public class IFile {
         decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {
           decompressor.reset();
-          in = codec.createInputStream(checksumIn, decompressor);
+          in = getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor, compressedLength);
         } else {
           LOG.warn("Could not obtain decompressor from CodecPool");
           in = checksumIn;
@@ -659,6 +661,43 @@ public class IFile {
       }
     }
 
+    /**
+     * Creates the decompressing input stream for the given codec after bounding the codec's
+     * buffer size to min(compressedLength, DEFAULT_BUFFER_SIZE), with a lower limit of 1.
+     * The size is written into the codec's own Configuration under the key returned by
+     * TezRuntimeUtils.getBufferSizeProperty. A codec that has no such key, is not Configurable
+     * or has no Configuration is used unchanged.
+     *
+     * @param codec codec that creates the decompressing stream
+     * @param checksumIn stream the codec reads the compressed data from
+     * @param decompressor decompressor handed to the codec
+     * @param compressedLength compressed length of the data; upper bound for the buffer size
+     * @return the stream returned by codec.createInputStream
+     * @throws IOException if the codec fails to create the stream
+     */
+    private static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
+        IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
+        throws IOException {
+      String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
+
+      // The property lookup goes by class name only, so do not assume the codec is Configurable.
+      if (bufferSizeProp != null && codec instanceof Configurable) {
+        Configuration conf = ((Configurable) codec).getConf();
+
+        if (conf != null) {
+          // A non-positive length would otherwise become the buffer size; keep it at least 1.
+          int bufSize = Math.max(1, Math.min(compressedLength, DEFAULT_BUFFER_SIZE));
+          LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
+              DEFAULT_BUFFER_SIZE, bufferSizeProp, bufSize);
+          // NOTE(review): this mutates the codec's Configuration, which other users of the same
+          // codec instance will also see - confirm that callers tolerate this.
+          conf.setInt(bufferSizeProp, bufSize);
+        }
+      }
+
+      return codec.createInputStream(checksumIn, decompressor);
+    }
+
     /**
      * Read entire IFile content to disk.
      *
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index cc918fa..520dec7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -3,6 +3,7 @@ package org.apache.tez.runtime.library.common.shuffle;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -10,6 +11,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezCommonUtils;
@@ -286,7 +289,7 @@ public class TestShuffleUtils {
     when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt()))
         .thenThrow(new InternalError(codecErrorMsg));
     Decompressor mockDecoder = mock(Decompressor.class);
-    CompressionCodec mockCodec = mock(CompressionCodec.class);
+    CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
     when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream);
@@ -308,7 +311,7 @@ public class TestShuffleUtils {
     when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt()))
         .thenThrow(new IllegalArgumentException(codecErrorMsg));
     Decompressor mockDecoder = mock(Decompressor.class);
-    CompressionCodec mockCodec = mock(CompressionCodec.class);
+    CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
     when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
     when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
         .thenReturn(mockCodecStream);
@@ -396,4 +399,66 @@ public class TestShuffleUtils {
     verify(activeLogger, times(1000)).info(anyString());
     verify(aggregateLogger, times(1)).info(anyString(), Matchers.<Object[]>anyVararg());
   }
+
+  /**
+   * Test-only stub of a CompressionCodec that is also Configurable; its methods are no-ops.
+   */
+  public static class ConfigurableCodecForTest implements CompressionCodec, Configurable {
+
+    @Override
+    public Compressor createCompressor() {
+      return null;
+    }
+
+    @Override
+    public Decompressor createDecompressor() {
+      return null;
+    }
+
+    @Override
+    public CompressionInputStream createInputStream(InputStream arg0) throws IOException {
+      return null;
+    }
+
+    @Override
+    public CompressionInputStream createInputStream(InputStream arg0, Decompressor arg1)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public CompressionOutputStream createOutputStream(OutputStream arg0) throws IOException {
+      return null;
+    }
+
+    @Override
+    public CompressionOutputStream createOutputStream(OutputStream arg0, Compressor arg1)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Compressor> getCompressorType() {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Decompressor> getDecompressorType() {
+      return null;
+    }
+
+    @Override
+    public String getDefaultExtension() {
+      return null;
+    }
+
+    @Override
+    public Configuration getConf() {
+      return null; // the stub holds no Configuration
+    }
+
+    @Override
+    public void setConf(Configuration arg0) {
+    } // intentionally empty: the passed Configuration is not stored
+  }
 }
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 90f5374..9189f50 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -18,6 +18,13 @@
 
 package org.apache.tez.runtime.library.common.sort.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -32,6 +39,7 @@ import java.util.Random;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -39,7 +47,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.runtime.library.utils.BufferUtils;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IntWritable;
@@ -48,18 +55,25 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 import org.apache.tez.runtime.library.testutils.KVDataGen;
 import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
+import org.apache.tez.runtime.library.utils.BufferUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
+import com.google.protobuf.ByteString;
 
 public class TestIFile {
 
@@ -590,6 +604,26 @@ public class TestIFile {
     reader.close();
   }
 
+  @Test
+  public void testInMemoryBufferSize() throws IOException {
+    // Resolved once: both assertions read the same key from the codec's configuration.
+    final String sizeKey = TezRuntimeUtils.getBufferSizeProperty(codec);
+    final Configurable codecWithConf = (Configurable) codec;
+
+    // Small payload: the codec buffer size is expected to equal the compressed length.
+    List<KVPair> few = KVDataGen.generateTestData(false, rnd.nextInt(100));
+    Writer fewWriter = writeTestFile(false, false, few, codec);
+    readAndVerifyData(fewWriter.getRawLength(), fewWriter.getCompressedLength(), few, codec);
+    Assert.assertEquals(fewWriter.getCompressedLength(),
+        codecWithConf.getConf().getInt(sizeKey, 0));
+
+    // Large payload: the codec buffer size is expected to stop at 128 KiB.
+    List<KVPair> many = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100));
+    Writer manyWriter = writeTestFile(false, false, many, codec);
+    readAndVerifyData(manyWriter.getRawLength(), manyWriter.getCompressedLength(), many, codec);
+    Assert.assertEquals(128*1024, codecWithConf.getConf().getInt(sizeKey, 0));
+  }
+
   /**
    * Test different options (RLE, repeat keys, compression) on reader/writer
    *
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
index 318cfc5..960d371 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
@@ -34,17 +34,29 @@ public class KVDataGen {
   }
 
   /**
-   * Generate key value pair
+   * Generates a list of key-value pairs.
    *
    * @param sorted whether data should be sorted by key
    * @param repeatCount number of keys to be repeated
    * @return
    */
   public static List<KVPair> generateTestData(boolean sorted, int repeatCount) {
+    return generateTestDataOfKeySize(sorted, 5, repeatCount); // fixed number of keys: 5
+  }
+
+  /**
+   * Generates a list of key-value pairs for the given number of keys.
+   *
+   * @param sorted whether data should be sorted by key
+   * @param keys number of keys
+   * @param repeatCount number of keys to be repeated
+   * @return
+   */
+  public static List<KVPair> generateTestDataOfKeySize(boolean sorted, int keys, int repeatCount) {
     List<KVPair> data = new LinkedList<KVPair>();
     Random rnd = new Random();
     KVPair kvp = null;
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < keys; i++) {
       String keyStr = (sorted) ? ("key" + i) : (rnd.nextLong() + "key" + i);
       Text key = new Text(keyStr);
       IntWritable value = new IntWritable(i + repeatCount);
@@ -52,7 +64,7 @@ public class KVDataGen {
       data.add(kvp);
       if ((repeatCount > 0) && (i % 2 == 0)) { // Repeat this key for random number of times
         int count = rnd.nextInt(5);
-        for(int j = 0; j < count; j++) {
+        for (int j = 0; j < count; j++) {
           repeatCount++;
           value.set(i + rnd.nextInt());
           kvp = new KVPair(key, value);
@@ -60,7 +72,7 @@ public class KVDataGen {
         }
       }
     }
-    //If we need to generated repeated keys, try to add some repeated keys to the end of file also.
+    // If we need to generate repeated keys, try to add some repeated keys to the end of file also.
     if (repeatCount > 0 && kvp != null) {
       data.add(kvp);
       data.add(kvp);