You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2019/10/11 04:28:13 UTC

[tez] branch master updated: * TEZ-4088: Create in-memory ifile writer for transferring smaller payloads (follow up of TEZ-4075)

This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dce6c9  * TEZ-4088: Create in-memory ifile writer for transferring smaller payloads (follow up of TEZ-4075)
3dce6c9 is described below

commit 3dce6c9ae5e02d02dd84eecf0cd920ef07298b5b
Author: rbalamohan <rb...@apache.org>
AuthorDate: Fri Oct 11 09:58:09 2019 +0530

    * TEZ-4088: Create in-memory ifile writer for transferring smaller payloads (follow up of TEZ-4075)
    
    * TEZ-4088: Create in-memory ifile writer for transferring smaller payloads (follow up of TEZ-4075) (rbalamohan, reviewed by sseth)
---
 .../library/api/TezRuntimeConfiguration.java       |   9 +
 .../runtime/library/common/sort/impl/IFile.java    | 226 +++++++++++++++++++--
 .../common/sort/impl/IFileOutputStream.java        |   4 +
 .../writers/UnorderedPartitionedKVWriter.java      |  38 +++-
 .../runtime/library/output/UnorderedKVOutput.java  |   1 +
 .../output/UnorderedPartitionedKVOutput.java       |   1 +
 .../library/common/sort/impl/TestIFile.java        | 124 +++++++++++
 .../TestUnorderedPartitionedKVOutputConfig.java    |   5 +
 8 files changed, 385 insertions(+), 23 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index d4532c9..00bb20c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -496,6 +496,14 @@ public class TezRuntimeConfiguration {
   @Private
   public static final int TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT = 512;
 
+  @Private
+  public static final String TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE =
+      TEZ_RUNTIME_PREFIX + "transfer.data-via-events.support.in-mem.file";
+
+  @Private
+  public static final boolean TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE_DEFAULT
+      = true;
+
   /**
    * If the shuffle input is on the local host bypass the http fetch and access the files directly
    */
@@ -633,6 +641,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
     tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
     tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
+    tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE);
     tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index e460859..ab82b01 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -23,11 +23,14 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -72,6 +75,195 @@ public class IFile {
   private static final String REQ_BUFFER_SIZE_TOO_LARGE = "Size of data %d is greater than the max allowed of %d";
 
   /**
+   * IFileWriter which stores data in memory for specified limit, beyond
+   * which it falls back to file based writer. It creates files lazily on
+   * need basis and avoids any disk hit (in cases, where data fits entirely in mem).
+   * <p>
+   * This class should not make any changes to IFile logic and should just flip streams
+   * from mem to disk on need basis.
+   *
+   * During write, it verifies whether uncompressed payload can fit in memory. If so, it would
+   * store in buffer. Otherwise, it falls back to file based writer. Note that data held in
+   * the in-memory buffer is not compressed; the codec (if provided) is applied only once
+   * the writer has spilled over to file. The size check is therefore done against the
+   * uncompressed payload. This is done intentionally, as it is not possible to know
+   * compressed data length upfront.
+   */
+  public static class FileBackedInMemIFileWriter extends Writer {
+
+    private FileSystem fs;
+    private boolean bufferFull;
+
+    // For lazy creation of file
+    private TezTaskOutput taskOutput;
+    private int totalSize;
+
+    @VisibleForTesting
+    private Path outputPath;
+    private CompressionCodec fileCodec;
+    private BoundedByteArrayOutputStream cacheStream;
+
+    private static final int checksumSize = IFileOutputStream.getCheckSumSize();
+
+    /**
+     * Note that we do not allow compression in in-mem stream.
+     * When spilled over to file, compression gets enabled.
+     *
+     * @param conf
+     * @param fs
+     * @param taskOutput
+     * @param keyClass
+     * @param valueClass
+     * @param codec
+     * @param writesCounter
+     * @param serializedBytesCounter
+     * @param cacheSize
+     * @throws IOException
+     */
+    public FileBackedInMemIFileWriter(Configuration conf, FileSystem fs,
+        TezTaskOutput taskOutput, Class keyClass, Class valueClass,
+        CompressionCodec codec,
+        TezCounter writesCounter,
+        TezCounter serializedBytesCounter,
+        int cacheSize) throws IOException {
+      super(conf, new FSDataOutputStream(createBoundedBuffer(cacheSize), null),
+          keyClass, valueClass, null, writesCounter, serializedBytesCounter);
+      this.fs = fs;
+      this.cacheStream = (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
+      this.taskOutput = taskOutput;
+      this.bufferFull = (cacheStream == null);
+      this.totalSize = getBaseCacheSize();
+      this.fileCodec = codec;
+    }
+
+    /**
+     * For basic cache size checks: header + checksum + EOF marker
+     *
+     * @return size of the base cache needed
+     */
+    static int getBaseCacheSize() {
+      return (HEADER.length + checksumSize
+          + (2 * WritableUtils.getVIntSize(EOF_MARKER)));
+    }
+
+    boolean shouldWriteToDisk() {
+      return totalSize >= cacheStream.getLimit();
+    }
+
+    /**
+     * Create in mem stream. If it is too small, adjust its size
+     *
+     * @param size
+     * @return in memory stream
+     */
+    public static BoundedByteArrayOutputStream createBoundedBuffer(int size) {
+      int resize = Math.max(getBaseCacheSize(), size);
+      return new BoundedByteArrayOutputStream(resize);
+    }
+
+    /**
+     * Flip over from memory to file based writer.
+     *
+     * 1. Content format: HEADER + real data + CHECKSUM. Checksum is for real
+     * data.
+     * 2. Before flipping, close checksum stream, so that checksum is written
+     * out.
+     * 3. Create relevant file based writer.
+     * 4. Write header and then real data.
+     *
+     * @throws IOException
+     */
+    private void resetToFileBasedWriter() throws IOException {
+      // Close out stream, so that data checksums are written.
+      // Buf contents = HEADER + real data + CHECKSUM
+      this.out.close();
+
+      // Get the buffer which contains data in memory
+      BoundedByteArrayOutputStream bout =
+          (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
+
+      // Create new file based writer
+      if (outputPath == null) {
+        outputPath = taskOutput.getOutputFileForWrite();
+      }
+      LOG.info("Switching from mem stream to disk stream. File: " + outputPath);
+      FSDataOutputStream newRawOut = fs.create(outputPath);
+      this.rawOut = newRawOut;
+      this.ownOutputStream = true;
+
+      setupOutputStream(fileCodec);
+
+      // Write header to file
+      headerWritten = false;
+      writeHeader(newRawOut);
+
+      // write real data
+      int sPos = HEADER.length;
+      int len = (bout.size() - checksumSize - HEADER.length);
+      this.out.write(bout.getBuffer(), sPos, len);
+
+      bufferFull = true;
+      bout.reset();
+    }
+
+
+    @Override
+    protected void writeKVPair(byte[] keyData, int keyPos, int keyLength,
+        byte[] valueData, int valPos, int valueLength) throws IOException {
+      if (!bufferFull) {
+        // Compute actual payload size: write RLE marker, length info and then entire data.
+        totalSize += ((prevKey == REPEAT_KEY) ? V_END_MARKER_SIZE : 0)
+            + WritableUtils.getVIntSize(keyLength) + keyLength
+            + WritableUtils.getVIntSize(valueLength) + valueLength;
+
+        if (shouldWriteToDisk()) {
+          resetToFileBasedWriter();
+        }
+      }
+      super.writeKVPair(keyData, keyPos, keyLength, valueData, valPos, valueLength);
+    }
+
+    @Override
+    protected void writeValue(byte[] data, int offset, int length) throws IOException {
+      if (!bufferFull) {
+        totalSize += ((prevKey != REPEAT_KEY) ? RLE_MARKER_SIZE : 0)
+            + WritableUtils.getVIntSize(length) + length;
+
+        if (shouldWriteToDisk()) {
+          resetToFileBasedWriter();
+        }
+      }
+      super.writeValue(data, offset, length);
+    }
+
+    /**
+     * Check if data was flushed to disk.
+     *
+     * @return whether data is flushed to disk or not
+     */
+    public boolean isDataFlushedToDisk() {
+      return bufferFull;
+    }
+
+    /**
+     * Get cached data if any
+     *
+     * @return if data is not flushed to disk, it returns in-mem contents
+     */
+    public ByteBuffer getData() {
+      if (!isDataFlushedToDisk()) {
+        return ByteBuffer.wrap(cacheStream.getBuffer(), 0, cacheStream.size());
+      }
+      return null;
+    }
+
+    @VisibleForTesting
+    void setOutputPath(Path outputPath) {
+      this.outputPath = outputPath;
+    }
+  }
+
+  /**
    * <code>IFile.Writer</code> to write out intermediate map-outputs.
    */
   @InterfaceAudience.Private
@@ -148,9 +340,28 @@ public class IFile {
       this.rawOut = outputStream;
       this.writtenRecordsCounter = writesCounter;
       this.serializedUncompressedBytes = serializedBytesCounter;
-      this.checksumOut = new IFileOutputStream(outputStream);
       this.start = this.rawOut.getPos();
       this.rle = rle;
+
+      setupOutputStream(codec);
+
+      writeHeader(outputStream);
+
+      if (keyClass != null) {
+        this.closeSerializers = true;
+        SerializationFactory serializationFactory =
+          new SerializationFactory(conf);
+        this.keySerializer = serializationFactory.getSerializer(keyClass);
+        this.keySerializer.open(buffer);
+        this.valueSerializer = serializationFactory.getSerializer(valueClass);
+        this.valueSerializer.open(buffer);
+      } else {
+        this.closeSerializers = false;
+      }
+    }
+
+    void setupOutputStream(CompressionCodec codec) throws IOException {
+      this.checksumOut = new IFileOutputStream(this.rawOut);
       if (codec != null) {
         this.compressor = CodecPool.getCompressor(codec);
         if (this.compressor != null) {
@@ -165,19 +376,6 @@ public class IFile {
       } else {
         this.out = new FSDataOutputStream(checksumOut,null);
       }
-      writeHeader(outputStream);
-
-      if (keyClass != null) {
-        this.closeSerializers = true;
-        SerializationFactory serializationFactory =
-          new SerializationFactory(conf);
-        this.keySerializer = serializationFactory.getSerializer(keyClass);
-        this.keySerializer.open(buffer);
-        this.valueSerializer = serializationFactory.getSerializer(valueClass);
-        this.valueSerializer.open(buffer);
-      } else {
-        this.closeSerializers = false;
-      }
     }
 
     public Writer(Configuration conf, FileSystem fs, Path file) throws IOException {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
index 3198446..5ec0537 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileOutputStream.java
@@ -59,6 +59,10 @@ public class IFileOutputStream extends FilterOutputStream {
     offset = 0;
   }
 
+  public static int getCheckSumSize() {
+    return DataChecksum.Type.CRC32.size;
+  }
+
   @Override
   public void close() throws IOException {
     if (closed) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 7165205..1197dde 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -196,6 +196,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
   private List<WrappedBuffer> filledBuffers = new ArrayList<>();
 
+  // When enabled, uses in-mem ifile writer
+  private final boolean useCachedStream;
+
   public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration conf,
       int numOutputs, long availableMemoryBytes) throws IOException {
     super(outputContext, conf, numOutputs);
@@ -223,6 +226,13 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
             TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE,
             TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT);
 
+    boolean useCachedStreamConfig = conf.getBoolean(
+        TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE_DEFAULT);
+
+    this.useCachedStream = useCachedStreamConfig && (this.dataViaEventsEnabled && (numPartitions == 1)
+        && !pipelinedShuffle);
+
     if (availableMemoryBytes == 0) {
       Preconditions.checkArgument(((numPartitions == 1) && !pipelinedShuffle), "availableMemory "
           + "can be set to 0 only when numPartitions=1 and " + TezRuntimeConfiguration
@@ -287,10 +297,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     if (numPartitions == 1 && !pipelinedShuffle) {
       //special case, where in only one partition is available.
-      finalOutPath = outputFileHandler.getOutputFileForWrite();
       skipBuffers = true;
-      writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
-          codec, outputRecordsCounter, outputRecordBytesCounter);
+      if (this.useCachedStream) {
+        writer = new IFile.FileBackedInMemIFileWriter(conf, rfs, outputFileHandler, keyClass,
+            valClass, codec, outputRecordsCounter, outputRecordBytesCounter,
+            dataViaEventsMaxSize);
+      } else {
+        finalOutPath = outputFileHandler.getOutputFileForWrite();
+        writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
+            codec, outputRecordsCounter, outputRecordBytesCounter);
+      }
       if (!SPILL_FILE_PERMS.equals(SPILL_FILE_PERMS.applyUMask(FsPermission.getUMask(conf)))) {
         rfs.setPermission(finalOutPath, SPILL_FILE_PERMS);
       }
@@ -311,6 +327,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         + ", reportPartitionStats=" + reportPartitionStats
         + ", dataViaEventsEnabled=" + dataViaEventsEnabled
         + ", dataViaEventsMaxSize=" + dataViaEventsMaxSize
+        + ", useCachedStreamConfig=" + useCachedStreamConfig
+        + ", useCachedStream=" + useCachedStream
     );
   }
 
@@ -702,14 +720,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
             && (writer.getCompressedLength() <= dataViaEventsMaxSize);
   }
 
-  private byte[] readDataForDME() throws IOException {
-    // TODO: Not introducing a caching layer in IFile yet.
-    byte[] buf = null;
-    try (FSDataInputStream inStream = rfs.open(finalOutPath)) {
-        buf = new byte[(int) writer.getCompressedLength()];
+  private ByteBuffer readDataForDME() throws IOException {
+    if (this.useCachedStream) {
+      return ((IFile.FileBackedInMemIFileWriter) writer).getData();
+    } else {
+      try (FSDataInputStream inStream = rfs.open(finalOutPath)) {
+        byte[] buf = new byte[(int) writer.getCompressedLength()];
         IOUtils.readFully(inStream, buf, 0, (int) writer.getCompressedLength());
+        return ByteBuffer.wrap(buf);
+      }
     }
-    return buf;
   }
 
   @Override
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 50e1e8b..85368f6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -169,6 +169,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 5c56083..536ee32 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -142,6 +142,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index 90f5374..518f733 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -29,6 +29,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
+import com.google.protobuf.ByteString;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,6 +86,7 @@ public class TestIFile {
           new Path(System.getProperty("test.build.data", "/tmp")), TestIFile.class.getName())
           .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
       LOG.info("Using workDir: " + workDir);
+      defaultConf.set(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workDir.toString());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -500,6 +504,126 @@ public class TestIFile {
   }
 
   @Test(timeout = 5000)
+  // Basic test
+  public void testFileBackedInMemIFileWriter() throws IOException {
+    List<KVPair> data = new ArrayList<>();
+    List<IntWritable> values = new ArrayList<>();
+    Text key = new Text("key");
+    IntWritable val = new IntWritable(1);
+    for(int i = 0; i < 5; i++) {
+      data.add(new KVPair(key, val));
+      values.add(val);
+    }
+
+    TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
+    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+        Text.class, IntWritable.class, codec, null, null,
+        200);
+
+    writer.appendKeyValues(data.get(0).getKey(), values.iterator());
+    Text lastKey = new Text("key3");
+    IntWritable lastVal = new IntWritable(10);
+    data.add(new KVPair(lastKey, lastVal));
+    writer.append(lastKey, lastVal);
+    writer.close();
+
+    byte[] bytes = new byte[(int) writer.getRawLength()];
+    IFile.Reader.readToMemory(bytes,
+        new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
+        (int) writer.getCompressedLength(), codec, false, -1);
+    readUsingInMemoryReader(bytes, data);
+  }
+
+  @Test(timeout = 5000)
+  // Test that a cache size smaller than the base ifile overhead gets adjusted
+  public void testFileBackedInMemIFileWriterWithSmallBuffer() throws IOException {
+    List<KVPair> data = new ArrayList<>();
+    TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
+    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+        Text.class, IntWritable.class, codec, null, null,
+        2);
+
+    // empty ifile
+    writer.close();
+
+    // Buffer should have self adjusted. So for this empty file, it shouldn't
+    // hit disk.
+    assertFalse("Data should have been flushed to disk", writer.isDataFlushedToDisk());
+
+    byte[] bytes = new byte[(int) writer.getRawLength()];
+    IFile.Reader.readToMemory(bytes,
+        new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
+        (int) writer.getCompressedLength(), codec, false, -1);
+
+    readUsingInMemoryReader(bytes, data);
+  }
+
+  @Test(timeout = 20000)
+  // Test file spill over scenario
+  public void testFileBackedInMemIFileWriter_withSpill() throws IOException {
+    List<KVPair> data = new ArrayList<>();
+    List<IntWritable> values = new ArrayList<>();
+
+    Text key = new Text("key");
+    IntWritable val = new IntWritable(1);
+    for(int i = 0; i < 5; i++) {
+      data.add(new KVPair(key, val));
+      values.add(val);
+    }
+
+    // Setting cache limit to 20. Actual data would be around 43 bytes, so it would spill over.
+    TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
+    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+        Text.class, IntWritable.class, codec, null, null,
+        20);
+    writer.setOutputPath(outputPath);
+
+    writer.appendKeyValues(data.get(0).getKey(), values.iterator());
+    Text lastKey = new Text("key3");
+    IntWritable lastVal = new IntWritable(10);
+
+    data.add(new KVPair(lastKey, lastVal));
+    writer.append(lastKey, lastVal);
+    writer.close();
+
+    assertTrue("Data should have been flushed to disk", writer.isDataFlushedToDisk());
+
+    // Read output content to memory
+    FSDataInputStream inStream = localFs.open(outputPath);
+    byte[] bytes = new byte[(int) writer.getRawLength()];
+
+    IFile.Reader.readToMemory(bytes, inStream,
+        (int) writer.getCompressedLength(), codec, false, -1);
+    inStream.close();
+
+    readUsingInMemoryReader(bytes, data);
+  }
+
+  @Test(timeout = 5000)
+  // Test empty file case
+  public void testEmptyFileBackedInMemIFileWriter() throws IOException {
+    List<KVPair> data = new ArrayList<>();
+    TezTaskOutputFiles
+        tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
+
+    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+        Text.class, IntWritable.class, codec, null, null,
+        100);
+
+    // empty ifile
+    writer.close();
+
+    byte[] bytes = new byte[(int) writer.getRawLength()];
+
+    IFile.Reader.readToMemory(bytes,
+        new ByteArrayInputStream(ByteString.copyFrom(writer.getData()).toByteArray()),
+        (int) writer.getCompressedLength(), codec, false, -1);
+
+    readUsingInMemoryReader(bytes, data);
+  }
+
+
+  @Test(timeout = 5000)
   //Test appendKeyValues feature
   public void testAppendKeyValues() throws IOException {
     List<KVPair> data = new ArrayList<KVPair>();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
index 4bcff88..bff2868 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVOutputConfig.java
@@ -87,6 +87,9 @@ public class TestUnorderedPartitionedKVOutputConfig {
             .setAdditionalConfiguration(TezRuntimeConfiguration
                 .TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, "5120")
             .setAdditionalConfiguration(TezRuntimeConfiguration
+                .TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE,
+                "false")
+            .setAdditionalConfiguration(TezRuntimeConfiguration
                 .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, "true")
             .setAdditionalConfiguration(TezRuntimeConfiguration
                 .TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, "false")
@@ -129,6 +132,8 @@ public class TestUnorderedPartitionedKVOutputConfig {
             conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, false));
     assertEquals(5120,
             conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE, 512));
+    assertEquals(false,
+        conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE, true));
     assertEquals("io", conf.get("io.shouldExist"));
     assertEquals("file", conf.get("file.shouldExist"));
     assertEquals("fs", conf.get("fs.shouldExist"));