Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/10/19 15:04:15 UTC

[incubator-uniffle] branch master updated: [#1231] feat(tez): Support remote spill in merge stage. (#1245)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f56e59ea [#1231] feat(tez): Support remote spill in merge stage. (#1245)
4f56e59ea is described below

commit 4f56e59ea886f4504358f9a24a85575e23b049f2
Author: zhengchenyu <zh...@163.com>
AuthorDate: Thu Oct 19 23:04:09 2023 +0800

    [#1231] feat(tez): Support remote spill in merge stage. (#1245)
    
    ### What changes were proposed in this pull request?
    
    Add a new merge manager for remote spill.
    
    ### Why are the changes needed?
    
    In cloud scenarios, the machines that run jobs often have very little local storage, while the merge phase frequently spills to disk. Remote spill allows jobs using the remote shuffle service to consume as little local disk as possible.
    
    Fix: #1231
    
    ### Does this PR introduce _any_ user-facing change?
    
    | config | description |
    | --- | --- |
    | tez.rss.reduce.remote.spill.enable | Whether to enable remote spill |
    | tez.rss.remote.spill.storage.path | The remote spill path |
    | tez.rss.reduce.remote.spill.replication | The replication factor on the remote filesystem |
    | tez.rss.reduce.remote.spill.retries | The number of retries for the remote filesystem client |
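
    A minimal, illustrative sketch of enabling remote spill from job code, assuming these keys are set on the job's Hadoop `Configuration` (the HDFS URI below is a placeholder):

    ```java
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Enable remote spill for the merge stage of reduce tasks.
    conf.setBoolean("tez.rss.reduce.remote.spill.enable", true);
    // Base path on the remote filesystem for spill files (placeholder URI).
    conf.set("tez.rss.remote.spill.storage.path", "hdfs://namenode:8020/rss/spill");
    // Spilled data can be recomputed by the reduce task, so low replication with more
    // client retries is the intended trade-off; 1 and 5 are the defaults in this patch.
    conf.setInt("tez.rss.reduce.remote.spill.replication", 1);
    conf.setInt("tez.rss.reduce.remote.spill.retries", 5);
    ```

    When `tez.rss.reduce.remote.spill.enable` is left at its default (`false`), the existing local-disk MergeManager is used instead.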
    
    ### How was this patch tested?
    
    Unit tests, integration tests, and Tez examples on a cluster.
---
 client-tez/pom.xml                                 |   5 +
 .../java/org/apache/tez/common/RssTezConfig.java   |  12 +
 .../shuffle/orderedgrouped/RssInMemoryMerger.java  | 224 ++++++++
 .../shuffle/orderedgrouped/RssMergeManager.java    | 606 +++++++++++++++++++++
 .../common/shuffle/orderedgrouped/RssShuffle.java  | 114 +++-
 .../orderedgrouped/RssInMemoryMergerTest.java      | 244 +++++++++
 .../orderedgrouped/RssMergeManagerTest.java        | 290 ++++++++++
 .../uniffle/test/TezIntegrationTestBase.java       |  12 +-
 8 files changed, 1476 insertions(+), 31 deletions(-)

diff --git a/client-tez/pom.xml b/client-tez/pom.xml
index df0409616..4a5f09ced 100644
--- a/client-tez/pom.xml
+++ b/client-tez/pom.xml
@@ -116,6 +116,11 @@
           </exclusion>
         </exclusions>
       </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-minicluster</artifactId>
+          <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
index cffaf7b97..bcbb8a95d 100644
--- a/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
+++ b/client-tez/src/main/java/org/apache/tez/common/RssTezConfig.java
@@ -211,6 +211,18 @@ public class RssTezConfig {
       TEZ_RSS_CONFIG_PREFIX + "rss.avoid.recompute.succeeded.task";
   public static final boolean RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK_DEFAULT = false;
 
+  public static final String RSS_REDUCE_REMOTE_SPILL_ENABLED =
+      TEZ_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.enable";
+  public static final boolean RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT = false;
+  public static final String RSS_REDUCE_REMOTE_SPILL_REPLICATION =
+      TEZ_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.replication";
+  public static final int RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT = 1;
+  public static final String RSS_REDUCE_REMOTE_SPILL_RETRIES =
+      TEZ_RSS_CONFIG_PREFIX + "rss.reduce.remote.spill.retries";
+  public static final int RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT = 5;
+  public static final String RSS_REMOTE_SPILL_STORAGE_PATH =
+      TEZ_RSS_CONFIG_PREFIX + "rss.remote.spill.storage.path";
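+  // Per the PR description, TEZ_RSS_CONFIG_PREFIX resolves these to full keys such as
+  // "tez.rss.reduce.remote.spill.enable" and "tez.rss.remote.spill.storage.path".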
+
   public static RssConf toRssConf(Configuration jobConf) {
     RssConf rssConf = new RssConf();
     for (Map.Entry<String, String> entry : jobConf) {
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssInMemoryMerger.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssInMemoryMerger.java
new file mode 100644
index 000000000..11c003e89
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssInMemoryMerger.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FileChunk;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RssInMemoryMerger extends MergeThread<MapOutput> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssInMemoryMerger.class);
+
+  /*
+   * The spill path must be unique; it is composed of unique_id, src_id and spill_id.
+   * unique_id is the task attempt id plus an increasing id from the IO context.
+   * src_id is the source input id, determined by the partition and the source task.
+   * spill_id is the spill id of the map task.
+   */
+  private static final String SPILL_FILE_PATTERN = "%s/%s_src_%d_spill_%d.out";
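+  // Illustrative example: with unique_id "attempt_x_10003", src_id 2 and spill_id 0,
+  // the spill file is "<appAttemptId>/attempt_x_10003_src_2_spill_0.out" under the
+  // configured spill path.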
+
+  private final InputContext inputContext;
+  private final Configuration conf;
+  private final RssMergeManager manager;
+  private final CompressionCodec codec;
+  private final Combiner combiner;
+
+  private final FileSystem remoteFs;
+  private final Path spillPath;
+  private final String appAttemptId;
+  private volatile InputAttemptIdentifier srcTaskIdentifier;
+  private volatile Path outputPath;
+
+  private final TezCounter spilledRecordsCounter;
+  private final TezCounter numMemToRemoteMerges;
+  private final TezCounter additionalBytesRead;
+  private final TezCounter additionalBytesWritten;
+
+  public enum Counter {
+    NUM_MEM_TO_REMOTE_MERGES
+  }
+
+  public RssInMemoryMerger(
+      RssMergeManager manager,
+      Configuration conf,
+      InputContext inputContext,
+      Combiner combiner,
+      ExceptionReporter reporter,
+      CompressionCodec codec,
+      FileSystem remoteFs,
+      String spillPath,
+      String appAttemptId) {
+    super(manager, Integer.MAX_VALUE, reporter);
+    this.setName(
+        "MemtoRemoteMerger ["
+            + TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName())
+            + "]");
+    this.setDaemon(true);
+    this.manager = manager;
+    this.inputContext = inputContext;
+    this.conf = conf;
+    this.remoteFs = remoteFs;
+    this.spillPath = new Path(spillPath);
+    this.appAttemptId = appAttemptId;
+    this.codec = codec;
+    this.combiner = combiner;
+    this.spilledRecordsCounter =
+        inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+    this.numMemToRemoteMerges =
+        inputContext.getCounters().findCounter(Counter.NUM_MEM_TO_REMOTE_MERGES);
+    this.additionalBytesRead =
+        inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+    this.additionalBytesWritten =
+        inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+  }
+
+  @Override
+  public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
+    if (inputs == null || inputs.size() == 0) {
+      return;
+    }
+    numMemToRemoteMerges.increment(1);
+    inputContext.notifyProgress();
+
+    this.srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
+    List<TezMerger.Segment> inMemorySegments = new ArrayList<>();
+
+    manager.createInMemorySegments(inputs, inMemorySegments, 0);
+    int noInMemorySegments = inMemorySegments.size();
+
+    String mergedFile =
+        String.format(
+            SPILL_FILE_PATTERN,
+            appAttemptId,
+            inputContext.getUniqueIdentifier(),
+            srcTaskIdentifier.getInputIdentifier(),
+            srcTaskIdentifier.getSpillEventId());
+    this.outputPath = new Path(spillPath, mergedFile);
+
+    Writer writer = null;
+    long outFileLen = 0;
+    try {
+      writer =
+          new Writer(
+              conf,
+              remoteFs,
+              outputPath,
+              ConfigUtils.getIntermediateInputKeyClass(conf),
+              ConfigUtils.getIntermediateInputValueClass(conf),
+              codec,
+              null,
+              null);
+
+      TezRawKeyValueIterator rIter;
+      LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments...");
+
+      // When the merge factor is greater than or equal to the number of segments, the
+      // intermediate merge is skipped, so tmpDir and tmpFs are unused.
+      Path tmpDir = null;
+      FileSystem tmpFs = null;
+      rIter =
+          TezMerger.merge(
+              conf,
+              tmpFs,
+              ConfigUtils.getIntermediateInputKeyClass(conf),
+              ConfigUtils.getIntermediateInputValueClass(conf),
+              inMemorySegments,
+              inMemorySegments.size(),
+              tmpDir,
+              ConfigUtils.getIntermediateInputKeyComparator(conf),
+              manager.getProgressable(),
+              spilledRecordsCounter,
+              null,
+              additionalBytesRead,
+              null);
+
+      if (null == combiner) {
+        TezMerger.writeFile(
+            rIter,
+            writer,
+            manager.getProgressable(),
+            TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+      } else {
+        runCombineProcessor(rIter, writer);
+      }
+      // The writer's compressed length is computed when Writer::close is called, so the
+      // counter must be updated after Writer::close. The counter should only be updated
+      // on the normal execution path, so do not update it in the finally block.
+      writer.close();
+      additionalBytesWritten.increment(writer.getCompressedLength());
+      writer = null;
+      outFileLen = remoteFs.getFileStatus(outputPath).getLen();
+      LOG.info(
+          inputContext.getUniqueIdentifier()
+              + " Merge of the "
+              + noInMemorySegments
+              + " files in-memory complete."
+              + " Remote file is "
+              + outputPath
+              + " of size "
+              + outFileLen);
+    } catch (IOException e) {
+      remoteFs.delete(outputPath, true);
+      throw e;
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+
+    // Note the output of the merge
+    manager.closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen));
+  }
+
+  void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
+      throws IOException, InterruptedException {
+    combiner.combine(kvIter, writer);
+  }
+
+  @Override
+  public void cleanup(List<MapOutput> inputs, boolean deleteData)
+      throws IOException, InterruptedException {
+    if (deleteData) {
+      // Additional check at task level
+      if (this.manager.isCleanup()) {
+        LOG.info("Try deleting stale data");
+        MergeManager.cleanup(remoteFs, outputPath);
+      }
+    }
+  }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssMergeManager.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssMergeManager.java
new file mode 100644
index 000000000..f62a56bf3
--- /dev/null
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssMergeManager.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FileChunk;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+
+import static org.apache.tez.common.RssTezConfig.RSS_REMOTE_SPILL_STORAGE_PATH;
+
+public class RssMergeManager extends MergeManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssMergeManager.class);
+
+  private Configuration conf;
+  private InputContext inputContext;
+  private ExceptionReporter exceptionReporter;
+  private RssInMemoryMerger inMemoryMerger;
+  private Combiner combiner;
+  private CompressionCodec codec;
+  private boolean ifileReadAhead;
+  private int ifileReadAheadLength;
+  private int ifileBufferSize;
+  private String appAttemptId;
+
+  private final long initialMemoryAvailable;
+  private final long memoryLimit;
+  private final long maxSingleShuffleLimit;
+  private long mergeThreshold;
+  private long commitMemory;
+  private long usedMemory;
+
+  private String spillBasePath;
+  private FileSystem remoteFS;
+
+  // Variables for stats and logging
+  private long lastInMemSegmentLogTime = -1L;
+  private final SegmentStatsTracker statsInMemTotal = new SegmentStatsTracker();
+  private final SegmentStatsTracker statsInMemLastLog = new SegmentStatsTracker();
+
+  private final TezCounter spilledRecordsCounter;
+  private final TezCounter mergedMapOutputsCounter;
+  private final TezCounter additionalBytesRead;
+
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+  private final boolean cleanup;
+
+  private final Progressable progressable =
+      new Progressable() {
+        @Override
+        public void progress() {
+          inputContext.notifyProgress();
+        }
+      };
+
+  public RssMergeManager(
+      Configuration conf,
+      FileSystem localFS,
+      InputContext inputContext,
+      Combiner combiner,
+      TezCounter spilledRecordsCounter,
+      TezCounter reduceCombineInputCounter,
+      TezCounter mergedMapOutputsCounter,
+      ExceptionReporter exceptionReporter,
+      long initialMemoryAvailable,
+      CompressionCodec codec,
+      boolean ifileReadAheadEnabled,
+      int ifileReadAheadLength,
+      Configuration remoteConf,
+      int replication,
+      int retries,
+      String appAttemptId) {
+    super(
+        conf,
+        localFS,
+        null,
+        inputContext,
+        combiner,
+        spilledRecordsCounter,
+        reduceCombineInputCounter,
+        mergedMapOutputsCounter,
+        exceptionReporter,
+        initialMemoryAvailable,
+        codec,
+        ifileReadAheadEnabled,
+        ifileReadAheadLength);
+    this.conf = conf;
+    this.inputContext = inputContext;
+    this.exceptionReporter = exceptionReporter;
+    this.codec = codec;
+    this.combiner = combiner;
+    this.initialMemoryAvailable = initialMemoryAvailable;
+    this.ifileReadAhead = ifileReadAheadEnabled;
+    if (this.ifileReadAhead) {
+      this.ifileReadAheadLength = ifileReadAheadLength;
+    } else {
+      this.ifileReadAheadLength = 0;
+    }
+    this.ifileBufferSize =
+        conf.getInt(
+            "io.file.buffer.size", TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+    this.appAttemptId = appAttemptId;
+    this.cleanup =
+        conf.getBoolean(
+            TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);
+
+    // Set memory limits; some checks already performed in MergeManager are skipped here
+    final float maxInMemCopyUse =
+        conf.getFloat(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
+    long memLimit =
+        conf.getLong(
+            Constants.TEZ_RUNTIME_TASK_MEMORY,
+            (long) (inputContext.getTotalMemoryAvailableToTask() * maxInMemCopyUse));
+    if (this.initialMemoryAvailable < memLimit) {
+      this.memoryLimit = this.initialMemoryAvailable;
+    } else {
+      this.memoryLimit = memLimit;
+    }
+    this.mergeThreshold =
+        (long)
+            (this.memoryLimit
+                * conf.getFloat(
+                    TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
+                    TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
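+    // Illustrative arithmetic (matching the unit tests): memoryLimit=1024 with a merge
+    // percent of 0.08 gives mergeThreshold = (long) (1024 * 0.08) = 81.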
+    final float singleShuffleMemoryLimitPercent =
+        conf.getFloat(
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT_DEFAULT);
+    this.maxSingleShuffleLimit =
+        (long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);
+
+    // counter
+    this.spilledRecordsCounter = spilledRecordsCounter;
+    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+    this.additionalBytesRead =
+        inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+
+    // remote fs
+    Configuration remoteConfCopied = new Configuration(remoteConf);
+    this.spillBasePath = conf.get(RSS_REMOTE_SPILL_STORAGE_PATH);
+    // Validate the spill path before it is used to construct the filesystem.
+    if (StringUtils.isBlank(this.spillBasePath)) {
+      throw new RssException("You must set remote spill path!");
+    }
+    try {
+      remoteConfCopied.setInt("dfs.replication", replication);
+      remoteConfCopied.setInt("dfs.client.block.write.retries", retries);
+      this.remoteFS =
+          HadoopFilesystemProvider.getFilesystem(new Path(spillBasePath), remoteConfCopied);
+    } catch (Exception e) {
+      throw new RssException("Cannot init remoteFS on path: " + spillBasePath);
+    }
+    this.inMemoryMerger = createRssInMemoryMerger();
+  }
+
+  private RssInMemoryMerger createRssInMemoryMerger() {
+    return new RssInMemoryMerger(
+        this,
+        this.conf,
+        inputContext,
+        combiner,
+        exceptionReporter,
+        codec,
+        remoteFS,
+        spillBasePath,
+        appAttemptId);
+  }
+
+  @Override
+  void configureAndStart() {
+    this.inMemoryMerger.start();
+  }
+
+  @Override
+  public void waitForInMemoryMerge() throws InterruptedException {
+    inMemoryMerger.waitForMerge();
+
+    /*
+     * Memory released during the merge could have been taken by active fetchers; if a
+     * merge was already in progress, that would not have kicked off another merge, and
+     * fetchers could later get stuck in an indefinite wait. To address this, trigger
+     * another merge if needed and wait for it to complete (to release commitMemory &
+     * usedMemory).
+     */
+    boolean triggerAdditionalMerge = false;
+    synchronized (this) {
+      if (commitMemory >= mergeThreshold) {
+        startMemToRemoteMerge();
+        triggerAdditionalMerge = true;
+      }
+    }
+    if (triggerAdditionalMerge) {
+      inMemoryMerger.waitForMerge();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Additional in-memory merge triggered");
+      }
+    }
+  }
+
+  private boolean canShuffleToMemory(long requestedSize) {
+    // TODO: large shuffle data should be stored in the remote fs directly
+    return true;
+  }
+
+  @Override
+  public synchronized void waitForShuffleToMergeMemory() throws InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (usedMemory > memoryLimit) {
+      wait();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Waited for "
+              + (System.currentTimeMillis() - startTime)
+              + " for memory to become"
+              + " available");
+    }
+  }
+
+  private final MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);
+
+  @Override
+  public synchronized MapOutput reserve(
+      InputAttemptIdentifier srcAttemptIdentifier,
+      long requestedSize,
+      long compressedLength,
+      int fetcher)
+      throws IOException {
+    if (!canShuffleToMemory(requestedSize)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            srcAttemptIdentifier
+                + ": Shuffling to remote fs since "
+                + requestedSize
+                + " is greater than maxSingleShuffleLimit ("
+                + maxSingleShuffleLimit
+                + ")");
+      }
+      throw new RssException("Shuffle large date is not implemented!");
+    }
+
+    // Stall shuffle if we are above the memory limit
+
+    // It is possible that all threads could just be stalling and not make
+    // progress at all. This could happen when:
+    //
+    // requested size is causing the used memory to go above limit &&
+    // requested size < singleShuffleLimit &&
+    // current used size < mergeThreshold (merge will not get triggered)
+    //
+    // To avoid this from happening, we allow exactly one thread to go past
+    // the memory limit. We check (usedMemory > memoryLimit) and not
+    // (usedMemory + requestedSize > memoryLimit). When this thread is done
+    // fetching, this will automatically trigger a merge thereby unlocking
+    // all the stalled threads
+
+    if (usedMemory > memoryLimit) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            srcAttemptIdentifier
+                + ": Stalling shuffle since usedMemory ("
+                + usedMemory
+                + ") is greater than memoryLimit ("
+                + memoryLimit
+                + ")."
+                + " CommitMemory is ("
+                + commitMemory
+                + ")");
+      }
+      return stallShuffle;
+    }
+
+    // Allow the in-memory shuffle to progress
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          srcAttemptIdentifier
+              + ": Proceeding with shuffle since usedMemory ("
+              + usedMemory
+              + ") is lesser than memoryLimit ("
+              + memoryLimit
+              + ")."
+              + "CommitMemory is ("
+              + commitMemory
+              + ")");
+    }
+    return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
+  }
+
+  private synchronized MapOutput unconditionalReserve(
+      InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput)
+      throws IOException {
+    usedMemory += requestedSize;
+    return MapOutput.createMemoryMapOutput(
+        srcAttemptIdentifier, this, (int) requestedSize, primaryMapOutput);
+  }
+
+  @Override
+  public synchronized void unreserve(long size) {
+    usedMemory -= size;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Notifying unreserve : size="
+              + size
+              + ", commitMemory="
+              + commitMemory
+              + ", usedMemory="
+              + usedMemory
+              + ", mergeThreshold="
+              + mergeThreshold);
+    }
+    notifyAll();
+  }
+
+  @Override
+  public synchronized void releaseCommittedMemory(long size) {
+    commitMemory -= size;
+    unreserve(size);
+  }
+
+  @Override
+  public synchronized void closeInMemoryFile(MapOutput mapOutput) {
+    inMemoryMapOutputs.add(mapOutput);
+    trackAndLogCloseInMemoryFile(mapOutput);
+    commitMemory += mapOutput.getSize();
+    if (commitMemory >= mergeThreshold) {
+      startMemToRemoteMerge();
+    }
+  }
+
+  private void trackAndLogCloseInMemoryFile(MapOutput mapOutput) {
+    statsInMemTotal.updateStats(mapOutput.getSize());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "closeInMemoryFile -> map-output of size: "
+              + mapOutput.getSize()
+              + ", inMemoryMapOutputs.size() -> "
+              + inMemoryMapOutputs.size()
+              + ", commitMemory -> "
+              + commitMemory
+              + ", usedMemory ->"
+              + usedMemory
+              + ", mapOutput="
+              + mapOutput);
+    } else {
+      statsInMemLastLog.updateStats(mapOutput.getSize());
+      long now = Time.monotonicNow();
+      if (now > lastInMemSegmentLogTime + 30 * 1000L) {
+        LOG.info(
+            "CloseInMemoryFile. Current state: inMemoryMapOutputs.size={},"
+                + " commitMemory={},"
+                + " usedMemory={}. Since last log:"
+                + " count={},"
+                + " min={},"
+                + " max={},"
+                + " total={},"
+                + " avg={}",
+            inMemoryMapOutputs.size(),
+            commitMemory,
+            usedMemory,
+            statsInMemLastLog.count,
+            statsInMemLastLog.minSize,
+            statsInMemLastLog.maxSize,
+            statsInMemLastLog.size,
+            (statsInMemLastLog.count == 0
+                ? "nan"
+                : (statsInMemLastLog.size / (double) statsInMemLastLog.count)));
+        statsInMemLastLog.reset();
+        lastInMemSegmentLogTime = now;
+      }
+    }
+  }
+
+  private void startMemToRemoteMerge() {
+    synchronized (inMemoryMerger) {
+      if (!inMemoryMerger.isInProgress()) {
+        LOG.info(
+            inputContext.getSourceVertexName()
+                + ": "
+                + "Starting inMemoryMerger's merge since commitMemory="
+                + commitMemory
+                + " > mergeThreshold="
+                + mergeThreshold
+                + ". Current usedMemory="
+                + usedMemory);
+        inMemoryMerger.startMerge(inMemoryMapOutputs);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void closeOnDiskFile(FileChunk file) {
+    onDiskMapOutputs.add(file);
+    logCloseOnDiskFile(file);
+  }
+
+  @Override
+  public TezRawKeyValueIterator close(boolean tryFinalMerge) throws Throwable {
+    if (!isShutdown.getAndSet(true)) {
+      // Wait for on-going merges to complete
+      inMemoryMerger.startMerge(inMemoryMapOutputs);
+      inMemoryMerger.close();
+
+      // All in-memory outputs should have been merged at this point
+      if (!inMemoryMapOutputs.isEmpty()) {
+        throw new RssException("InMemoryMapOutputs should be empty");
+      }
+
+      if (statsInMemTotal.count > 0) {
+        LOG.info(
+            "TotalInMemFetchStats: count={}, totalSize={}, min={}, max={}, avg={}",
+            statsInMemTotal.count,
+            statsInMemTotal.size,
+            statsInMemTotal.minSize,
+            statsInMemTotal.maxSize,
+            (statsInMemTotal.size / (float) statsInMemTotal.count));
+      }
+
+      // Don't attempt a final merge if close is invoked as a result of a previous
+      // shuffle exception / error.
+      if (tryFinalMerge) {
+        try {
+          TezRawKeyValueIterator kvIter = finalMerge();
+          return kvIter;
+        } catch (InterruptedException e) {
+          // Cleanup the disk segments
+          if (cleanup) {
+            cleanup(remoteFS, onDiskMapOutputs);
+          }
+          Thread.currentThread().interrupt(); // reset interrupt status
+          throw e;
+        }
+      }
+    }
+    return null;
+  }
+
+  long createInMemorySegments(
+      List<MapOutput> inMemoryMapOutputs, List<TezMerger.Segment> inMemorySegments, long leaveBytes)
+      throws IOException {
+    long totalSize = 0L;
+    // We could take fullSize from the RamManager, but map outputs can be
+    // closed and not yet present in inMemoryMapOutputs
+    long fullSize = 0L;
+    for (MapOutput mo : inMemoryMapOutputs) {
+      fullSize += mo.getSize();
+    }
+    int inMemoryMapOutputsOffset = 0;
+    while ((fullSize > leaveBytes) && !Thread.currentThread().isInterrupted()) {
+      MapOutput mo = inMemoryMapOutputs.get(inMemoryMapOutputsOffset++);
+      byte[] data = mo.getMemory();
+      long size = data.length;
+      totalSize += size;
+      fullSize -= size;
+      IFile.Reader reader =
+          new InMemoryReader(RssMergeManager.this, mo.getAttemptIdentifier(), data, 0, (int) size);
+      inMemorySegments.add(
+          new TezMerger.Segment(
+              reader, (mo.isPrimaryMapOutput() ? mergedMapOutputsCounter : null)));
+    }
+    // Bulk-remove the in-memory map outputs that were turned into segments
+    inMemoryMapOutputs.subList(0, inMemoryMapOutputsOffset).clear();
+    return totalSize;
+  }
+
+  private long lastOnDiskSegmentLogTime = -1L;
+
+  private void logCloseOnDiskFile(FileChunk file) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "close onDiskFile="
+              + file.getPath()
+              + ", len="
+              + file.getLength()
+              + ", onDisMapOutputs="
+              + onDiskMapOutputs.size());
+    } else {
+      long now = Time.monotonicNow();
+      if (now > lastOnDiskSegmentLogTime + 30 * 1000L) {
+        LOG.info(
+            "close onDiskFile. State: NumOnDiskFiles={}. Current: path={}, len={}",
+            onDiskMapOutputs.size(),
+            file.getPath(),
+            file.getLength());
+        lastOnDiskSegmentLogTime = now;
+      }
+    }
+  }
+
+  /*
+   * Since merging remote files with in-memory files or other remote files is a
+   * time-consuming operation, the implementation of finalMerge is simplified and the
+   * sort io factor is ignored. Files already archived to the remote file system are
+   * not merged; all remote files are handed to the iterator for direct reading.
+   */
+  private TezRawKeyValueIterator finalMerge() throws IOException, InterruptedException {
+    Class keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    Class valueClass = ConfigUtils.getIntermediateInputValueClass(conf);
+    Path[] remotePaths =
+        onDiskMapOutputs.stream().map(FileChunk::getPath).toArray(Path[]::new);
+    final RawComparator comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
+    return TezMerger.merge(
+        conf,
+        remoteFS,
+        keyClass,
+        valueClass,
+        codec,
+        ifileReadAhead,
+        ifileReadAheadLength,
+        ifileBufferSize,
+        remotePaths,
+        true,
+        Integer.MAX_VALUE,
+        null,
+        comparator,
+        progressable,
+        spilledRecordsCounter,
+        null,
+        additionalBytesRead,
+        null);
+  }
+
+  public boolean isCleanup() {
+    return cleanup;
+  }
+
+  public Progressable getProgressable() {
+    return progressable;
+  }
+
+  private static class SegmentStatsTracker {
+    private long size;
+    private int count;
+    private long minSize;
+    private long maxSize;
+
+    SegmentStatsTracker() {
+      reset();
+    }
+
+    void updateStats(long segSize) {
+      size += segSize;
+      count++;
+      minSize = (segSize < minSize ? segSize : minSize);
+      maxSize = (segSize > maxSize ? segSize : maxSize);
+    }
+
+    void reset() {
+      size = 0L;
+      count = 0;
+      minSize = Long.MAX_VALUE;
+      maxSize = Long.MIN_VALUE;
+    }
+  }
+
+  @VisibleForTesting
+  public RssInMemoryMerger getInMemoryMerger() {
+    return inMemoryMerger;
+  }
+}
diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffle.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffle.java
index 8b42749db..56e00b324 100644
--- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffle.java
+++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffle.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -45,6 +46,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.common.CallableWithNdc;
+import org.apache.tez.common.RssTezConfig;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
@@ -63,6 +65,8 @@ import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.RemoteStorageInfo;
+
 /** Usage: Create instance, setInitialMemoryAllocated(long), run() */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -98,6 +102,7 @@ public class RssShuffle implements ExceptionReporter {
   private final long startTime;
   private final TezCounter mergePhaseTime;
   private final TezCounter shufflePhaseTime;
+  private Configuration remoteConf;
 
   /** Usage: Create instance, RssShuffle */
   public RssShuffle(
@@ -134,20 +139,7 @@ public class RssShuffle implements ExceptionReporter {
     } else {
       this.ifileReadAheadLength = 0;
     }
-
-    Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);
-
-    FileSystem localFS = FileSystem.getLocal(this.conf);
-    LocalDirAllocator localDirAllocator =
-        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
-
-    // TODO TEZ Get rid of Map / Reduce references.
-    TezCounter spilledRecordsCounter =
-        inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
-    TezCounter reduceCombineInputCounter =
-        inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
-    TezCounter mergedMapOutputsCounter =
-        inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+    this.remoteConf = getRemoteConf(conf);
 
     LOG.info(
         srcNameTrimmed
@@ -161,22 +153,7 @@ public class RssShuffle implements ExceptionReporter {
             + ifileReadAhead);
 
     startTime = System.currentTimeMillis();
-    merger =
-        new MergeManager(
-            this.conf,
-            localFS,
-            localDirAllocator,
-            inputContext,
-            combiner,
-            spilledRecordsCounter,
-            reduceCombineInputCounter,
-            mergedMapOutputsCounter,
-            this,
-            initialMemoryAvailable,
-            codec,
-            ifileReadAhead,
-            ifileReadAheadLength);
-
+    merger = createMergeManager(initialMemoryAvailable, applicationAttemptId);
     rssScheduler =
         new RssShuffleScheduler(
             this.inputContext,
@@ -212,6 +189,83 @@ public class RssShuffle implements ExceptionReporter {
     rssRunShuffleCallable = new RssRunShuffleCallable();
   }
 
+  protected MergeManager createMergeManager(
+      long initialMemoryAvailable, ApplicationAttemptId appAttemptId) throws IOException {
+    Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);
+    FileSystem localFS = FileSystem.getLocal(this.conf);
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    TezCounter spilledRecordsCounter =
+        inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
+    TezCounter reduceCombineInputCounter =
+        inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    TezCounter mergedMapOutputsCounter =
+        inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+
+    boolean useRemoteSpill =
+        conf.getBoolean(
+            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+            RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT);
+    if (useRemoteSpill) {
+      // Use minimal replication, because spilled data can be recomputed by the reduce
+      // task; instead, rely on more retries in the HDFS client.
+      int replication =
+          conf.getInt(
+              RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,
+              RssTezConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT);
+      int retries =
+          conf.getInt(
+              RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES,
+              RssTezConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT);
+      LOG.info("Tez RssShuffle will use RssMergeManager!");
+      return new RssMergeManager(
+          this.conf,
+          localFS,
+          inputContext,
+          combiner,
+          spilledRecordsCounter,
+          reduceCombineInputCounter,
+          mergedMapOutputsCounter,
+          this,
+          initialMemoryAvailable,
+          codec,
+          ifileReadAhead,
+          ifileReadAheadLength,
+          this.remoteConf,
+          replication,
+          retries,
+          appAttemptId.toString());
+    } else {
+      return new MergeManager(
+          this.conf,
+          localFS,
+          localDirAllocator,
+          inputContext,
+          combiner,
+          spilledRecordsCounter,
+          reduceCombineInputCounter,
+          mergedMapOutputsCounter,
+          this,
+          initialMemoryAvailable,
+          codec,
+          ifileReadAhead,
+          ifileReadAheadLength);
+    }
+  }
+
+  private static Configuration getRemoteConf(Configuration conf) {
+    String basePath = conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH);
+    String remoteStorageConf = conf.get(RssTezConfig.RSS_REMOTE_STORAGE_CONF);
+    RemoteStorageInfo remoteStorageInfo = new RemoteStorageInfo(basePath, remoteStorageConf);
+    Configuration remoteConf = new Configuration(conf);
+    if (!remoteStorageInfo.isEmpty()) {
+      for (Map.Entry<String, String> entry : remoteStorageInfo.getConfItems().entrySet()) {
+        remoteConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return remoteConf;
+  }
+
   public void handleEvents(List<Event> events) throws IOException {
     if (!isShutDown.get()) {
       eventHandler.handleEvents(events);
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssInMemoryMergerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssInMemoryMergerTest.java
new file mode 100644
index 000000000..e207ce9d0
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssInMemoryMergerTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.impl.TezInputContextImpl;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.tez.common.RssTezConfig.RSS_REMOTE_SPILL_STORAGE_PATH;
+import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS;
+import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS;
+import static org.apache.tez.runtime.library.common.Constants.TEZ_RUNTIME_TASK_MEMORY;
+import static org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssInMemoryMerger.Counter.NUM_MEM_TO_REMOTE_MERGES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RssInMemoryMergerTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssInMemoryMergerTest.class);
+
+  private static final String[] KEYS = {"aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc", "dddddddddd"};
+  private static final String[] VALUES = {"1111111111", "2222222222", "3333333333", "4444444444"};
+  private static final String BASE_SPILL_PATH = "/tmp";
+  private static final ApplicationId APP_ID = ApplicationId.newInstance(1, 1);
+  private static final ApplicationAttemptId APP_ATTEMPT_ID =
+      ApplicationAttemptId.newInstance(APP_ID, 1);
+  private static final String UNIQUE_ID = "TASK_ATTEMPT_1";
+
+  private static FileSystem remoteFS;
+  private static MiniDFSCluster cluster;
+
+  @BeforeAll
+  public static void setUpHdfs(@TempDir File tempDir) throws Exception {
+    Configuration conf = new Configuration();
+    File baseDir = tempDir;
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+    cluster = (new MiniDFSCluster.Builder(conf)).build();
+    String hdfsUri = cluster.getURI().toString() + "/";
+    remoteFS = (new Path(hdfsUri)).getFileSystem(conf);
+  }
+
+  @AfterAll
+  public static void tearDownHdfs() throws Exception {
+    remoteFS.close();
+    cluster.shutdown();
+  }
+
+  @Test
+  public void mergerTest() throws Exception {
+    // 1 Construct RssMergeManager
+    Configuration conf = new Configuration();
+    conf.set(RSS_REMOTE_SPILL_STORAGE_PATH, BASE_SPILL_PATH);
+    conf.setInt(TEZ_RUNTIME_TASK_MEMORY, 1024);
+    conf.setClass(TEZ_RUNTIME_KEY_CLASS, Text.class, Text.class);
+    conf.setClass(TEZ_RUNTIME_VALUE_CLASS, Text.class, Text.class);
+
+    TezInputContextImpl inputContext = mock(TezInputContextImpl.class);
+    TezCounters tezCounters = new TezCounters();
+    when(inputContext.getCounters()).thenReturn(tezCounters);
+    when(inputContext.getSourceVertexName()).thenReturn("vertex0");
+    when(inputContext.getUniqueIdentifier()).thenReturn(UNIQUE_ID);
+    TezCounter spilledRecordsCounter = tezCounters.findCounter(TaskCounter.SPILLED_RECORDS);
+    TezCounter reduceCombineInputCounter =
+        tezCounters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    TezCounter mergedMapOutputsCounter = tezCounters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+
+    FileSystem localFS = FileSystem.getLocal(conf);
+    int initialMemoryAvailable = 1024;
+    CompressionCodec codec = null;
+    final RssMergeManager manager =
+        new RssMergeManager(
+            conf,
+            localFS,
+            inputContext,
+            null,
+            spilledRecordsCounter,
+            reduceCombineInputCounter,
+            mergedMapOutputsCounter,
+            null,
+            initialMemoryAvailable,
+            codec,
+            false,
+            0,
+            remoteFS.getConf(),
+            1,
+            3,
+            APP_ATTEMPT_ID.toString());
+
+    // 2 Example
+    // 2.1 write map outputs
+    Map<String, String> map1 = new TreeMap<String, String>();
+    map1.put(KEYS[0], VALUES[0]);
+    map1.put(KEYS[2], VALUES[2]);
+    Map<String, String> map2 = new TreeMap<String, String>();
+    map2.put(KEYS[1], VALUES[1]);
+    map2.put(KEYS[3], VALUES[3]);
+    byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
+    ByteArrayInputStream mapInput1 = new ByteArrayInputStream(mapOutputBytes1);
+    byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
+    ByteArrayInputStream mapInput2 = new ByteArrayInputStream(mapOutputBytes2);
+    InputAttemptIdentifier mapId1 = new InputAttemptIdentifier(1, 0);
+    InputAttemptIdentifier mapId2 = new InputAttemptIdentifier(2, 0);
+    // The header length is 4, so we must subtract it.
+    MapOutput mapOutput1 =
+        MapOutput.createMemoryMapOutput(mapId1, manager, mapOutputBytes1.length - 4, false);
+    MapOutput mapOutput2 =
+        MapOutput.createMemoryMapOutput(mapId2, manager, mapOutputBytes2.length - 4, false);
+    ShuffleUtils.shuffleToMemory(
+        mapOutput1.getMemory(),
+        mapInput1,
+        mapOutputBytes1.length,
+        mapOutputBytes1.length,
+        codec,
+        false,
+        0,
+        LOG,
+        mapId1);
+    ShuffleUtils.shuffleToMemory(
+        mapOutput2.getMemory(),
+        mapInput2,
+        mapOutputBytes2.length,
+        mapOutputBytes2.length,
+        codec,
+        false,
+        0,
+        LOG,
+        mapId2);
+
+    // 2.2 Trigger to merge the map outputs
+    RssInMemoryMerger merger = manager.getInMemoryMerger();
+    List<MapOutput> mapOutputs = new ArrayList<>();
+    mapOutputs.add(mapOutput1);
+    mapOutputs.add(mapOutput2);
+    merger.merge(mapOutputs);
+
+    // 3 Verify result
+    // 3.1 Verify counters
+    assertEquals(1, tezCounters.findCounter(NUM_MEM_TO_REMOTE_MERGES).getValue());
+
+    // 3.2 Verify that exactly one remote file exists
+    String parentPath = String.format("%s/%s", BASE_SPILL_PATH, APP_ATTEMPT_ID);
+    FileStatus[] files = remoteFS.listStatus(new Path(parentPath));
+    assertEquals(1, files.length);
+    String filePathPattern =
+        String.format(
+            "%s/%s/%s_src_(\\d)_spill_%d.out", BASE_SPILL_PATH, APP_ATTEMPT_ID, UNIQUE_ID, -1);
+    Pattern pattern = Pattern.compile(filePathPattern);
+    Matcher matcher = pattern.matcher(files[0].getPath().toString());
+    assertTrue(matcher.find());
+    assertTrue(
+        matcher.group(1).equals(String.valueOf(mapId1.getInputIdentifier()))
+            || matcher.group(1).equals(String.valueOf(mapId2.getInputIdentifier())));
+    Path mergePath = files[0].getPath();
+
+    // 3.3 Verify the content from remote fs
+    List<String> keys = Lists.newArrayList();
+    List<String> values = Lists.newArrayList();
+    readOnDiskMapOutput(remoteFS, mergePath, keys, values);
+    remoteFS.delete(new Path(BASE_SPILL_PATH), true);
+    for (int i = 0; i < KEYS.length; i++) {
+      assertEquals(KEYS[i], keys.get(i));
+      assertEquals(VALUES[i], values.get(i));
+    }
+  }
+
+  static byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
+      throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+    IFile.Writer writer = new IFile.Writer(conf, fsdos, Text.class, Text.class, null, null, null);
+    for (String key : keysToValues.keySet()) {
+      String value = keysToValues.get(key);
+      writer.append(new Text(key), new Text(value));
+    }
+    writer.close();
+    return baos.toByteArray();
+  }
+
+  static void readOnDiskMapOutput(FileSystem fs, Path path, List<String> keys, List<String> values)
+      throws IOException {
+    IFile.Reader reader = new IFile.Reader(fs, path, null, null, null, false, 0, 0);
+    DataInputBuffer keyBuff = new DataInputBuffer();
+    DataInputBuffer valueBuff = new DataInputBuffer();
+    Text key = new Text();
+    Text value = new Text();
+    while (reader.nextRawKey(keyBuff)) {
+      key.readFields(keyBuff);
+      keys.add(key.toString());
+      reader.nextRawValue(valueBuff);
+      value.readFields(valueBuff);
+      values.add(value.toString());
+    }
+  }
+}
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssMergeManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssMergeManagerTest.java
new file mode 100644
index 000000000..660c40b9f
--- /dev/null
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssMergeManagerTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.impl.TezInputContextImpl;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.tez.common.RssTezConfig.RSS_REMOTE_SPILL_STORAGE_PATH;
+import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS;
+import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT;
+import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT;
+import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS;
+import static org.apache.tez.runtime.library.common.Constants.TEZ_RUNTIME_TASK_MEMORY;
+import static org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RssInMemoryMerger.Counter.NUM_MEM_TO_REMOTE_MERGES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RssMergeManagerTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RssMergeManagerTest.class);
+
+  private static final String[] KEYS = {
+    "aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc", "dddddddddd", "eeeeeeeeee", "ffffffffff"
+  };
+  private static final String[] VALUES = {
+    "1111111111", "2222222222", "3333333333", "4444444444", "5555555555", "6666666666"
+  };
+  private static final String BASE_SPILL_PATH = "/tmp";
+  private static final ApplicationId APP_ID = ApplicationId.newInstance(1, 1);
+  private static final ApplicationAttemptId APP_ATTEMPT_ID =
+      ApplicationAttemptId.newInstance(APP_ID, 1);
+  private static final String UNIQUE_ID = "TASK_ATTEMPT_1";
+
+  private static FileSystem remoteFS;
+  private static MiniDFSCluster cluster;
+
+  @BeforeAll
+  public static void setUpHdfs(@TempDir File tempDir) throws Exception {
+    Configuration conf = new Configuration();
+    File baseDir = tempDir;
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+    cluster = (new MiniDFSCluster.Builder(conf)).build();
+    String hdfsUri = cluster.getURI().toString() + "/";
+    remoteFS = (new Path(hdfsUri)).getFileSystem(conf);
+  }
+
+  @AfterAll
+  public static void tearDownHdfs() throws Exception {
+    remoteFS.close();
+    cluster.shutdown();
+  }
+
+  @Test
+  public void mergerTest() throws Throwable {
+    // 1 Construct and start RssMergeManager
+    Configuration conf = new Configuration();
+    conf.set(RSS_REMOTE_SPILL_STORAGE_PATH, BASE_SPILL_PATH);
+    conf.setInt(TEZ_RUNTIME_TASK_MEMORY, 1024);
+    conf.setClass(TEZ_RUNTIME_KEY_CLASS, Text.class, Text.class);
+    conf.setClass(TEZ_RUNTIME_VALUE_CLASS, Text.class, Text.class);
+    conf.setFloat(TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.07F);
+    conf.setFloat(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.08F);
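+    // With 1024 bytes of task memory and a merge percent of 0.08, the in-memory merge should
+    // start once committed map outputs exceed about 81 bytes (assuming the usual Tez
+    // MergeManager threshold semantics).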
+
+    TezInputContextImpl inputContext = mock(TezInputContextImpl.class);
+    TezCounters tezCounters = new TezCounters();
+    when(inputContext.getCounters()).thenReturn(tezCounters);
+    when(inputContext.getSourceVertexName()).thenReturn("vertex0");
+    when(inputContext.getUniqueIdentifier()).thenReturn(UNIQUE_ID);
+    TezCounter spilledRecordsCounter = tezCounters.findCounter(TaskCounter.SPILLED_RECORDS);
+    TezCounter reduceCombineInputCounter =
+        tezCounters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    TezCounter mergedMapOutputsCounter = tezCounters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+
+    FileSystem localFS = FileSystem.getLocal(conf);
+    int initialMemoryAvailable = 1024;
+    CompressionCodec codec = null;
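+    // Based on the values passed below, the trailing constructor arguments appear to be the
+    // remote filesystem configuration, the replication factor
+    // (rss.reduce.remote.spill.replication = 1), the retry count
+    // (rss.reduce.remote.spill.retries = 3), and the spill sub-directory name (the
+    // application attempt id).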
+    final RssMergeManager manager =
+        new RssMergeManager(
+            conf,
+            localFS,
+            inputContext,
+            null,
+            spilledRecordsCounter,
+            reduceCombineInputCounter,
+            mergedMapOutputsCounter,
+            null,
+            initialMemoryAvailable,
+            codec,
+            false,
+            0,
+            remoteFS.getConf(),
+            1,
+            3,
+            APP_ATTEMPT_ID.toString());
+    manager.configureAndStart();
+
+    // 2 Example 1: trigger the merge by exceeding the threshold
+    // 2.1 Write and commit map outputs
+    // The two map outputs total 116 bytes, which is larger than 1024 * 0.08, so the merge
+    // will be triggered.
+    Map<String, String> map1 = new TreeMap<String, String>();
+    map1.put(KEYS[0], VALUES[0]);
+    map1.put(KEYS[2], VALUES[2]);
+    Map<String, String> map2 = new TreeMap<String, String>();
+    map2.put(KEYS[3], VALUES[3]);
+    map2.put(KEYS[5], VALUES[5]);
+    byte[] mapOutputBytes1 = RssInMemoryMergerTest.writeMapOutput(conf, map1);
+    ByteArrayInputStream mapInput1 = new ByteArrayInputStream(mapOutputBytes1);
+    byte[] mapOutputBytes2 = RssInMemoryMergerTest.writeMapOutput(conf, map2);
+    ByteArrayInputStream mapInput2 = new ByteArrayInputStream(mapOutputBytes2);
+    InputAttemptIdentifier mapId1 = new InputAttemptIdentifier(1, 0);
+    InputAttemptIdentifier mapId2 = new InputAttemptIdentifier(2, 0);
+    // The header length is 4, so we must subtract it!
+    MapOutput mapOutput1 =
+        manager.reserve(mapId1, mapOutputBytes1.length - 4, mapOutputBytes1.length - 4, 0);
+    MapOutput mapOutput2 =
+        manager.reserve(mapId2, mapOutputBytes2.length - 4, mapOutputBytes2.length - 4, 0);
+    ShuffleUtils.shuffleToMemory(
+        mapOutput1.getMemory(),
+        mapInput1,
+        mapOutputBytes1.length,
+        mapOutputBytes1.length,
+        codec,
+        false,
+        0,
+        LOG,
+        mapId1);
+    ShuffleUtils.shuffleToMemory(
+        mapOutput2.getMemory(),
+        mapInput2,
+        mapOutputBytes2.length,
+        mapOutputBytes2.length,
+        codec,
+        false,
+        0,
+        LOG,
+        mapId2);
+    mapOutput1.commit();
+    mapOutput2.commit();
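+    // Committing both outputs makes them eligible for merging; their combined size should now
+    // exceed the merge threshold.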
+
+    // 2.2 Wait for merge
+    manager.waitForInMemoryMerge();
+
+    // 2.3 Verify result
+    // 2.3.1 Verify counters
+    assertEquals(1, tezCounters.findCounter(NUM_MEM_TO_REMOTE_MERGES).getValue());
+
+    // 2.3.2 Verify that only one merged file is located in the remote fs
+    String parentPath = String.format("%s/%s", BASE_SPILL_PATH, APP_ATTEMPT_ID.toString());
+    FileStatus[] files = remoteFS.listStatus(new Path(parentPath));
+    assertEquals(1, files.length);
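+    // The merged file name is expected to follow <uniqueId>_src_<inputId>_spill_<spillId>.out,
+    // where the spill id -1 appears to mark a merger-produced file rather than a fetched one.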
+    String filePathPattern =
+        String.format(
+            "%s/%s/%s_src_(\\d)_spill_%d.out", BASE_SPILL_PATH, APP_ATTEMPT_ID, UNIQUE_ID, -1);
+    Pattern pattern = Pattern.compile(filePathPattern);
+    Matcher matcher = pattern.matcher(files[0].getPath().toString());
+    assertTrue(matcher.find());
+    assertTrue(
+        matcher.group(1).equals(String.valueOf(mapId1.getInputIdentifier()))
+            || matcher.group(1).equals(String.valueOf(mapId2.getInputIdentifier())));
+
+    // 2.3.3 Verify the content from the remote fs
+    Path mergePath = files[0].getPath();
+    List<String> keys = Lists.newArrayList();
+    List<String> values = Lists.newArrayList();
+    RssInMemoryMergerTest.readOnDiskMapOutput(remoteFS, mergePath, keys, values);
+    // assert the content read from the remote fs
+    List<String> expectedKeys = Lists.newArrayList(KEYS[0], KEYS[2], KEYS[3], KEYS[5]);
+    List<String> expectedValues = Lists.newArrayList(VALUES[0], VALUES[2], VALUES[3], VALUES[5]);
+    for (int i = 0; i < expectedKeys.size(); i++) {
+      assertEquals(expectedKeys.get(i), keys.get(i));
+      assertEquals(expectedValues.get(i), values.get(i));
+    }
+
+    // 3 Example 2: wait for close to trigger the merge
+    // 3.1 Write and commit map outputs
+    // The map output is 58 bytes, which is less than 1024 * 0.08, so the merge will not be
+    // triggered.
+    Map<String, String> map3 = new TreeMap<String, String>();
+    map3.put(KEYS[1], VALUES[1]);
+    map3.put(KEYS[4], VALUES[4]);
+    byte[] mapOutputBytes3 = RssInMemoryMergerTest.writeMapOutput(conf, map3);
+    ByteArrayInputStream mapInput3 = new ByteArrayInputStream(mapOutputBytes3);
+    // The header length is 4, so we must subtract it!
+    InputAttemptIdentifier mapId3 = new InputAttemptIdentifier(3, 0);
+    MapOutput mapOutput3 =
+        manager.reserve(mapId3, mapOutputBytes3.length - 4, mapOutputBytes3.length - 4, 0);
+    ShuffleUtils.shuffleToMemory(
+        mapOutput3.getMemory(),
+        mapInput3,
+        mapOutputBytes3.length,
+        mapOutputBytes3.length,
+        codec,
+        false,
+        0,
+        LOG,
+        mapId3);
+    mapOutput3.commit();
+
+    // 3.2 Verify the merge is not triggered; only one remote file exists.
+    manager.waitForInMemoryMerge();
+    files = remoteFS.listStatus(new Path(parentPath));
+    assertEquals(1, files.length);
+
+    // 3.3 Call close, which triggers the final merge
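+    // close(true) should flush the remaining in-memory outputs to the remote fs and return an
+    // iterator over the fully merged, sorted stream (verified below).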
+    TezRawKeyValueIterator iterator = manager.close(true);
+
+    // 3.4 Verify result
+    // 3.4.1 Verify counters
+    assertEquals(2, tezCounters.findCounter(NUM_MEM_TO_REMOTE_MERGES).getValue());
+
+    // 3.4.2 Verify that two merged files are located in the remote fs
+    files = remoteFS.listStatus(new Path(parentPath));
+    assertEquals(2, files.length);
+    String filePath3 =
+        String.format(
+            "%s/%s/%s_src_%s_spill_%d.out",
+            BASE_SPILL_PATH, APP_ATTEMPT_ID, UNIQUE_ID, mapId3.getInputIdentifier(), -1);
+    assertTrue(remoteFS.exists(new Path(filePath3)));
+
+    // 3.4.3 Verify the content from the remote fs
+    mergePath = new Path(filePath3);
+    keys = Lists.newArrayList();
+    values = Lists.newArrayList();
+    RssInMemoryMergerTest.readOnDiskMapOutput(remoteFS, mergePath, keys, values);
+    // assert the content read from the remote fs
+    expectedKeys = Lists.newArrayList(KEYS[1], KEYS[4]);
+    expectedValues = Lists.newArrayList(VALUES[1], VALUES[4]);
+    for (int i = 0; i < expectedKeys.size(); i++) {
+      assertEquals(expectedKeys.get(i), keys.get(i));
+      assertEquals(expectedValues.get(i), values.get(i));
+    }
+
+    // 3.4.4 Verify the content from the iterator
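+    // All six records should come back in key order, showing that the final merge combined
+    // both remote spill files.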
+    for (int i = 0; i < KEYS.length; i++) {
+      // test final returned values
+      iterator.next();
+      byte[] key = new byte[iterator.getKey().getLength()];
+      byte[] value = new byte[iterator.getValue().getLength()];
+      System.arraycopy(iterator.getKey().getData(), 0, key, 0, key.length);
+      System.arraycopy(iterator.getValue().getData(), 0, value, 0, value.length);
+      assertEquals(KEYS[i], new Text(key).toString().trim());
+      assertEquals(VALUES[i], new Text(value).toString().trim());
+    }
+  }
+}
diff --git a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
index b5219efeb..55e8bd1be 100644
--- a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
+++ b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
@@ -123,8 +123,18 @@ public class TezIntegrationTestBase extends IntegrationTestBase {
     runTezApp(appConf, getTestTool(), getTestArgs("rss"));
     final String rssPath = getOutputDir("rss");
 
-    // 3 verify the results
+    // 3 Run Tez examples based on rss with remote spill enabled
+    appConf = new TezConfiguration(miniTezCluster.getConfig());
+    appConf.setBoolean(RssTezConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, true);
+    appConf.set(RssTezConfig.RSS_REMOTE_SPILL_STORAGE_PATH, "/tmp/spill");
+    updateRssConfiguration(appConf);
+    appendAndUploadRssJars(appConf);
+    runTezApp(appConf, getTestTool(), getTestArgs("rss-spill"));
+    final String rssPathSpill = getOutputDir("rss-spill");
+
+    // 4 verify the results
     verifyResults(originPath, rssPath);
+    verifyResults(originPath, rssPathSpill);
   }
 
   public Tool getTestTool() {