You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@tez.apache.org by rb...@apache.org on 2019/10/11 04:28:13 UTC
[tez] branch master updated: * TEZ-4088: Create in-memory ifile writer for transferring smaller payloads (follow up of TEZ-4075)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 3dce6c9 * TEZ-4088: Create in-memory ifile writer for transferring smaller payloads (follow up of TEZ-4075)
3dce6c9 is described below
commit 3dce6c9ae5e02d02dd84eecf0cd920ef07298b5b
Author: rbalamohan <rb...@apache.org>
AuthorDate: Fri Oct 11 09:58:09 2019 +0530
* TEZ-4088: Create in-memory ifile writer for transferring smaller payloads (follow up of TEZ-4075)
* TEZ-4088: Create in-memory ifile writer for transferring smaller payloads (follow up of TEZ-4075) (rbalamohan, reviewed by sseth)
---
.../library/api/TezRuntimeConfiguration.java | 9 +
.../runtime/library/common/sort/impl/IFile.java | 226 +++++++++++++++++++--
.../common/sort/impl/IFileOutputStream.java | 4 +
.../writers/UnorderedPartitionedKVWriter.java | 38 +++-
.../runtime/library/output/UnorderedKVOutput.java | 1 +
.../output/UnorderedPartitionedKVOutput.java | 1 +
.../library/common/sort/impl/TestIFile.java | 124 +++++++++++
.../TestUnorderedPartitionedKVOutputConfig.java | 5 +
8 files changed, 385 insertions(+), 23 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index d4532c9..00bb20c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -496,6 +496,14 @@ public class TezRuntimeConfiguration {
@Private
public static final int TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 512;
+ /**
+ * Whether the unordered writer may buffer its final output in memory (spilling
+ * to a file only when it outgrows the buffer) instead of always writing a file.
+ * It only takes effect together with TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED,
+ * a single partition and pipelined shuffle disabled. The buffer limit is
+ * TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE.
+ */
+ @Private
+ public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE =
+ TEZ_RUNTIME_PREFIX + "transfer.data-via-events.support.in-mem.file";
+
+ /** Default for TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE: enabled. */
+ @Private
+ public static final boolean TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE_DEFAULT
+ = true;
+
/**
* If the shuffle input is on the local host bypass the http fetch and access the files directly
*/
@@ -633,6 +641,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
+ tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE);
tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index e460859..ab82b01 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -23,11 +23,14 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -72,6 +75,195 @@ public class IFile {
private static final String REQ_BUFFER_SIZE_TOO_LARGE = "Size of data %d is greater than the max allowed of %d";
/**
+ * IFileWriter which stores data in memory up to a specified limit, beyond
+ * which it falls back to a file based writer. The file is created lazily, only
+ * when it is needed, so there is no disk hit at all when the data fits entirely
+ * in memory.
+ * <p>
+ * This class should not make any changes to IFile logic and should just flip streams
+ * from mem to disk on need basis.
+ * <p>
+ * Data held in memory is never compressed: the in-memory writer is created without a
+ * codec, and the codec passed to the constructor (if any) is only applied once the
+ * writer has spilled over to the file. Because the compressed length cannot be known
+ * upfront, the spill check is done against a running estimate of the uncompressed
+ * payload (header + records + EOF markers + checksum).
+ */
+ public static class FileBackedInMemIFileWriter extends Writer {
+
+ private FileSystem fs;
+ // True once the writer has switched to the file (see resetToFileBasedWriter).
+ private boolean bufferFull;
+
+ // For lazy creation of file
+ private TezTaskOutput taskOutput;
+ // Estimated uncompressed size of the in-memory content. Kept as a long (and the
+ // increments are computed in long) so that huge key/value lengths cannot overflow
+ // it into a negative value and silently skip the spill check.
+ private long totalSize;
+
+ private Path outputPath;
+ private CompressionCodec fileCodec;
+ private BoundedByteArrayOutputStream cacheStream;
+
+ private static final int checksumSize = IFileOutputStream.getCheckSumSize();
+
+ /**
+ * Creates a writer that buffers in memory and spills to a file on demand.
+ * Note that we do not allow compression in the in-mem stream.
+ * When spilled over to file, compression gets enabled.
+ *
+ * @param conf configuration passed to the underlying {@link Writer}
+ * @param fs file system used to create the spill file
+ * @param taskOutput supplies the spill file path; only asked for when spilling
+ * @param keyClass key class
+ * @param valueClass value class
+ * @param codec codec used for the spill file only; may be null
+ * @param writesCounter passed through to the underlying {@link Writer}
+ * @param serializedBytesCounter passed through to the underlying {@link Writer}
+ * @param cacheSize in-memory buffer limit in bytes; raised to
+ * {@link #getBaseCacheSize()} if smaller
+ * @throws IOException if the underlying writer cannot be initialized
+ */
+ public FileBackedInMemIFileWriter(Configuration conf, FileSystem fs,
+ TezTaskOutput taskOutput, Class keyClass, Class valueClass,
+ CompressionCodec codec,
+ TezCounter writesCounter,
+ TezCounter serializedBytesCounter,
+ int cacheSize) throws IOException {
+ // The in-memory stream is always written without a codec (null below).
+ super(conf, new FSDataOutputStream(createBoundedBuffer(cacheSize), null),
+ keyClass, valueClass, null, writesCounter, serializedBytesCounter);
+ this.fs = fs;
+ this.cacheStream = (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
+ this.taskOutput = taskOutput;
+ this.bufferFull = (cacheStream == null);
+ this.totalSize = getBaseCacheSize();
+ this.fileCodec = codec;
+ }
+
+ /**
+ * For basic cache size checks: header + checksum + EOF marker
+ *
+ * @return size of the base cache needed
+ */
+ static int getBaseCacheSize() {
+ return (HEADER.length + checksumSize
+ + (2 * WritableUtils.getVIntSize(EOF_MARKER)));
+ }
+
+ /** Whether the estimated in-memory size has reached the buffer limit. */
+ boolean shouldWriteToDisk() {
+ return totalSize >= cacheStream.getLimit();
+ }
+
+ /**
+ * Create the in-memory stream. If the requested size is too small to hold even
+ * an empty IFile, it is raised to {@link #getBaseCacheSize()}.
+ *
+ * @param size requested buffer limit in bytes
+ * @return in memory stream
+ */
+ public static BoundedByteArrayOutputStream createBoundedBuffer(int size) {
+ int resize = Math.max(getBaseCacheSize(), size);
+ return new BoundedByteArrayOutputStream(resize);
+ }
+
+ /**
+ * Flip over from memory to file based writer.
+ *
+ * 1. Content format: HEADER + real data + CHECKSUM. Checksum is for real
+ * data.
+ * 2. Before flipping, close checksum stream, so that checksum is written
+ * out.
+ * 3. Create relevant file based writer.
+ * 4. Write header and then real data.
+ *
+ * @throws IOException if the file cannot be created or written
+ */
+ private void resetToFileBasedWriter() throws IOException {
+ // Close out stream, so that data checksums are written.
+ // Buf contents = HEADER + real data + CHECKSUM
+ this.out.close();
+
+ // Get the buffer which contains data in memory
+ BoundedByteArrayOutputStream bout =
+ (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
+
+ // Create new file based writer
+ if (outputPath == null) {
+ outputPath = taskOutput.getOutputFileForWrite();
+ }
+ LOG.info("Switching from mem stream to disk stream. File: {}", outputPath);
+ FSDataOutputStream newRawOut = fs.create(outputPath);
+ this.rawOut = newRawOut;
+ this.ownOutputStream = true;
+
+ setupOutputStream(fileCodec);
+
+ // Write header to file
+ headerWritten = false;
+ writeHeader(newRawOut);
+
+ // write real data (skip the in-memory HEADER and trailing CHECKSUM)
+ int sPos = HEADER.length;
+ int len = (bout.size() - checksumSize - HEADER.length);
+ this.out.write(bout.getBuffer(), sPos, len);
+
+ bufferFull = true;
+ bout.reset();
+ }
+
+
+ @Override
+ protected void writeKVPair(byte[] keyData, int keyPos, int keyLength,
+ byte[] valueData, int valPos, int valueLength) throws IOException {
+ if (!bufferFull) {
+ // Compute actual payload size: write RLE marker, length info and then entire data.
+ totalSize += ((prevKey == REPEAT_KEY) ? V_END_MARKER_SIZE : 0)
+ + WritableUtils.getVIntSize(keyLength) + (long) keyLength
+ + WritableUtils.getVIntSize(valueLength) + (long) valueLength;
+
+ if (shouldWriteToDisk()) {
+ resetToFileBasedWriter();
+ }
+ }
+ super.writeKVPair(keyData, keyPos, keyLength, valueData, valPos, valueLength);
+ }
+
+ @Override
+ protected void writeValue(byte[] data, int offset, int length) throws IOException {
+ if (!bufferFull) {
+ totalSize += ((prevKey != REPEAT_KEY) ? RLE_MARKER_SIZE : 0)
+ + WritableUtils.getVIntSize(length) + (long) length;
+
+ if (shouldWriteToDisk()) {
+ resetToFileBasedWriter();
+ }
+ }
+ super.writeValue(data, offset, length);
+ }
+
+ /**
+ * Check if data was flushed to disk.
+ *
+ * @return whether data is flushed to disk or not
+ */
+ public boolean isDataFlushedToDisk() {
+ return bufferFull;
+ }
+
+ /**
+ * Get cached data if any.
+ * NOTE(review): presumably meant to be called after close(), once the EOF
+ * markers and checksum have been written into the buffer.
+ *
+ * @return the in-memory IFile bytes (a view over the internal buffer, not a
+ * copy), or null if the data was flushed to disk
+ */
+ public ByteBuffer getData() {
+ if (!isDataFlushedToDisk()) {
+ return ByteBuffer.wrap(cacheStream.getBuffer(), 0, cacheStream.size());
+ }
+ return null;
+ }
+
+ /**
+ * @return the spill file path, or null if no file path has been requested or
+ * set yet (i.e. the writer has not spilled)
+ */
+ public Path getOutputPath() {
+ return outputPath;
+ }
+
+ @VisibleForTesting
+ void setOutputPath(Path outputPath) {
+ this.outputPath = outputPath;
+ }
+ }
+
+ /**
* <code>IFile.Writer</code> to write out intermediate map-outputs.
*/
@InterfaceAudience.Private
@@ -148,9 +340,28 @@ public class IFile {
this.rawOut = outputStream;
this.writtenRecordsCounter = writesCounter;
this.serializedUncompressedBytes = serializedBytesCounter;
- this.checksumOut = new IFileOutputStream(outputStream);
this.start = this.rawOut.getPos();
this.rle = rle;
+
+ setupOutputStream(codec);
+
+ writeHeader(outputStream);
+
+ if (keyClass != null) {
+ this.closeSerializers = true;
+ SerializationFactory serializationFactory =
+ new SerializationFactory(conf);
+ this.keySerializer = serializationFactory.getSerializer(keyClass);
+ this.keySerializer.open(buffer);
+ this.valueSerializer = serializationFactory.getSerializer(valueClass);
+ this.valueSerializer.open(buffer);
+ } else {
+ this.closeSerializers = false;
+ }
+ }
+
+ void setupOutputStream(CompressionCodec codec) throws IOException {
+ this.checksumOut = new IFileOutputStream(this.rawOut);
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
if (this.compressor != null) {
@@ -165,19 +376,6 @@ public class IFile {
} else {
this.out = new FSDataOutputStream(checksumOut,null);
}
- writeHeader(outputStream);
-
- if (keyClass != null) {
- this.closeSerializers = true;
- SerializationFactory serializationFactory =
- new SerializationFactory(conf);
- this.keySerializer = serializationFactory.getSerializer(keyClass);
- this.keySerializer.open(buffer);
- this.valueSerializer = serializationFactory.getSerializer(valueClass);
- this.valueSerializer.open(buffer);
- } else {
- this.closeSerializers = false;
- }
}
public Writer(Configuration conf, FileSystem fs, Path file) throws IOException {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
index 3198446..5ec0537 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
@@ -59,6 +59,10 @@ public class IFileOutputStream extends FilterOutputStream {
offset = 0;
}
+ /**
+ * Number of checksum bytes appended after the data, assuming a CRC32 checksum.
+ * NOTE(review): must agree with the DataChecksum type this stream is built with.
+ *
+ * @return checksum length in bytes
+ */
+ public static int getCheckSumSize() {
+ final int crc32Length = DataChecksum.Type.CRC32.size;
+ return crc32Length;
+ }
+
@Override
public void close() throws IOException {
if (closed) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 7165205..1197dde 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -196,6 +196,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
private List<WrappedBuffer> filledBuffers = new ArrayList<>();
+ // When true (config enabled, data-via-events enabled, a single partition and
+ // no pipelined shuffle), output goes through IFile.FileBackedInMemIFileWriter
+ // so that small outputs can be handed to the DME straight from memory.
+ private final boolean useCachedStream;
+
public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf,
int numOutputs, long availableMemoryBytes) throws IOException {
super(outputContext, conf, numOutputs);
@@ -223,6 +226,13 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE,
TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT);
+ boolean useCachedStreamConfig = conf.getBoolean(
+ TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE,
+ TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE_DEFAULT);
+
+ this.useCachedStream = useCachedStreamConfig && (this.dataViaEventsEnabled && (numPartitions == 1)
+ && !pipelinedShuffle);
+
if (availableMemoryBytes == 0) {
Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory "
+ "can be set to 0 only when numPartitions=1 and " + TezRuntimeConfiguration
@@ -287,10 +297,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
if (numPartitions == 1 && !pipelinedShuffle) {
//special case, where in only one partition is available.
- finalOutPath = outputFileHandler.getOutputFileForWrite();
skipBuffers = true;
- writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
- codec, outputRecordsCounter, outputRecordBytesCounter);
+ if (this.useCachedStream) {
+ writer = new IFile.FileBackedInMemIFileWriter(conf, rfs, outputFileHandler, keyClass,
+ valClass, codec, outputRecordsCounter, outputRecordBytesCounter,
+ dataViaEventsMaxSize);
+ } else {
+ finalOutPath = outputFileHandler.getOutputFileForWrite();
+ writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
+ codec, outputRecordsCounter, outputRecordBytesCounter);
+ }
if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
}
@@ -311,6 +327,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
+ ", reportPartitionStats=" + reportPartitionStats
+ ", dataViaEventsEnabled=" + dataViaEventsEnabled
+ ", dataViaEventsMaxSize=" + dataViaEventsMaxSize
+ + ", useCachedStreamConfig=" + useCachedStreamConfig
+ + ", useCachedStream=" + useCachedStream
);
}
@@ -702,14 +720,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
&& (writer.getCompressedLength() <= dataViaEventsMaxSize);
}
- private byte[] readDataForDME() throws IOException {
- // TODO: Not introducing a caching layer in IFile yet.
- byte[] buf = null;
- try (FSDataInputStream inStream = rfs.open(finalOutPath)) {
- buf = new byte[(int) writer.getCompressedLength()];
+ /**
+ * Returns the final output bytes to be sent via the data movement event.
+ * <p>
+ * In-memory data is used only when the in-mem writer is enabled AND has not
+ * spilled. Once it has spilled, getData() returns null, so the bytes must be
+ * read back from the output file, as in the regular file based path.
+ */
+ private ByteBuffer readDataForDME() throws IOException {
+ if (this.useCachedStream
+ && !((IFile.FileBackedInMemIFileWriter) writer).isDataFlushedToDisk()) {
+ return ((IFile.FileBackedInMemIFileWriter) writer).getData();
+ } else {
+ // NOTE(review): with useCachedStream the constructor does not assign
+ // finalOutPath, so after a spill it must be pointed at the writer's spill
+ // file before this read is reached; confirm in close()/canSendDataOverDME().
+ try (FSDataInputStream inStream = rfs.open(finalOutPath)) {
+ byte[] buf = new byte[(int) writer.getCompressedLength()];
IOUtils.readFully(inStream, buf, 0, (int) writer.getCompressedLength());
+ return ByteBuffer.wrap(buf);
+ }
}
- return buf;
}
@Override
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 50e1e8b..85368f6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -169,6 +169,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 5c56083..536ee32 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -142,6 +142,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 90f5374..518f733 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -29,6 +29,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +86,7 @@ public class TestIFile {
new Path(System.getProperty("test.build.data", "/tmp")), TestIFile.class.getName())
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
LOG.info("Using workDir: " + workDir);
+ defaultConf.set(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workDir.toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -500,6 +504,126 @@ public class TestIFile {
}
@Test(timeout = 5000)
+ // Basic test: a small payload must stay entirely in memory and round-trip via getData().
+ public void testFileBackedInMemIFileWriter() throws IOException {
+ List<KVPair> data = new ArrayList<>();
+ List<IntWritable> values = new ArrayList<>();
+ Text key = new Text("key");
+ IntWritable val = new IntWritable(1);
+ for(int i = 0; i < 5; i++) {
+ data.add(new KVPair(key, val));
+ values.add(val);
+ }
+
+ TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
+ IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+ Text.class, IntWritable.class, codec, null, null,
+ 200);
+
+ writer.appendKeyValues(data.get(0).getKey(), values.iterator());
+ Text lastKey = new Text("key3");
+ IntWritable lastVal = new IntWritable(10);
+ data.add(new KVPair(lastKey, lastVal));
+ writer.append(lastKey, lastVal);
+ writer.close();
+
+ // The payload is well below the 200 byte limit, so nothing may have spilled;
+ // otherwise getData() returns null and the read below fails obscurely.
+ assertFalse("Data should not have been flushed to disk", writer.isDataFlushedToDisk());
+
+ byte[] bytes = new byte[(int) writer.getRawLength()];
+ IFile.Reader.readToMemory(bytes,
+ new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
+ (int) writer.getCompressedLength(), codec, false, -1);
+ readUsingInMemoryReader(bytes, data);
+ }
+
+ @Test(timeout = 5000)
+ // A requested buffer smaller than the fixed IFile overhead is grown automatically,
+ // so even with a 2 byte request an empty file must stay in memory.
+ public void testFileBackedInMemIFileWriterWithSmallBuffer() throws IOException {
+ List<KVPair> data = new ArrayList<>();
+ TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
+ IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+ Text.class, IntWritable.class, codec, null, null,
+ 2);
+
+ // empty ifile
+ writer.close();
+
+ // Buffer should have self adjusted. So for this empty file, it shouldn't
+ // hit disk.
+ assertFalse("Data should not have been flushed to disk", writer.isDataFlushedToDisk());
+
+ byte[] bytes = new byte[(int) writer.getRawLength()];
+ IFile.Reader.readToMemory(bytes,
+ new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
+ (int) writer.getCompressedLength(), codec, false, -1);
+
+ readUsingInMemoryReader(bytes, data);
+ }
+
+ @Test(timeout = 20000)
+ // Test file spill over scenario
+ public void testFileBackedInMemIFileWriter_withSpill() throws IOException {
+ List<KVPair> data = new ArrayList<>();
+ List<IntWritable> values = new ArrayList<>();
+
+ Text key = new Text("key");
+ IntWritable val = new IntWritable(1);
+ for(int i = 0; i < 5; i++) {
+ data.add(new KVPair(key, val));
+ values.add(val);
+ }
+
+ // Setting cache limit to 20. Actual data would be around 43 bytes, so it would spill over.
+ TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
+ IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+ Text.class, IntWritable.class, codec, null, null,
+ 20);
+ writer.setOutputPath(outputPath);
+
+ writer.appendKeyValues(data.get(0).getKey(), values.iterator());
+ Text lastKey = new Text("key3");
+ IntWritable lastVal = new IntWritable(10);
+
+ data.add(new KVPair(lastKey, lastVal));
+ writer.append(lastKey, lastVal);
+ writer.close();
+
+ assertTrue("Data should have been flushed to disk", writer.isDataFlushedToDisk());
+
+ // Read output content to memory; the stream is closed even if the read fails.
+ byte[] bytes = new byte[(int) writer.getRawLength()];
+ try (FSDataInputStream inStream = localFs.open(outputPath)) {
+ IFile.Reader.readToMemory(bytes, inStream,
+ (int) writer.getCompressedLength(), codec, false, -1);
+ }
+
+ readUsingInMemoryReader(bytes, data);
+ }
+
+ @Test(timeout = 5000)
+ // Test empty file case: an empty IFile must stay in memory and read back
+ // against an empty list of expected records.
+ public void testEmptyFileBackedInMemIFileWriter() throws IOException {
+ List<KVPair> data = new ArrayList<>();
+ TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
+
+ IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+ Text.class, IntWritable.class, codec, null, null,
+ 100);
+
+ // empty ifile
+ writer.close();
+
+ // Nothing was written, so nothing may have spilled; getData() would be null otherwise.
+ assertFalse("Data should not have been flushed to disk", writer.isDataFlushedToDisk());
+
+ byte[] bytes = new byte[(int) writer.getRawLength()];
+
+ IFile.Reader.readToMemory(bytes,
+ new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
+ (int) writer.getCompressedLength(), codec, false, -1);
+
+ readUsingInMemoryReader(bytes, data);
+ }
+
+
+ @Test(timeout = 5000)
//Test appendKeyValues feature
public void testAppendKeyValues() throws IOException {
List<KVPair> data = new ArrayList<KVPair>();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
index 4bcff88..bff2868 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
@@ -87,6 +87,9 @@ public class TestUnorderedPartitionedKVOutputConfig {
.setAdditionalConfiguration(TezRuntimeConfiguration
.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, "5120")
.setAdditionalConfiguration(TezRuntimeConfiguration
+ .TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE,
+ "false")
+ .setAdditionalConfiguration(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, "true")
.setAdditionalConfiguration(TezRuntimeConfiguration
.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, "false")
@@ -129,6 +132,8 @@ public class TestUnorderedPartitionedKVOutputConfig {
conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, false));
assertEquals(5120,
conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, 512));
+ assertEquals(false,
+ conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE, true));
assertEquals("io", conf.get("io.shouldExist"));
assertEquals("file", conf.get("file.shouldExist"));
assertEquals("fs", conf.get("fs.shouldExist"));