Posted to commits@uniffle.apache.org by zh...@apache.org on 2022/07/15 09:41:12 UTC

[incubator-uniffle] branch master updated: [Experimental Feature] MR Supports Remote Spill (#55)

This is an automated email from the ASF dual-hosted git repository.

zhifgli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f4ce2ed  [Experimental Feature] MR Supports Remote Spill (#55)
f4ce2ed is described below

commit f4ce2edd2d4e3355a6f1e12f0e7f712964fe83b4
Author: frankliee <fr...@tencent.com>
AuthorDate: Fri Jul 15 17:41:08 2022 +0800

    [Experimental Feature] MR Supports Remote Spill (#55)
    
    ### What changes were proposed in this pull request?
    Rewrite MapReduce's MergeManager to spill sorted segments to HDFS.
    It returns a merge-sorted iterator to read these HDFS segments back.
    
    ### Why are the changes needed?
    In the cloud, machines may have very limited disk space and performance.
    This PR allows reduce tasks to spill data to remote storage (e.g., HDFS).
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    |Property Name|Default|Description|
    |---|---|---|
    |mapreduce.rss.reduce.remote.spill.enable|false|Whether reduce tasks spill to remote storage|
    |mapreduce.rss.reduce.remote.spill.attempt.inc|1|Extra reduce attempts to add, because spilling to HDFS fails more easily than spilling to local disk|
    |mapreduce.rss.reduce.remote.spill.replication|1|The HDFS replication factor for spilled data|
    |mapreduce.rss.reduce.remote.spill.retries|5|The number of retries when spilling data to HDFS|
    ### How was this patch tested?
    New unit tests and an integration test with remote spill enabled.
    
    Co-authored-by: roryqi <ro...@tencent.com>
---
 .../org/apache/hadoop/mapreduce/RssMRConfig.java   |  14 +
 .../org/apache/hadoop/mapreduce/RssMRUtils.java    |   4 +
 .../task/reduce/RssInMemoryRemoteMerger.java       | 197 ++++++++++++++
 .../task/reduce/RssRemoteMergeManagerImpl.java     | 290 +++++++++++++++++++++
 .../hadoop/mapreduce/task/reduce/RssShuffle.java   |  47 +++-
 .../hadoop/mapreduce/v2/app/RssMRAppMaster.java    |  22 ++
 .../task/reduce/RssInMemoryRemoteMergerTest.java   | 151 +++++++++++
 .../task/reduce/RssRemoteMergeManagerTest.java     | 157 +++++++++++
 docs/client_guide.md                               |  18 ++
 .../apache/uniffle/test/MRIntegrationTestBase.java |   8 +
 10 files changed, 900 insertions(+), 8 deletions(-)
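
For readers who want to try the feature, below is a minimal sketch of turning it on from job-submission code. The property keys come from the table above; the class name and the chosen values are illustrative only and are not part of this commit.

```java
import org.apache.hadoop.mapred.JobConf;

public class RemoteSpillJobSetup {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Let reduce tasks spill merged segments to the remote storage (HDFS).
    conf.setBoolean("mapreduce.rss.reduce.remote.spill.enable", true);
    // Spilling to HDFS fails more easily than to local disk, so grant one extra reduce attempt.
    conf.setInt("mapreduce.rss.reduce.remote.spill.attempt.inc", 1);
    // Spilled data can be recomputed, so one replica plus client-side retries is enough.
    conf.setInt("mapreduce.rss.reduce.remote.spill.replication", 1);
    conf.setInt("mapreduce.rss.reduce.remote.spill.retries", 5);
    // ... configure input/output and submit the job as usual ...
  }
}
```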

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index 1dfb673..b1b9507 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -97,6 +97,20 @@ public class RssMRConfig {
   public static final long RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE = 3 * 1024;
   public static final String RSS_STORAGE_TYPE = MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_STORAGE_TYPE;
 
+  public static final String RSS_REDUCE_REMOTE_SPILL_ENABLED = MR_RSS_CONFIG_PREFIX
+      + "rss.reduce.remote.spill.enable";
+  public static final boolean RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT = false;
+  public static final String RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC = MR_RSS_CONFIG_PREFIX
+      + "rss.reduce.remote.spill.attempt.inc";
+  public static final int RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT = 1;
+  public static final String RSS_REDUCE_REMOTE_SPILL_REPLICATION = MR_RSS_CONFIG_PREFIX
+      + "rss.reduce.remote.spill.replication";
+  public static final int RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT = 1;
+  public static final String RSS_REDUCE_REMOTE_SPILL_RETRIES = MR_RSS_CONFIG_PREFIX
+      + "rss.reduce.remote.spill.retries";
+  public static final int RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT = 5;
+
+
   public static final String RSS_PARTITION_NUM_PER_RANGE =
       MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE;
   public static final int RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE =
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index f2e7984..684a5ec 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -156,6 +156,10 @@ public class RssMRUtils {
     return rssJobConf.getLong(key, mrJobConf.getLong(key, defaultValue));
   }
 
+  public static boolean getBoolean(JobConf rssJobConf, JobConf mrJobConf, String key, boolean defaultValue) {
+    return rssJobConf.getBoolean(key, mrJobConf.getBoolean(key, defaultValue));
+  }
+
   public static double getDouble(JobConf rssJobConf, JobConf mrJobConf, String key, double defaultValue) {
     return rssJobConf.getDouble(key, mrJobConf.getDouble(key, defaultValue));
   }
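
The new getBoolean helper follows the same two-level fallback as the existing getters: the RSS-specific job conf is consulted first, then the plain MR job conf, then the supplied default. A small sketch of that behaviour (the property placement below is made up):

```java
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.RssMRUtils;

public class GetBooleanFallbackSketch {
  public static void main(String[] args) {
    JobConf rssJobConf = new JobConf();
    JobConf mrJobConf = new JobConf();
    // Only the plain MR conf carries the key in this example.
    mrJobConf.setBoolean("mapreduce.rss.reduce.remote.spill.enable", true);

    // rssJobConf has no value, so the lookup falls through to mrJobConf and returns true.
    boolean enabled = RssMRUtils.getBoolean(rssJobConf, mrJobConf,
        "mapreduce.rss.reduce.remote.spill.enable", false);
    System.out.println(enabled);
  }
}
```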
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMerger.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMerger.java
new file mode 100644
index 0000000..445a56f
--- /dev/null
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMerger.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+public class RssInMemoryRemoteMerger<K, V> extends MergeThread<InMemoryMapOutput<K,V>, K, V> {
+  private static final Log LOG = LogFactory.getLog(RssInMemoryRemoteMerger.class);
+
+  private static final String SPILL_OUTPUT_PREFIX = "spill";
+  private final RssRemoteMergeManagerImpl<K, V> manager;
+  private final JobConf jobConf;
+  private final FileSystem remoteFs;
+  private final Path spillPath;
+  private final String taskAttemptId;
+  private final CompressionCodec codec;
+  private final Progressable reporter;
+  private final Counters.Counter spilledRecordsCounter;
+  private final Class<? extends Reducer> combinerClass;
+  private final Task.CombineOutputCollector<K,V> combineCollector;
+  private final Counters.Counter reduceCombineInputCounter;
+  private final Counters.Counter mergedMapOutputsCounter;
+
+  public RssInMemoryRemoteMerger(
+      RssRemoteMergeManagerImpl<K, V> manager,
+      JobConf jobConf,
+      FileSystem remoteFs,
+      Path spillPath,
+      String taskId,
+      CompressionCodec codec,
+      Progressable reporter,
+      Counters.Counter spilledRecordsCounter,
+      Class<? extends Reducer> combinerClass,
+      ExceptionReporter exceptionReporter,
+      Task.CombineOutputCollector<K,V> combineCollector,
+      Counters.Counter reduceCombineInputCounter,
+      Counters.Counter mergedMapOutputsCounter) {
+    super(manager, Integer.MAX_VALUE, exceptionReporter);
+    this.setName("RssInMemoryMerger - Thread to merge in-memory map-outputs");
+    this.setDaemon(true);
+    this.manager = manager;
+    this.jobConf = jobConf;
+    this.remoteFs = remoteFs;
+    this.spillPath = spillPath;
+    this.taskAttemptId = taskId;
+    this.codec = codec;
+    this.reporter = reporter;
+    this.spilledRecordsCounter = spilledRecordsCounter;
+    this.combinerClass = combinerClass;
+    this.combineCollector = combineCollector;
+    this.reduceCombineInputCounter = reduceCombineInputCounter;
+    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+  }
+
+  @Override
+  public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
+    if (inputs == null || inputs.size() == 0) {
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+    TaskAttemptID mapId = inputs.get(0).getMapId();
+
+    List<Merger.Segment<K, V>> inMemorySegments = new ArrayList<Merger.Segment<K, V>>();
+    createInMemorySegments(inputs, inMemorySegments);
+    int noInMemorySegments = inMemorySegments.size();
+
+    String filePath = SPILL_OUTPUT_PREFIX + Path.SEPARATOR + taskAttemptId + Path.SEPARATOR + mapId;
+    Path outputPath = new Path(spillPath, filePath);
+
+    FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, remoteFs.create(outputPath));
+    IFile.Writer<K, V> writer = new IFile.Writer<K, V>(jobConf, out,
+        (Class<K>) jobConf.getMapOutputKeyClass(),
+        (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
+
+    RawKeyValueIterator rIter = null;
+    try {
+      LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments...");
+
+      // tmpDir is not used here; it only matters for the on-disk merger
+      rIter = Merger.merge(jobConf, remoteFs,
+          (Class<K>)jobConf.getMapOutputKeyClass(),
+          (Class<V>)jobConf.getMapOutputValueClass(),
+          inMemorySegments, inMemorySegments.size(),
+          new Path(taskAttemptId),
+          (RawComparator<K>)jobConf.getOutputKeyComparator(),
+          reporter, spilledRecordsCounter, null, null);
+
+      if (null == combinerClass) {
+        Merger.writeFile(rIter, writer, reporter, jobConf);
+      } else {
+        combineCollector.setWriter(writer);
+        combineAndSpill(rIter, reduceCombineInputCounter);
+      }
+      writer.close();
+
+      // keep this for final merge
+      manager.closeOnHDFSFile(outputPath);
+
+      LOG.info(taskAttemptId + " Merge of the " + noInMemorySegments
+          + " files in-memory complete."
+          + " Remote spill file is " + outputPath + " of size "
+          + remoteFs.getFileStatus(outputPath).getLen()
+          + " cost time " + (System.currentTimeMillis() - start) + " ms");
+    } catch (IOException e) {
+      // make sure that we delete the remote spill file that we
+      // created earlier in this merge pass
+      remoteFs.delete(outputPath, true);
+      throw e;
+    }
+
+  }
+
+  private void combineAndSpill(
+      RawKeyValueIterator kvIter,
+      Counters.Counter inCounter) throws IOException {
+    JobConf job = jobConf;
+    Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
+    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
+    Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
+    RawComparator<K> comparator =
+        (RawComparator<K>) job.getCombinerKeyGroupingComparator();
+    try {
+      Task.CombineValuesIterator values = new Task.CombineValuesIterator(
+          kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
+          inCounter);
+      while (values.more()) {
+        combiner.reduce(values.getKey(), values, combineCollector,
+            Reporter.NULL);
+        values.nextKey();
+      }
+    } finally {
+      combiner.close();
+    }
+  }
+
+  private long createInMemorySegments(
+      List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
+      List<Merger.Segment<K, V>> inMemorySegments) throws IOException {
+    long totalSize = 0L;
+    // fullSize could come from the RamManager, but files can be
+    // closed but not yet present in inMemoryMapOutputs
+    long fullSize = 0L;
+    for (InMemoryMapOutput<K,V> mo : inMemoryMapOutputs) {
+      fullSize += mo.getMemory().length;
+    }
+    while (fullSize > 0) {
+      InMemoryMapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
+      byte[] data = mo.getMemory();
+      long size = data.length;
+      totalSize += size;
+      fullSize -= size;
+      IFile.Reader<K,V> reader = new InMemoryReader<K,V>(manager,
+          mo.getMapId(), data, 0, (int)size, jobConf);
+      inMemorySegments.add(new Merger.Segment<K,V>(reader, true, mergedMapOutputsCounter));
+    }
+    return totalSize;
+  }
+}
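
Each in-memory merge pass ends up as a single IFile on the remote file system, under the per-application spill directory that the merge manager passes in. A rough sketch of the resulting path layout (all ids below are made up):

```java
import org.apache.hadoop.fs.Path;

public class SpillPathSketch {
  public static void main(String[] args) {
    // The spillPath handed to the merger is <remote base path>/<appId>.
    Path spillPath = new Path("hdfs://ns1/rss-spill/application_1234_0001");
    String reduceAttempt = "attempt_1234_0001_r_000000_0"; // taskAttemptId of this reducer
    String firstMapId = "attempt_1234_0001_m_000003_0";    // mapId of the first merged output

    // Mirrors the construction in merge():
    //   <spillPath>/spill/<reduce task attempt id>/<map id>
    Path segment = new Path(spillPath,
        "spill" + Path.SEPARATOR + reduceAttempt + Path.SEPARATOR + firstMapId);
    System.out.println(segment);
    // hdfs://ns1/rss-spill/application_1234_0001/spill/attempt_1234_0001_r_000000_0/attempt_1234_0001_m_000003_0
  }
}
```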
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
new file mode 100644
index 0000000..4fe38b2
--- /dev/null
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progress;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+
+public class RssRemoteMergeManagerImpl<K, V> extends MergeManagerImpl<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(RssRemoteMergeManagerImpl.class);
+
+  private final String appId;
+  private final TaskAttemptID reduceId;
+
+  private final JobConf jobConf;
+
+  Set<InMemoryMapOutput<K, V>> inMemoryMapOutputs =
+      new TreeSet<InMemoryMapOutput<K,V>>(new MapOutput.MapOutputComparator<K, V>());
+  private final RssInMemoryRemoteMerger<K, V> inMemoryMerger;
+
+  /**
+   * Spilled segments on HDFS
+   */
+  Set<Path> onHDFSMapOutputs = new TreeSet<Path>();
+
+
+  @VisibleForTesting
+  final long memoryLimit;
+
+  private long usedMemory;
+  private long commitMemory;
+
+  private final long mergeThreshold;
+
+  private final Reporter reporter;
+  private final ExceptionReporter exceptionReporter;
+
+  /**
+   * Combiner class to run during in-memory merge, if defined.
+   */
+  private final Class<? extends Reducer> combinerClass;
+
+  /**
+   * Resettable collector used for combine.
+   */
+  private final Task.CombineOutputCollector<K,V> combineCollector;
+
+  private final Counters.Counter spilledRecordsCounter;
+
+  private final Counters.Counter reduceCombineInputCounter;
+
+  private final Counters.Counter mergedMapOutputsCounter;
+
+  private final CompressionCodec codec;
+
+  private final Progress mergePhase;
+
+  private String basePath;
+  private FileSystem remoteFS;
+
+  public RssRemoteMergeManagerImpl(String appId, TaskAttemptID reduceId, JobConf jobConf,
+                              String basePath,
+                              int replication,
+                              int retries,
+                              FileSystem localFS,
+                              LocalDirAllocator localDirAllocator,
+                              Reporter reporter,
+                              CompressionCodec codec,
+                              Class<? extends Reducer> combinerClass,
+                              Task.CombineOutputCollector<K,V> combineCollector,
+                              Counters.Counter spilledRecordsCounter,
+                              Counters.Counter reduceCombineInputCounter,
+                              Counters.Counter mergedMapOutputsCounter,
+                              ExceptionReporter exceptionReporter,
+                              Progress mergePhase,
+                              MapOutputFile mapOutputFile,
+                              JobConf remoteConf) {
+    super(reduceId,  jobConf,
+        localFS,
+        localDirAllocator,
+        reporter,
+        codec,
+        combinerClass,
+        combineCollector,
+        spilledRecordsCounter,
+        reduceCombineInputCounter,
+        mergedMapOutputsCounter,
+        exceptionReporter,
+        mergePhase, mapOutputFile);
+
+    this.appId = appId;
+    this.reduceId = reduceId;
+    this.jobConf = jobConf;
+    this.exceptionReporter = exceptionReporter;
+    this.mergePhase = mergePhase;
+
+    this.reporter = reporter;
+    this.codec = codec;
+    this.combinerClass = combinerClass;
+    this.combineCollector = combineCollector;
+    this.reduceCombineInputCounter = reduceCombineInputCounter;
+    this.spilledRecordsCounter = spilledRecordsCounter;
+    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+
+    try {
+      remoteConf.setInt("dfs.replication", replication);
+      remoteConf.setInt("dfs.client.block.write.retries", retries); // HDFS default is 3
+      this.remoteFS = ShuffleStorageUtils.getFileSystemForPath(new Path(basePath), remoteConf);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot init remoteFS on path:" + basePath);
+    }
+
+    this.basePath = basePath;
+
+    final float maxInMemCopyUse =
+        jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
+          MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for "
+          + MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+          + maxInMemCopyUse);
+    }
+
+    // Allow unit tests to fix Runtime memory
+    this.memoryLimit = (long)(jobConf.getLong(
+      MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+      Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
+
+    this.usedMemory = 0L;
+    this.commitMemory = 0L;
+
+    this.mergeThreshold = (long)(this.memoryLimit
+      * jobConf.getFloat(
+        MRJobConfig.SHUFFLE_MERGE_PERCENT,
+        MRJobConfig.DEFAULT_SHUFFLE_MERGE_PERCENT));
+    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", "
+        + "mergeThreshold=" + mergeThreshold);
+
+    this.inMemoryMerger = createRssInMemoryMerger();
+    this.inMemoryMerger.start();
+  }
+
+
+  protected RssInMemoryRemoteMerger<K, V> createRssInMemoryMerger() {
+    return new RssInMemoryRemoteMerger<K, V>(
+      this,
+      jobConf,
+      remoteFS,
+        new Path(basePath, appId),
+      reduceId.toString(),
+      codec,
+      reporter,
+      spilledRecordsCounter,
+      combinerClass,
+      exceptionReporter,
+      combineCollector,
+      reduceCombineInputCounter,
+      mergedMapOutputsCounter
+    );
+  }
+
+  @Override
+  public void waitForResource() throws InterruptedException {
+    inMemoryMerger.waitForMerge();
+  }
+
+  @Override
+  public synchronized MapOutput<K, V> reserve(TaskAttemptID mapId,
+                                 long requestedSize,
+                                 int fetcher) throws IOException {
+    // we disable on-disk MapOutput to avoid merging intermediate data on local disk
+    if (usedMemory > memoryLimit) {
+      LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+          + ") is greater than memoryLimit (" + memoryLimit + ")."
+          + " CommitMemory is (" + commitMemory + ")");
+      return null;
+    }
+
+    // Allow the in-memory shuffle to progress
+    LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+        + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
+        + "CommitMemory is (" + commitMemory + ")");
+    usedMemory += requestedSize;
+    // use this rss merger as the callback
+    return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize, codec, true);
+  }
+
+  @Override
+  synchronized void unreserve(long size) {
+    usedMemory -= size;
+  }
+
+  @Override
+  public synchronized void closeInMemoryFile(InMemoryMapOutput<K,V> mapOutput) {
+    inMemoryMapOutputs.add(mapOutput);
+    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
+
+    commitMemory += mapOutput.getSize();
+    // Can hang if mergeThreshold is really low.
+    if (commitMemory >= mergeThreshold) {
+      LOG.info("Starting inMemoryMerger's merge since commitMemory="
+          + commitMemory + " > mergeThreshold=" + mergeThreshold
+          + ". Current usedMemory=" + usedMemory);
+      inMemoryMergedMapOutputs.clear();
+      inMemoryMerger.startMerge(inMemoryMapOutputs);
+      commitMemory = 0L;  // Reset commitMemory.
+    }
+    // we disable memToMemMerger to simplify the design
+  }
+
+  public synchronized void closeOnHDFSFile(Path file) {
+    // many in-memory segments have been committed to HDFS as this file
+    onHDFSMapOutputs.add(file);
+  }
+
+  @Override
+  public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K,V> mapOutput) {
+    throw new IllegalStateException("closeInMemoryMergedFile is unsupported for rss merger");
+  }
+
+  @Override
+  public synchronized void closeOnDiskFile(CompressAwarePath file) {
+    throw new IllegalStateException("closeOnDiskFile is unsupported for rss merger");
+  }
+
+  @Override
+  public RawKeyValueIterator close() throws Throwable {
+    // Wait for on-going merges to complete
+    inMemoryMerger.startMerge(inMemoryMapOutputs);
+    inMemoryMerger.close();
+    if (!inMemoryMapOutputs.isEmpty()) {
+      throw new RssException("InMemoryMapOutputs should be empty");
+    }
+    return finalMerge();
+  }
+
+  // Read HDFS segments for reduce
+  private RawKeyValueIterator finalMerge() throws IOException {
+    Class<K> keyClass = (Class<K>)jobConf.getMapOutputKeyClass();
+    Class<V> valueClass = (Class<V>)jobConf.getMapOutputValueClass();
+    final RawComparator<K> comparator =
+        (RawComparator<K>)jobConf.getOutputKeyComparator();
+    // We only merge-sort once over all the spilled HDFS segments
+    return Merger.merge(jobConf, remoteFS, keyClass, valueClass, codec,
+        onHDFSMapOutputs.toArray(new Path[onHDFSMapOutputs.size()]), true, Integer.MAX_VALUE,
+        new Path("reduceId"), comparator, reporter, spilledRecordsCounter,
+        null, null, null);
+  }
+}
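
The memory sizing mirrors the stock MergeManagerImpl: fetched map outputs are held in memory up to memoryLimit, and once the committed portion crosses mergeThreshold an in-memory merge pass is spilled to HDFS. A rough worked example, assuming the stock Hadoop defaults of 0.70 for mapreduce.reduce.shuffle.input.buffer.percent and 0.66 for mapreduce.reduce.shuffle.merge.percent (the real values are read from the job conf):

```java
public class SpillThresholdSketch {
  public static void main(String[] args) {
    long reducerHeap = 4L * 1024 * 1024 * 1024;          // pretend Runtime.maxMemory() is 4 GiB
    long memoryLimit = (long) (reducerHeap * 0.70f);     // ~2.8 GiB may hold fetched map outputs
    long mergeThreshold = (long) (memoryLimit * 0.66f);  // ~1.85 GiB of committed data triggers a remote spill
    System.out.println(memoryLimit + " / " + mergeThreshold);
  }
}
```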
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 8f3efd3..1d30df9 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -98,7 +98,6 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
     this.copyPhase = context.getCopyPhase();
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();
-    this.merger = createMergeManager(context);
 
     // rss init
     this.appId = RssMRUtils.getApplicationAttemptId().toString();
@@ -122,17 +121,44 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
             RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
     String remoteStorageConf = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_REMOTE_STORAGE_CONF, "");
     this.remoteStorageInfo = new RemoteStorageInfo(basePath, remoteStorageConf);
+    this.merger = createMergeManager(context);
   }
 
   protected MergeManager<K, V> createMergeManager(
       ShuffleConsumerPlugin.Context context) {
-    return new MergeManagerImpl<K, V>(reduceId, mrJobConf, context.getLocalFS(),
+    boolean useRemoteSpill = RssMRUtils.getBoolean(rssJobConf, mrJobConf,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT);
+    if (useRemoteSpill) {
+      // Use a minimal replication factor, because spilled data can be recomputed by the reduce task.
+      // Instead, rely on more retries in the HDFS client.
+      int replication = RssMRUtils.getInt(rssJobConf, mrJobConf,
+          RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,
+          RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT);
+      int retries = RssMRUtils.getInt(rssJobConf, mrJobConf,
+          RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES,
+          RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT);
+      return new RssRemoteMergeManagerImpl(appId, reduceId, mrJobConf,
+        basePath,
+        replication,
+        retries,
+        context.getLocalFS(),
+        context.getLocalDirAllocator(), reporter, context.getCodec(),
+        context.getCombinerClass(), context.getCombineCollector(),
+        context.getSpilledRecordsCounter(),
+        context.getReduceCombineInputCounter(),
+        context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
+        context.getMapOutputFile(),
+        getRemoteConf()
+      );
+    } else {
+      return new MergeManagerImpl<K, V>(reduceId, mrJobConf, context.getLocalFS(),
         context.getLocalDirAllocator(), reporter, context.getCodec(),
         context.getCombinerClass(), context.getCombineCollector(),
         context.getSpilledRecordsCounter(),
         context.getReduceCombineInputCounter(),
         context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
         context.getMapOutputFile());
+    }
   }
 
   @Override
@@ -164,12 +190,7 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
     if (!taskIdBitmap.isEmpty()) {
       LOG.info("In reduce: " + reduceId
           + ", Rss MR client starts to fetch blocks from RSS server");
-      JobConf readerJobConf = new JobConf((mrJobConf));
-      if (!remoteStorageInfo.isEmpty()) {
-        for (Map.Entry<String, String> entry : remoteStorageInfo.getConfItems().entrySet()) {
-          readerJobConf.set(entry.getKey(), entry.getValue());
-        }
-      }
+      JobConf readerJobConf = getRemoteConf();
       CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
           appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
           partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
@@ -208,6 +229,16 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
     return kvIter;
   }
 
+  private JobConf getRemoteConf() {
+    JobConf readerJobConf = new JobConf((mrJobConf));
+    if (!remoteStorageInfo.isEmpty()) {
+      for (Map.Entry<String, String> entry : remoteStorageInfo.getConfItems().entrySet()) {
+        readerJobConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    return readerJobConf;
+  }
+
   @Override
   public void close() {
   }
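
The extracted getRemoteConf simply overlays the per-job remote storage settings on top of a copy of the MR job conf, so both the shuffle reader and the remote merge manager see the same HDFS configuration. A sketch of the effect (keys and values invented):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.mapred.JobConf;

public class RemoteConfOverlaySketch {
  public static void main(String[] args) {
    JobConf mrJobConf = new JobConf();
    mrJobConf.set("dfs.nameservices", "ns-default");

    // Pretend these came from RemoteStorageInfo.getConfItems().
    Map<String, String> remoteItems = new HashMap<>();
    remoteItems.put("dfs.nameservices", "rss-ns");
    remoteItems.put("dfs.ha.namenodes.rss-ns", "nn1,nn2");

    JobConf readerJobConf = new JobConf(mrJobConf);
    for (Map.Entry<String, String> e : remoteItems.entrySet()) {
      readerJobConf.set(e.getKey(), e.getValue()); // remote storage settings win
    }
    System.out.println(readerJobConf.get("dfs.nameservices")); // rss-ns
  }
}
```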
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index e15351b..e163eec 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -78,6 +78,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.storage.util.StorageType;
 
 public class RssMRAppMaster extends MRAppMaster {
 
@@ -199,6 +200,27 @@ public class RssMRAppMaster extends MRAppMaster {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have very limited disk space, reduce tasks are allowed to spill data to HDFS
+      if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+          RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (remoteStorage.isEmpty()) {
+          throw new IllegalArgumentException("Remote spill only supports "
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode with " + remoteStorage);
+        }
+
+        // When remote spill is enabled, reduce tasks fail more easily.
+        // Allow extra attempts to avoid recomputing the whole job.
+        int originalAttempts = conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
+        int inc = conf.getInt(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC,
+            RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT);
+        if (inc < 0) {
+          throw new IllegalArgumentException(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC
+              + " cannot be negative");
+        }
+        conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, originalAttempts + inc);
+      }
+
       LOG.info("Start to register shuffle");
       long start = System.currentTimeMillis();
       serverToPartitionRanges.entrySet().forEach(entry -> {
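
With the defaults above this works out to 4 + 1 = 5: when remote spill is enabled each reducer gets one extra attempt before the whole job has to be recomputed, and a negative increment is rejected up front.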
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java
new file mode 100644
index 0000000..1fc133b
--- /dev/null
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RssInMemoryRemoteMergerTest {
+
+  @Test
+  public void mergerTest() throws IOException {
+    JobConf jobConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    JobID jobId = new JobID("a", 0);
+    TaskAttemptID mapId1 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 1), 0);
+    TaskAttemptID mapId2 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 2), 0);
+    TaskAttemptID reduceId1 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.REDUCE, 0), 0);
+    RssRemoteMergeManagerImpl<Text, Text> mergeManager = new RssRemoteMergeManagerImpl<Text, Text>(
+        "app", reduceId1, jobConf, tmpDir.toString(),  1,5,  fs, lda, Reporter.NULL,
+      null, null, null, null, null,
+        null, null, new Progress(), new MROutputFiles(), new JobConf());
+
+    // write map outputs
+    Map<String, String> map1 = new TreeMap<String, String>();
+    map1.put("apple", "disgusting");
+    map1.put("carrot", "delicious");
+    Map<String, String> map2 = new TreeMap<String, String>();
+    map2.put("banana", "pretty good");
+    byte[] mapOutputBytes1 = writeMapOutput(jobConf, map1);
+    byte[] mapOutputBytes2 = writeMapOutput(jobConf, map2);
+    InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
+        jobConf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
+    InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
+        jobConf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
+    System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
+        mapOutputBytes1.length);
+    System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
+        mapOutputBytes2.length);
+
+    Path spillPath = new Path("test");
+    MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
+        new RssInMemoryRemoteMerger<Text, Text>(mergeManager, jobConf, fs, spillPath,
+            "test", null, Reporter.NULL, null,
+            null, null, null, null, null);
+    List<InMemoryMapOutput<Text, Text>> mapOutputs1 =
+        new ArrayList<InMemoryMapOutput<Text, Text>>();
+    mapOutputs1.add(mapOutput1);
+    mapOutputs1.add(mapOutput2);
+
+    String filePath = "spill" + Path.SEPARATOR + "test" + Path.SEPARATOR + mapId1;
+    Path mergePath = new Path(spillPath, filePath);
+    inMemoryMerger.merge(mapOutputs1);
+    assertTrue(fs.exists(mergePath));
+    List<String> keys = Lists.newArrayList();
+    List<String> values = Lists.newArrayList();
+    readOnDiskMapOutput(jobConf, fs, mergePath, keys, values);
+    fs.delete(spillPath, true);
+    List<String> actualKeys = Lists.newArrayList("apple", "banana", "carrot");
+    List<String> actualValues = Lists.newArrayList("disgusting", "pretty good", "delicious");
+    for (int i = 0; i < 3; i++) {
+      assertEquals(keys.get(i), actualKeys.get(i));
+      assertEquals(values.get(i), actualValues.get(i));
+    }
+  }
+
+  private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
+      throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+    IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
+        Text.class, Text.class, null, null);
+    for (String key : keysToValues.keySet()) {
+      String value = keysToValues.get(key);
+      writer.append(new Text(key), new Text(value));
+    }
+    writer.close();
+    return baos.toByteArray();
+  }
+
+  private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
+                                   List<String> keys, List<String> values) throws IOException {
+    FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+
+    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
+        fs.getFileStatus(path).getLen(), null, null);
+    DataInputBuffer keyBuff = new DataInputBuffer();
+    DataInputBuffer valueBuff = new DataInputBuffer();
+    Text key = new Text();
+    Text value = new Text();
+    while (reader.nextRawKey(keyBuff)) {
+      key.readFields(keyBuff);
+      keys.add(key.toString());
+      reader.nextRawValue(valueBuff);
+      value.readFields(valueBuff);
+      values.add(value.toString());
+    }
+  }
+}
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java
new file mode 100644
index 0000000..122d91c
--- /dev/null
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RssRemoteMergeManagerTest {
+  String appId = "app1";
+  JobID jobId = new JobID(appId, 0);
+
+  TaskAttemptID mapId1 = new TaskAttemptID(
+    new TaskID(jobId, TaskType.MAP, 1), 0);
+  TaskAttemptID mapId2 = new TaskAttemptID(
+    new TaskID(jobId, TaskType.MAP, 2), 0);
+  TaskAttemptID reduceId1 = new TaskAttemptID(
+    new TaskID(jobId, TaskType.REDUCE, 0), 0);
+  @Test
+  public void mergerTest() throws Throwable {
+    JobConf jobConf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    jobConf.set("mapreduce.reduce.memory.totalbytes", "1024");
+    jobConf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.01");
+    jobConf.set("mapreduce.reduce.shuffle.merge.percent", "0.1");
+
+    RssRemoteMergeManagerImpl<Text, Text> mergeManager = new RssRemoteMergeManagerImpl<Text, Text>(
+      appId, reduceId1, jobConf, tmpDir.toString(), 1, 5, fs, lda, Reporter.NULL,
+      null, null, null, null, null,
+      null, null, new Progress(), new MROutputFiles(), new JobConf());
+
+    // write map outputs
+    Map<String, String> map1 = new TreeMap<String, String>();
+    map1.put("apple", "disgusting");
+    map1.put("carrot", "delicious");
+    Map<String, String> map2 = new TreeMap<String, String>();
+    map2.put("banana", "pretty good");
+    byte[] mapOutputBytes1 = writeMapOutput(jobConf, map1);
+    byte[] mapOutputBytes2 = writeMapOutput(jobConf, map2);
+    InMemoryMapOutput mapOutput1 = (InMemoryMapOutput)mergeManager.reserve(mapId1, mapOutputBytes1.length, 0);
+    InMemoryMapOutput mapOutput2 = (InMemoryMapOutput)mergeManager.reserve(mapId2, mapOutputBytes2.length, 0);
+    System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
+      mapOutputBytes1.length);
+    System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
+      mapOutputBytes2.length);
+    mapOutput1.commit();
+    mapOutput2.commit();
+
+    RawKeyValueIterator iterator = mergeManager.close();
+
+    File[] mergedFiles = new File(tmpDir + Path.SEPARATOR + appId + Path.SEPARATOR
+      + "spill" + Path.SEPARATOR + "attempt_app1_0000_r_000000_0").listFiles();
+
+    assertEquals(mergedFiles.length, 1);
+
+    List<String> keys = Lists.newArrayList();
+    List<String> values = Lists.newArrayList();
+    readOnDiskMapOutput(jobConf, fs, new Path(mergedFiles[0].toString()), keys, values);
+    List<String> actualKeys = Lists.newArrayList("apple", "banana", "carrot");
+    List<String> actualValues = Lists.newArrayList("disgusting", "pretty good", "delicious");
+    for (int i = 0; i < 3; i++) {
+      assertEquals(keys.get(i), actualKeys.get(i));
+      assertEquals(values.get(i), actualValues.get(i));
+
+      // test final returned values
+      iterator.next();
+      byte[] key = new byte[iterator.getKey().getLength()];
+      byte[] value = new byte[iterator.getValue().getLength()];
+      System.arraycopy(iterator.getKey().getData(), 0, key, 0, key.length);
+      System.arraycopy(iterator.getValue().getData(), 0, value, 0, value.length);
+      assertEquals(new Text(key).toString().trim(), actualKeys.get(i));
+      assertEquals(new Text(value).toString().trim(), actualValues.get(i));
+    }
+  }
+
+  private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
+    throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+    IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
+      Text.class, Text.class, null, null);
+    for (String key : keysToValues.keySet()) {
+      String value = keysToValues.get(key);
+      writer.append(new Text(key), new Text(value));
+    }
+    writer.close();
+    return baos.toByteArray();
+  }
+
+  private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
+                                   List<String> keys, List<String> values) throws IOException {
+    FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+
+    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
+      fs.getFileStatus(path).getLen(), null, null);
+    DataInputBuffer keyBuff = new DataInputBuffer();
+    DataInputBuffer valueBuff = new DataInputBuffer();
+    Text key = new Text();
+    Text value = new Text();
+    while (reader.nextRawKey(keyBuff)) {
+      key.readFields(keyBuff);
+      keys.add(key.toString());
+      reader.nextRawValue(valueBuff);
+      value.readFields(valueBuff);
+      values.add(value.toString());
+    }
+  }
+
+}
diff --git a/docs/client_guide.md b/docs/client_guide.md
index ba26f3c..14b914d 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -161,3 +161,21 @@ The important configuration is listed as following.
 |---|---|---|
 |mapreduce.rss.client.max.buffer.size|3k|The max buffer size in map side|
 |mapreduce.rss.client.batch.trigger.num|50|The max batch of buffers to send data in map side|
+
+
+
+
+### Remote Spill (Experimental)
+
+In cloud environments, VMs may have very limited disk space and performance.
+This experimental feature allows reduce tasks to spill data to remote storage (e.g., HDFS).
+
+|Property Name|Default|Description|
+|---|---|---|
+|mapreduce.rss.reduce.remote.spill.enable|false|Whether reduce tasks spill to remote storage|
+|mapreduce.rss.reduce.remote.spill.attempt.inc|1|Extra reduce attempts to add, because spilling to HDFS fails more easily than spilling to local disk|
+|mapreduce.rss.reduce.remote.spill.replication|1|The HDFS replication factor for spilled data|
+|mapreduce.rss.reduce.remote.spill.retries|5|The number of retries when spilling data to HDFS|
+
+Notice: this feature requires the MEMORY_LOCALFILE_HDFS storage type.
+ 
\ No newline at end of file
diff --git a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
index 5958932..3170fd3 100644
--- a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
+++ b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
@@ -101,6 +101,12 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
     runRssApp(appConf);
     String rssPath = appConf.get("mapreduce.output.fileoutputformat.outputdir");
     verifyResults(originPath, rssPath);
+
+    appConf = new JobConf(mrYarnCluster.getConfig());
+    appConf.set("mapreduce.rss.reduce.remote.spill.enable", "true");
+    runRssApp(appConf);
+    String rssRemoteSpillPath = appConf.get("mapreduce.output.fileoutputformat.outputdir");
+    verifyResults(originPath, rssRemoteSpillPath);
   }
 
   private void updateCommonConfiguration(Configuration jobConf) {
@@ -126,6 +132,8 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
     jobConf.setInt(MRJobConfig.IO_SORT_MB, 128);
     jobConf.set(MRJobConfig.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, "org.apache.hadoop.mapred.RssMapOutputCollector");
     jobConf.set(MRConfig.SHUFFLE_CONSUMER_PLUGIN, "org.apache.hadoop.mapreduce.task.reduce.RssShuffle");
+    jobConf.set(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, "true");
+
     File file = new File(parentPath, "client-mr/target/shaded");
     File[] jars = file.listFiles();
     File localFile = null;