You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/05/28 15:26:17 UTC
[tez] branch branch-0.9 updated: TEZ-4135: Improve memory allocation when executing in-memory reads (László Bodor reviewed by Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new fb7a816 TEZ-4135: Improve memory allocation when executing in-memory reads (László Bodor reviewed by Ashutosh Chauhan)
fb7a816 is described below
commit fb7a8160e895848767fee25f857ba68c06de670b
Author: László Bodor <bo...@gmail.com>
AuthorDate: Thu May 28 17:16:40 2020 +0200
TEZ-4135: Improve memory allocation when executing in-memory reads (László Bodor reviewed by Ashutosh Chauhan)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../runtime/library/common/TezRuntimeUtils.java | 19 ++++++
.../runtime/library/common/sort/impl/IFile.java | 22 ++++++-
.../library/common/shuffle/TestShuffleUtils.java | 69 +++++++++++++++++++++-
.../library/common/sort/impl/TestIFile.java | 38 +++++++++++-
.../tez/runtime/library/testutils/KVDataGen.java | 20 +++++--
5 files changed, 159 insertions(+), 9 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 8e13c13..8be8fa2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -26,6 +26,7 @@ import java.net.URL;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnection;
@@ -36,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.TaskContext;
@@ -261,4 +263,21 @@ public class TezRuntimeUtils {
in.close();
}
}
+
+ public static String getBufferSizeProperty(CompressionCodec codec) { // maps a codec to the name of the configuration key that holds its buffer size
+ switch (codec.getClass().getSimpleName().toString()) { // matches on the runtime class's simple name; NOTE(review): toString() on a String is redundant
+ case "DefaultCodec":
+ return "io.file.buffer.size"; // the only case that returns a literal key instead of a CommonConfigurationKeys constant
+ case "SnappyCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
+ case "ZStandardCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
+ case "LzoCodec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
+ case "Lz4Codec":
+ return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
+ default: // any other simple name, including subclasses of the codecs above that are named differently
+ return null; // null signals that no buffer-size key is known for this codec
+ }
+ }
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index e460859..52a9202 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -32,12 +32,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.utils.BufferUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
@@ -623,7 +625,7 @@ public class IFile {
decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
decompressor.reset();
- in = codec.createInputStream(checksumIn, decompressor);
+ in = getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor, compressedLength);
} else {
LOG.warn("Could not obtain decompressor from CodecPool");
in = checksumIn;
@@ -659,6 +661,24 @@ public class IFile {
}
}
+ private static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec, // sets the codec's buffer-size key, then opens the decompressing stream
+ IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
+ throws IOException {
+ String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec); // null when the codec's class name is not one of the recognized codecs
+
+ if (bufferSizeProp != null) {
+ Configurable configurableCodec = (Configurable) codec; // NOTE(review): unchecked cast; assumes every recognized codec implements Configurable
+ Configuration conf = configurableCodec.getConf();
+
+ int bufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE); // never larger than DEFAULT_BUFFER_SIZE, smaller when the compressed data is smaller
+ LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
+ DEFAULT_BUFFER_SIZE, bufferSizeProp, bufSize);
+ conf.setInt(bufferSizeProp, bufSize); // NOTE(review): writes into the codec's own Configuration; confirm it is not shared by concurrent readers
+ }
+
+ return codec.createInputStream(checksumIn, decompressor); // presumably the codec reads the key set above when sizing its buffer; confirm per codec
+ }
+
/**
* Read entire IFile content to disk.
*
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index cc918fa..520dec7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -3,6 +3,7 @@ package org.apache.tez.runtime.library.common.shuffle;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -10,6 +11,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
@@ -286,7 +289,7 @@ public class TestShuffleUtils {
when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt()))
.thenThrow(new InternalError(codecErrorMsg));
Decompressor mockDecoder = mock(Decompressor.class);
- CompressionCodec mockCodec = mock(CompressionCodec.class);
+ CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
.thenReturn(mockCodecStream);
@@ -308,7 +311,7 @@ public class TestShuffleUtils {
when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt()))
.thenThrow(new IllegalArgumentException(codecErrorMsg));
Decompressor mockDecoder = mock(Decompressor.class);
- CompressionCodec mockCodec = mock(CompressionCodec.class);
+ CompressionCodec mockCodec = mock(ConfigurableCodecForTest.class);
when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
.thenReturn(mockCodecStream);
@@ -396,4 +399,66 @@ public class TestShuffleUtils {
verify(activeLogger, times(1000)).info(anyString());
verify(aggregateLogger, times(1)).info(anyString(), Matchers.<Object[]>anyVararg());
}
+
+ /**
+ * A stub codec for tests that implements both CompressionCodec and Configurable; every method returns null or does nothing.
+ */
+ public static class ConfigurableCodecForTest implements CompressionCodec, Configurable {
+
+ @Override
+ public Compressor createCompressor() {
+ return null; // stub: no compressor is provided
+ }
+
+ @Override
+ public Decompressor createDecompressor() {
+ return null; // stub: no decompressor is provided
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream arg0) throws IOException {
+ return null; // stub: the argument is ignored
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream arg0, Decompressor arg1)
+ throws IOException {
+ return null; // stub: both arguments are ignored
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream arg0) throws IOException {
+ return null; // stub: the argument is ignored
+ }
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream arg0, Compressor arg1)
+ throws IOException {
+ return null; // stub: both arguments are ignored
+ }
+
+ @Override
+ public Class<? extends Compressor> getCompressorType() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Decompressor> getDecompressorType() {
+ return null;
+ }
+
+ @Override
+ public String getDefaultExtension() {
+ return null;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return null; // stub: no Configuration is stored, so callers must not dereference the result
+ }
+
+ @Override
+ public void setConf(Configuration arg0) {
+ } // intentionally empty: the supplied Configuration is discarded
+ }
}
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 90f5374..9189f50 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -18,6 +18,13 @@
package org.apache.tez.runtime.library.common.sort.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -32,6 +39,7 @@ import java.util.Random;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -39,7 +47,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.runtime.library.utils.BufferUtils;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
@@ -48,18 +55,25 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.testutils.KVDataGen;
import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
+import org.apache.tez.runtime.library.utils.BufferUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
+import com.google.protobuf.ByteString;
public class TestIFile {
@@ -590,6 +604,26 @@ public class TestIFile {
reader.close();
}
+ @Test
+ public void testInMemoryBufferSize() throws IOException {
+ // for a small amount of data, the codec buffer size should equal the compressed data length
+ List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
+ Writer writer = writeTestFile(false, false, data, codec);
+ readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+
+ Configurable configurableCodec = (Configurable) codec; // the codec's Configuration is inspected after the read above
+ Assert.assertEquals(writer.getCompressedLength(),
+ configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
+
+ // for a large amount of data (20000 keys), the buffer size must stop at a fixed upper bound
+ data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100));
+ writer = writeTestFile(false, false, data, codec);
+ readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+
+ Assert.assertEquals(128*1024, // presumably the value of IFile's DEFAULT_BUFFER_SIZE cap; TODO confirm against the constant
+ configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
+ }
+
/**
* Test different options (RLE, repeat keys, compression) on reader/writer
*
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
index 318cfc5..960d371 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
@@ -34,17 +34,29 @@ public class KVDataGen {
}
/**
- * Generate key value pair
+ * Generate key value pair.
*
* @param sorted whether data should be sorted by key
* @param repeatCount number of keys to be repeated
* @return
*/
public static List<KVPair> generateTestData(boolean sorted, int repeatCount) {
+ return generateTestDataOfKeySize(sorted, 5, repeatCount); // 5 keys, the count that was hard-coded in the loop before this change
+ }
+
+ /**
+ * Generate key value pair of given amount of keys.
+ *
+ * @param sorted whether data should be sorted by key
+ * @param keys number of keys
+ * @param repeatCount number of keys to be repeated
+ * @return
+ */
+ public static List<KVPair> generateTestDataOfKeySize(boolean sorted, int keys, int repeatCount) {
List<KVPair> data = new LinkedList<KVPair>();
Random rnd = new Random();
KVPair kvp = null;
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < keys; i++) {
String keyStr = (sorted) ? ("key" + i) : (rnd.nextLong() + "key" + i);
Text key = new Text(keyStr);
IntWritable value = new IntWritable(i + repeatCount);
@@ -52,7 +64,7 @@ public class KVDataGen {
data.add(kvp);
if ((repeatCount > 0) && (i % 2 == 0)) { // Repeat this key for random number of times
int count = rnd.nextInt(5);
- for(int j = 0; j < count; j++) {
+ for (int j = 0; j < count; j++) {
repeatCount++;
value.set(i + rnd.nextInt());
kvp = new KVPair(key, value);
@@ -60,7 +72,7 @@ public class KVDataGen {
}
}
}
- //If we need to generated repeated keys, try to add some repeated keys to the end of file also.
+ // If we need to generated repeated keys, try to add some repeated keys to the end of file also.
if (repeatCount > 0 && kvp != null) {
data.add(kvp);
data.add(kvp);