Posted to commits@uniffle.apache.org by zh...@apache.org on 2022/07/15 09:41:12 UTC
[incubator-uniffle] branch master updated: [Experimental Feature] MR Supports Remote Spill (#55)
This is an automated email from the ASF dual-hosted git repository.
zhifgli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f4ce2ed [Experimental Feature] MR Supports Remote Spill (#55)
f4ce2ed is described below
commit f4ce2edd2d4e3355a6f1e12f0e7f712964fe83b4
Author: frankliee <fr...@tencent.com>
AuthorDate: Fri Jul 15 17:41:08 2022 +0800
[Experimental Feature] MR Supports Remote Spill (#55)
### What changes were proposed in this pull request?
Rewrite MapReduce's MergeManager to spill sorted segments to HDFS.
It returns a merge-sorted iterator that reads these HDFS segments back.
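For intuition, reading many already-sorted spill files back as one sorted stream is a classic k-way merge. The sketch below is illustrative only; the actual implementation uses Hadoop's `Merger` over IFile segments on HDFS:

```java
// Illustrative k-way merge over pre-sorted segments; the real code uses
// Hadoop's Merger.merge() over IFile segments stored on HDFS.
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public final class KWayMerge {
  private static final class Head<T extends Comparable<T>> implements Comparable<Head<T>> {
    final T value;
    final Iterator<T> rest;
    Head(T value, Iterator<T> rest) { this.value = value; this.rest = rest; }
    public int compareTo(Head<T> o) { return value.compareTo(o.value); }
  }

  /** Returns a single sorted iterator over several already-sorted iterators. */
  public static <T extends Comparable<T>> Iterator<T> merge(List<Iterator<T>> segments) {
    PriorityQueue<Head<T>> heap = new PriorityQueue<>();
    for (Iterator<T> it : segments) {
      if (it.hasNext()) {
        heap.add(new Head<>(it.next(), it));
      }
    }
    return new Iterator<T>() {
      public boolean hasNext() { return !heap.isEmpty(); }
      public T next() {
        Head<T> h = heap.poll();              // smallest current head wins
        if (h.rest.hasNext()) {
          heap.add(new Head<>(h.rest.next(), h.rest)); // refill from that segment
        }
        return h.value;
      }
    };
  }
}
```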
### Why are the changes needed?
In cloud environments, machines may have very limited disk space and poor disk performance.
This PR allows reduce tasks to spill data to remote storage (e.g., HDFS).
### Does this PR introduce _any_ user-facing change?
Yes.
|Property Name|Default|Description|
|---|---|---|
|mapreduce.rss.reduce.remote.spill.enable|false|Whether to use remote spill|
|mapreduce.rss.reduce.remote.spill.attempt.inc|1|Extra reduce attempts to allow, since spilling to HDFS fails more often than spilling to local disk|
|mapreduce.rss.reduce.remote.spill.replication|1|The HDFS replication factor for spilled data|
|mapreduce.rss.reduce.remote.spill.retries|5|The number of retries when spilling data to HDFS|
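
For example, a job could enable the feature as follows (a minimal sketch: the property names come from the table above, everything else is illustrative):

```java
import org.apache.hadoop.mapred.JobConf;

public class RemoteSpillConfExample {
  public static JobConf buildConf() {
    // Minimal sketch: enable remote spill for the reduce side of an MR-on-Uniffle job.
    JobConf conf = new JobConf();
    conf.setBoolean("mapreduce.rss.reduce.remote.spill.enable", true);
    // One extra reduce attempt, since HDFS spills fail more often than local-disk spills.
    conf.setInt("mapreduce.rss.reduce.remote.spill.attempt.inc", 1);
    // A single replica suffices: spilled data can be recomputed by a reduce retry.
    conf.setInt("mapreduce.rss.reduce.remote.spill.replication", 1);
    conf.setInt("mapreduce.rss.reduce.remote.spill.retries", 5);
    return conf;
  }
}
```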
### How was this patch tested?
New unit tests and integration tests covering remote spill.
Co-authored-by: roryqi <ro...@tencent.com>
---
.../org/apache/hadoop/mapreduce/RssMRConfig.java | 14 +
.../org/apache/hadoop/mapreduce/RssMRUtils.java | 4 +
.../task/reduce/RssInMemoryRemoteMerger.java | 197 ++++++++++++++
.../task/reduce/RssRemoteMergeManagerImpl.java | 290 +++++++++++++++++++++
.../hadoop/mapreduce/task/reduce/RssShuffle.java | 47 +++-
.../hadoop/mapreduce/v2/app/RssMRAppMaster.java | 22 ++
.../task/reduce/RssInMemoryRemoteMergerTest.java | 151 +++++++++++
.../task/reduce/RssRemoteMergeManagerTest.java | 157 +++++++++++
docs/client_guide.md | 18 ++
.../apache/uniffle/test/MRIntegrationTestBase.java | 8 +
10 files changed, 900 insertions(+), 8 deletions(-)
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index 1dfb673..b1b9507 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -97,6 +97,20 @@ public class RssMRConfig {
public static final long RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE = 3 * 1024;
public static final String RSS_STORAGE_TYPE = MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_STORAGE_TYPE;
+ public static final String RSS_REDUCE_REMOTE_SPILL_ENABLED = MR_RSS_CONFIG_PREFIX
+ + "rss.reduce.remote.spill.enable";
+ public static final boolean RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT = false;
+ public static final String RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC = MR_RSS_CONFIG_PREFIX
+ + "rss.reduce.remote.spill.attempt.inc";
+ public static final int RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT = 1;
+ public static final String RSS_REDUCE_REMOTE_SPILL_REPLICATION = MR_RSS_CONFIG_PREFIX
+ + "rss.reduce.remote.spill.replication";
+ public static final int RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT = 1;
+ public static final String RSS_REDUCE_REMOTE_SPILL_RETRIES = MR_RSS_CONFIG_PREFIX
+ + "rss.reduce.remote.spill.retries";
+ public static final int RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT = 5;
+
+
public static final String RSS_PARTITION_NUM_PER_RANGE =
MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE;
public static final int RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE =
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
index f2e7984..684a5ec 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
@@ -156,6 +156,10 @@ public class RssMRUtils {
return rssJobConf.getLong(key, mrJobConf.getLong(key, defaultValue));
}
+ public static boolean getBoolean(JobConf rssJobConf, JobConf mrJobConf, String key, boolean defaultValue) {
+ return rssJobConf.getBoolean(key, mrJobConf.getBoolean(key, defaultValue));
+ }
+
public static double getDouble(JobConf rssJobConf, JobConf mrJobConf, String key, double defaultValue) {
return rssJobConf.getDouble(key, mrJobConf.getDouble(key, defaultValue));
}
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMerger.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMerger.java
new file mode 100644
index 0000000..445a56f
--- /dev/null
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMerger.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+public class RssInMemoryRemoteMerger<K, V> extends MergeThread<InMemoryMapOutput<K,V>, K, V> {
+ private static final Log LOG = LogFactory.getLog(RssInMemoryRemoteMerger.class);
+
+ private static final String SPILL_OUTPUT_PREFIX = "spill";
+ private final RssRemoteMergeManagerImpl<K, V> manager;
+ private final JobConf jobConf;
+ private final FileSystem remoteFs;
+ private final Path spillPath;
+ private final String taskAttemptId;
+ private final CompressionCodec codec;
+ private final Progressable reporter;
+ private final Counters.Counter spilledRecordsCounter;
+ private final Class<? extends Reducer> combinerClass;
+ private final Task.CombineOutputCollector<K,V> combineCollector;
+ private final Counters.Counter reduceCombineInputCounter;
+ private final Counters.Counter mergedMapOutputsCounter;
+
+ public RssInMemoryRemoteMerger(
+ RssRemoteMergeManagerImpl<K, V> manager,
+ JobConf jobConf,
+ FileSystem remoteFs,
+ Path spillPath,
+ String taskId,
+ CompressionCodec codec,
+ Progressable reporter,
+ Counters.Counter spilledRecordsCounter,
+ Class<? extends Reducer> combinerClass,
+ ExceptionReporter exceptionReporter,
+ Task.CombineOutputCollector<K,V> combineCollector,
+ Counters.Counter reduceCombineInputCounter,
+ Counters.Counter mergedMapOutputsCounter) {
+ super(manager, Integer.MAX_VALUE, exceptionReporter);
+ this.setName("RssInMemoryRemoteMerger - Thread to merge in-memory map-outputs");
+ this.setDaemon(true);
+ this.manager = manager;
+ this.jobConf = jobConf;
+ this.remoteFs = remoteFs;
+ this.spillPath = spillPath;
+ this.taskAttemptId = taskId;
+ this.codec = codec;
+ this.reporter = reporter;
+ this.spilledRecordsCounter = spilledRecordsCounter;
+ this.combinerClass = combinerClass;
+ this.combineCollector = combineCollector;
+ this.reduceCombineInputCounter = reduceCombineInputCounter;
+ this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+ }
+
+ @Override
+ public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
+ if (inputs == null || inputs.size() == 0) {
+ return;
+ }
+
+ long start = System.currentTimeMillis();
+ TaskAttemptID mapId = inputs.get(0).getMapId();
+
+ List<Merger.Segment<K, V>> inMemorySegments = new ArrayList<Merger.Segment<K, V>>();
+ createInMemorySegments(inputs, inMemorySegments);
+ int noInMemorySegments = inMemorySegments.size();
+
+ String filePath = SPILL_OUTPUT_PREFIX + Path.SEPARATOR + taskAttemptId + Path.SEPARATOR + mapId;
+ Path outputPath = new Path(spillPath, filePath);
+
+ FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, remoteFs.create(outputPath));
+ IFile.Writer<K, V> writer = new IFile.Writer<K, V>(jobConf, out,
+ (Class<K>) jobConf.getMapOutputKeyClass(),
+ (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
+
+ RawKeyValueIterator rIter = null;
+ try {
+ LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments...");
+
+ // tmpDir is only used by the on-disk merger, so it is unused here
+ rIter = Merger.merge(jobConf, remoteFs,
+ (Class<K>)jobConf.getMapOutputKeyClass(),
+ (Class<V>)jobConf.getMapOutputValueClass(),
+ inMemorySegments, inMemorySegments.size(),
+ new Path(taskAttemptId),
+ (RawComparator<K>)jobConf.getOutputKeyComparator(),
+ reporter, spilledRecordsCounter, null, null);
+
+ if (null == combinerClass) {
+ Merger.writeFile(rIter, writer, reporter, jobConf);
+ } else {
+ combineCollector.setWriter(writer);
+ combineAndSpill(rIter, reduceCombineInputCounter);
+ }
+ writer.close();
+
+ // keep this for final merge
+ manager.closeOnHDFSFile(outputPath);
+
+ LOG.info(taskAttemptId + " Merge of the " + noInMemorySegments
+ + " files in-memory complete."
+ + " Local file is " + outputPath + " of size "
+ + remoteFs.getFileStatus(outputPath).getLen()
+ + " cost time " + (System.currentTimeMillis() - start) + " ms");
+ } catch (IOException e) {
+ // make sure that we delete the remote spill file that we
+ // created earlier, so a failed merge leaves no partial output
+ remoteFs.delete(outputPath, true);
+ throw e;
+ }
+
+ }
+
+ private void combineAndSpill(
+ RawKeyValueIterator kvIter,
+ Counters.Counter inCounter) throws IOException {
+ JobConf job = jobConf;
+ Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
+ Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
+ Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
+ RawComparator<K> comparator =
+ (RawComparator<K>) job.getCombinerKeyGroupingComparator();
+ try {
+ Task.CombineValuesIterator values = new Task.CombineValuesIterator(
+ kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
+ inCounter);
+ while (values.more()) {
+ combiner.reduce(values.getKey(), values, combineCollector,
+ Reporter.NULL);
+ values.nextKey();
+ }
+ } finally {
+ combiner.close();
+ }
+ }
+
+ private long createInMemorySegments(
+ List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
+ List<Merger.Segment<K, V>> inMemorySegments) throws IOException {
+ long totalSize = 0L;
+ // fullSize could come from the RamManager, but files can be
+ // closed but not yet present in inMemoryMapOutputs
+ long fullSize = 0L;
+ for (InMemoryMapOutput<K,V> mo : inMemoryMapOutputs) {
+ fullSize += mo.getMemory().length;
+ }
+ while (fullSize > 0) {
+ InMemoryMapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
+ byte[] data = mo.getMemory();
+ long size = data.length;
+ totalSize += size;
+ fullSize -= size;
+ IFile.Reader<K,V> reader = new InMemoryReader<K,V>(manager,
+ mo.getMapId(), data, 0, (int)size, jobConf);
+ inMemorySegments.add(new Merger.Segment<K,V>(reader, true, mergedMapOutputsCounter));
+ }
+ return totalSize;
+ }
+}
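For orientation, merged spills land on the remote filesystem under `spill/<reduceTaskAttempt>/<mapId>`, relative to the spill path the manager passes in (which is `<basePath>/<appId>`). A small sketch with placeholder values:

```java
import org.apache.hadoop.fs.Path;

public class SpillLayoutExample {
  public static void main(String[] args) {
    // The manager passes spillPath = new Path(basePath, appId); values here are placeholders.
    Path spillPath = new Path("hdfs://ns1/rss-spill-base/application_123_0001");
    String reduceAttempt = "attempt_123_0001_r_000000_0"; // this reduce task attempt
    String mapId = "attempt_123_0001_m_000001_0";         // first map output of the batch
    Path outputPath = new Path(spillPath,
        "spill" + Path.SEPARATOR + reduceAttempt + Path.SEPARATOR + mapId);
    // Prints .../application_123_0001/spill/attempt_..._r_000000_0/attempt_..._m_000001_0
    System.out.println(outputPath);
  }
}
```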
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
new file mode 100644
index 0000000..4fe38b2
--- /dev/null
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progress;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+
+public class RssRemoteMergeManagerImpl<K, V> extends MergeManagerImpl<K, V> {
+
+ private static final Log LOG = LogFactory.getLog(RssRemoteMergeManagerImpl.class);
+
+ private final String appId;
+ private final TaskAttemptID reduceId;
+
+ private final JobConf jobConf;
+
+ Set<InMemoryMapOutput<K, V>> inMemoryMapOutputs =
+ new TreeSet<InMemoryMapOutput<K,V>>(new MapOutput.MapOutputComparator<K, V>());
+ private final RssInMemoryRemoteMerger<K, V> inMemoryMerger;
+
+ /**
+ * Spilled segments on HDFS
+ */
+ Set<Path> onHDFSMapOutputs = new TreeSet<Path>();
+
+
+ @VisibleForTesting
+ final long memoryLimit;
+
+ private long usedMemory;
+ private long commitMemory;
+
+ private final long mergeThreshold;
+
+ private final Reporter reporter;
+ private final ExceptionReporter exceptionReporter;
+
+ /**
+ * Combiner class to run during in-memory merge, if defined.
+ */
+ private final Class<? extends Reducer> combinerClass;
+
+ /**
+ * Resettable collector used for combine.
+ */
+ private final Task.CombineOutputCollector<K,V> combineCollector;
+
+ private final Counters.Counter spilledRecordsCounter;
+
+ private final Counters.Counter reduceCombineInputCounter;
+
+ private final Counters.Counter mergedMapOutputsCounter;
+
+ private final CompressionCodec codec;
+
+ private final Progress mergePhase;
+
+ private String basePath;
+ private FileSystem remoteFS;
+
+ public RssRemoteMergeManagerImpl(String appId, TaskAttemptID reduceId, JobConf jobConf,
+ String basePath,
+ int replication,
+ int retries,
+ FileSystem localFS,
+ LocalDirAllocator localDirAllocator,
+ Reporter reporter,
+ CompressionCodec codec,
+ Class<? extends Reducer> combinerClass,
+ Task.CombineOutputCollector<K,V> combineCollector,
+ Counters.Counter spilledRecordsCounter,
+ Counters.Counter reduceCombineInputCounter,
+ Counters.Counter mergedMapOutputsCounter,
+ ExceptionReporter exceptionReporter,
+ Progress mergePhase,
+ MapOutputFile mapOutputFile,
+ JobConf remoteConf) {
+ super(reduceId, jobConf,
+ localFS,
+ localDirAllocator,
+ reporter,
+ codec,
+ combinerClass,
+ combineCollector,
+ spilledRecordsCounter,
+ reduceCombineInputCounter,
+ mergedMapOutputsCounter,
+ exceptionReporter,
+ mergePhase, mapOutputFile);
+
+ this.appId = appId;
+ this.reduceId = reduceId;
+ this.jobConf = jobConf;
+ this.exceptionReporter = exceptionReporter;
+ this.mergePhase = mergePhase;
+
+ this.reporter = reporter;
+ this.codec = codec;
+ this.combinerClass = combinerClass;
+ this.combineCollector = combineCollector;
+ this.reduceCombineInputCounter = reduceCombineInputCounter;
+ this.spilledRecordsCounter = spilledRecordsCounter;
+ this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+
+ try {
+ remoteConf.setInt("dfs.replication", replication);
+ remoteConf.setInt("dfs.client.block.write.retries", retries); // origin=3
+ this.remoteFS = ShuffleStorageUtils.getFileSystemForPath(new Path(basePath), remoteConf);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot init remoteFS on path: " + basePath, e);
+ }
+
+ this.basePath = basePath;
+
+ final float maxInMemCopyUse =
+ jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
+ MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
+ if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+ throw new IllegalArgumentException("Invalid value for "
+ + MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+ + maxInMemCopyUse);
+ }
+
+ // Allow unit tests to fix Runtime memory
+ this.memoryLimit = (long)(jobConf.getLong(
+ MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+ Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
+
+ this.usedMemory = 0L;
+ this.commitMemory = 0L;
+
+ this.mergeThreshold = (long)(this.memoryLimit
+ * jobConf.getFloat(
+ MRJobConfig.SHUFFLE_MERGE_PERCENT,
+ MRJobConfig.DEFAULT_SHUFFLE_MERGE_PERCENT));
+ LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", "
+ + "mergeThreshold=" + mergeThreshold);
+
+ this.inMemoryMerger = createRssInMemoryMerger();
+ this.inMemoryMerger.start();
+ }
+
+
+ protected RssInMemoryRemoteMerger<K, V> createRssInMemoryMerger() {
+ return new RssInMemoryRemoteMerger<K, V>(
+ this,
+ jobConf,
+ remoteFS,
+ new Path(basePath, appId),
+ reduceId.toString(),
+ codec,
+ reporter,
+ spilledRecordsCounter,
+ combinerClass,
+ exceptionReporter,
+ combineCollector,
+ reduceCombineInputCounter,
+ mergedMapOutputsCounter
+ );
+ }
+
+ @Override
+ public void waitForResource() throws InterruptedException {
+ inMemoryMerger.waitForMerge();
+ }
+
+ @Override
+ public synchronized MapOutput<K, V> reserve(TaskAttemptID mapId,
+ long requestedSize,
+ int fetcher) throws IOException {
+ // we disable on-disk MapOutput to avoid merging intermediate data on disk
+ if (usedMemory > memoryLimit) {
+ LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+ + ") is greater than memoryLimit (" + memoryLimit + ")."
+ + " CommitMemory is (" + commitMemory + ")");
+ return null;
+ }
+
+ // Allow the in-memory shuffle to progress
+ LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+ + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
+ + "CommitMemory is (" + commitMemory + ")");
+ usedMemory += requestedSize;
+ // use this rss merger as the callback
+ return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize, codec, true);
+ }
+
+ @Override
+ synchronized void unreserve(long size) {
+ usedMemory -= size;
+ }
+
+ @Override
+ public synchronized void closeInMemoryFile(InMemoryMapOutput<K,V> mapOutput) {
+ inMemoryMapOutputs.add(mapOutput);
+ LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+ + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+ + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
+
+ commitMemory += mapOutput.getSize();
+ // Can hang if mergeThreshold is really low.
+ if (commitMemory >= mergeThreshold) {
+ LOG.info("Starting inMemoryMerger's merge since commitMemory="
+ + commitMemory + " > mergeThreshold=" + mergeThreshold
+ + ". Current usedMemory=" + usedMemory);
+ inMemoryMergedMapOutputs.clear();
+ inMemoryMerger.startMerge(inMemoryMapOutputs);
+ commitMemory = 0L; // Reset commitMemory.
+ }
+ // we disable memToMemMerger to simplify design
+ }
+
+ public synchronized void closeOnHDFSFile(Path file) {
+ // many in-memory segments have been committed to HDFS
+ onHDFSMapOutputs.add(file);
+ }
+
+ @Override
+ public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K,V> mapOutput) {
+ throw new IllegalStateException("closeInMemoryMergedFile is unsupported for rss merger");
+ }
+
+ @Override
+ public synchronized void closeOnDiskFile(CompressAwarePath file) {
+ throw new IllegalStateException("closeOnDiskFile is unsupported for rss merger");
+ }
+
+ @Override
+ public RawKeyValueIterator close() throws Throwable {
+ // Wait for on-going merges to complete
+ inMemoryMerger.startMerge(inMemoryMapOutputs);
+ inMemoryMerger.close();
+ if (!inMemoryMapOutputs.isEmpty()) {
+ throw new RssException("InMemoryMapOutputs should be empty");
+ }
+ return finalMerge();
+ }
+
+ // Read HDFS segments for reduce
+ private RawKeyValueIterator finalMerge() throws IOException {
+ Class<K> keyClass = (Class<K>)jobConf.getMapOutputKeyClass();
+ Class<V> valueClass = (Class<V>)jobConf.getMapOutputValueClass();
+ final RawComparator<K> comparator =
+ (RawComparator<K>)jobConf.getOutputKeyComparator();
+ // We merge-sort only once
+ return Merger.merge(jobConf, remoteFS, keyClass, valueClass, codec,
+ onHDFSMapOutputs.toArray(new Path[onHDFSMapOutputs.size()]), true, Integer.MAX_VALUE,
+ new Path("reduceId"), comparator, reporter, spilledRecordsCounter,
+ null, null, null);
+ }
+}
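For a feel of the thresholds computed in the constructor, here is a worked example with illustrative numbers; the percentages are the stock Hadoop defaults:

```java
public class MergeThresholdExample {
  public static void main(String[] args) {
    // Assume a 1 GiB reduce JVM heap (normally Runtime.getRuntime().maxMemory()).
    long maxMemory = 1024L * 1024 * 1024;
    float inputBufferPercent = 0.70f; // mapreduce.reduce.shuffle.input.buffer.percent
    float mergePercent = 0.66f;       // mapreduce.reduce.shuffle.merge.percent

    long memoryLimit = (long) (maxMemory * inputBufferPercent);  // ~717 MiB shuffle buffer
    long mergeThreshold = (long) (memoryLimit * mergePercent);   // ~473 MiB triggers a spill
    System.out.println("memoryLimit=" + memoryLimit + ", mergeThreshold=" + mergeThreshold);
    // Fetched map outputs accumulate via reserve(); once committed bytes reach
    // mergeThreshold, closeInMemoryFile() starts an in-memory merge that writes one
    // merged segment to HDFS; close() then merge-sorts all HDFS segments for the reduce.
  }
}
```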
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
index 8f3efd3..1d30df9 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java
@@ -98,7 +98,6 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
this.copyPhase = context.getCopyPhase();
this.taskStatus = context.getStatus();
this.reduceTask = context.getReduceTask();
- this.merger = createMergeManager(context);
// rss init
this.appId = RssMRUtils.getApplicationAttemptId().toString();
@@ -122,17 +121,44 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE));
String remoteStorageConf = RssMRUtils.getString(rssJobConf, mrJobConf, RssMRConfig.RSS_REMOTE_STORAGE_CONF, "");
this.remoteStorageInfo = new RemoteStorageInfo(basePath, remoteStorageConf);
+ this.merger = createMergeManager(context);
}
protected MergeManager<K, V> createMergeManager(
ShuffleConsumerPlugin.Context context) {
- return new MergeManagerImpl<K, V>(reduceId, mrJobConf, context.getLocalFS(),
+ boolean useRemoteSpill = RssMRUtils.getBoolean(rssJobConf, mrJobConf,
+ RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT);
+ if (useRemoteSpill) {
+ // Use the minimum replication, because spilled data can be recomputed by the reduce task.
+ // Instead, we rely on more retries in the HDFS client.
+ int replication = RssMRUtils.getInt(rssJobConf, mrJobConf,
+ RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,
+ RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT);
+ int retries = RssMRUtils.getInt(rssJobConf, mrJobConf,
+ RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES,
+ RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT);
+ return new RssRemoteMergeManagerImpl(appId, reduceId, mrJobConf,
+ basePath,
+ replication,
+ retries,
+ context.getLocalFS(),
+ context.getLocalDirAllocator(), reporter, context.getCodec(),
+ context.getCombinerClass(), context.getCombineCollector(),
+ context.getSpilledRecordsCounter(),
+ context.getReduceCombineInputCounter(),
+ context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
+ context.getMapOutputFile(),
+ getRemoteConf()
+ );
+ } else {
+ return new MergeManagerImpl<K, V>(reduceId, mrJobConf, context.getLocalFS(),
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
+ }
}
@Override
@@ -164,12 +190,7 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
if (!taskIdBitmap.isEmpty()) {
LOG.info("In reduce: " + reduceId
+ ", Rss MR client starts to fetch blocks from RSS server");
- JobConf readerJobConf = new JobConf((mrJobConf));
- if (!remoteStorageInfo.isEmpty()) {
- for (Map.Entry<String, String> entry : remoteStorageInfo.getConfItems().entrySet()) {
- readerJobConf.set(entry.getKey(), entry.getValue());
- }
- }
+ JobConf readerJobConf = getRemoteConf();
CreateShuffleReadClientRequest request = new CreateShuffleReadClientRequest(
appId, 0, reduceId.getTaskID().getId(), storageType, basePath, indexReadLimit, readBufferSize,
partitionNumPerRange, partitionNum, blockIdBitmap, taskIdBitmap, serverInfoList,
@@ -208,6 +229,16 @@ public class RssShuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionR
return kvIter;
}
+ private JobConf getRemoteConf() {
+ JobConf readerJobConf = new JobConf((mrJobConf));
+ if (!remoteStorageInfo.isEmpty()) {
+ for (Map.Entry<String, String> entry : remoteStorageInfo.getConfItems().entrySet()) {
+ readerJobConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ return readerJobConf;
+ }
+
@Override
public void close() {
}
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index e15351b..e163eec 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -78,6 +78,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.storage.util.StorageType;
public class RssMRAppMaster extends MRAppMaster {
@@ -199,6 +200,27 @@ public class RssMRAppMaster extends MRAppMaster {
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
+ // When containers have disks with very limited space, reduce tasks are allowed to spill data to HDFS
+ if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+ RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+ if (remoteStorage.isEmpty()) {
+ throw new IllegalArgumentException("Remote spill only supports "
+ + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode with " + remoteStorage);
+ }
+
+ // When remote spill is enabled, the reduce task is more likely to fail.
+ // We allow extra attempts to avoid recomputing the whole job.
+ int originalAttempts = conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
+ int inc = conf.getInt(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC,
+ RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT);
+ if (inc < 0) {
+ throw new IllegalArgumentException(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC
+ + " cannot be negative");
+ }
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, originalAttempts + inc);
+ }
+
LOG.info("Start to register shuffle");
long start = System.currentTimeMillis();
serverToPartitionRanges.entrySet().forEach(entry -> {
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java
new file mode 100644
index 0000000..1fc133b
--- /dev/null
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMergerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RssInMemoryRemoteMergerTest {
+
+ @Test
+ public void mergerTest() throws IOException {
+ JobConf jobConf = new JobConf();
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+ File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
+ JobID jobId = new JobID("a", 0);
+ TaskAttemptID mapId1 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.MAP, 1), 0);
+ TaskAttemptID mapId2 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.MAP, 2), 0);
+ TaskAttemptID reduceId1 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.REDUCE, 0), 0);
+ RssRemoteMergeManagerImpl<Text, Text> mergeManager = new RssRemoteMergeManagerImpl<Text, Text>(
+ "app", reduceId1, jobConf, tmpDir.toString(), 1,5, fs, lda, Reporter.NULL,
+ null, null, null, null, null,
+ null, null, new Progress(), new MROutputFiles(), new JobConf());
+
+ // write map outputs
+ Map<String, String> map1 = new TreeMap<String, String>();
+ map1.put("apple", "disgusting");
+ map1.put("carrot", "delicious");
+ Map<String, String> map2 = new TreeMap<String, String>();
+ map2.put("banana", "pretty good");
+ byte[] mapOutputBytes1 = writeMapOutput(jobConf, map1);
+ byte[] mapOutputBytes2 = writeMapOutput(jobConf, map2);
+ InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
+ jobConf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
+ InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
+ jobConf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
+ System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
+ mapOutputBytes1.length);
+ System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
+ mapOutputBytes2.length);
+
+ Path spillPath = new Path("test");
+ MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
+ new RssInMemoryRemoteMerger<Text, Text>(mergeManager, jobConf, fs, spillPath,
+ "test", null, Reporter.NULL, null,
+ null, null, null, null, null);
+ List<InMemoryMapOutput<Text, Text>> mapOutputs1 =
+ new ArrayList<InMemoryMapOutput<Text, Text>>();
+ mapOutputs1.add(mapOutput1);
+ mapOutputs1.add(mapOutput2);
+
+ String filePath = "spill" + Path.SEPARATOR + "test" + Path.SEPARATOR + mapId1;
+ Path mergePath = new Path(spillPath, filePath);
+ inMemoryMerger.merge(mapOutputs1);
+ assertTrue(fs.exists(mergePath));
+ List<String> keys = Lists.newArrayList();
+ List<String> values = Lists.newArrayList();
+ readOnDiskMapOutput(jobConf, fs, mergePath, keys, values);
+ fs.delete(spillPath, true);
+ List<String> actualKeys = Lists.newArrayList("apple", "banana", "carrot");
+ List<String> actualValues = Lists.newArrayList("disgusting", "pretty good", "delicious");
+ for (int i = 0; i < 3; i++) {
+ assertEquals(keys.get(i), actualKeys.get(i));
+ assertEquals(values.get(i), actualValues.get(i));
+ }
+ }
+
+ private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
+ throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+ IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
+ Text.class, Text.class, null, null);
+ for (String key : keysToValues.keySet()) {
+ String value = keysToValues.get(key);
+ writer.append(new Text(key), new Text(value));
+ }
+ writer.close();
+ return baos.toByteArray();
+ }
+
+ private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
+ List<String> keys, List<String> values) throws IOException {
+ FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+
+ IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
+ fs.getFileStatus(path).getLen(), null, null);
+ DataInputBuffer keyBuff = new DataInputBuffer();
+ DataInputBuffer valueBuff = new DataInputBuffer();
+ Text key = new Text();
+ Text value = new Text();
+ while (reader.nextRawKey(keyBuff)) {
+ key.readFields(keyBuff);
+ keys.add(key.toString());
+ reader.nextRawValue(valueBuff);
+ value.readFields(valueBuff);
+ values.add(value.toString());
+ }
+ }
+}
diff --git a/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java
new file mode 100644
index 0000000..122d91c
--- /dev/null
+++ b/client-mr/src/test/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progress;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RssRemoteMergeManagerTest {
+ String appId = "app1";
+ JobID jobId = new JobID(appId, 0);
+
+ TaskAttemptID mapId1 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.MAP, 1), 0);
+ TaskAttemptID mapId2 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.MAP, 2), 0);
+ TaskAttemptID reduceId1 = new TaskAttemptID(
+ new TaskID(jobId, TaskType.REDUCE, 0), 0);
+ @Test
+ public void mergerTest() throws Throwable {
+ JobConf jobConf = new JobConf();
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+ File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
+ jobConf.set("mapreduce.reduce.memory.totalbytes", "1024");
+ jobConf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.01");
+ jobConf.set("mapreduce.reduce.shuffle.merge.percent", "0.1");
+
+ RssRemoteMergeManagerImpl<Text, Text> mergeManager = new RssRemoteMergeManagerImpl<Text, Text>(
+ appId, reduceId1, jobConf, tmpDir.toString(), 1, 5, fs, lda, Reporter.NULL,
+ null, null, null, null, null,
+ null, null, new Progress(), new MROutputFiles(), new JobConf());
+
+ // write map outputs
+ Map<String, String> map1 = new TreeMap<String, String>();
+ map1.put("apple", "disgusting");
+ map1.put("carrot", "delicious");
+ Map<String, String> map2 = new TreeMap<String, String>();
+ map2.put("banana", "pretty good");
+ byte[] mapOutputBytes1 = writeMapOutput(jobConf, map1);
+ byte[] mapOutputBytes2 = writeMapOutput(jobConf, map2);
+ InMemoryMapOutput mapOutput1 = (InMemoryMapOutput)mergeManager.reserve(mapId1, mapOutputBytes1.length, 0);
+ InMemoryMapOutput mapOutput2 = (InMemoryMapOutput)mergeManager.reserve(mapId2, mapOutputBytes2.length, 0);
+ System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
+ mapOutputBytes1.length);
+ System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
+ mapOutputBytes2.length);
+ mapOutput1.commit();
+ mapOutput2.commit();
+
+ RawKeyValueIterator iterator = mergeManager.close();
+
+ File[] mergedFiles = new File(tmpDir + Path.SEPARATOR + appId + Path.SEPARATOR
+ + "spill" + Path.SEPARATOR + "attempt_app1_0000_r_000000_0").listFiles();
+
+ assertEquals(mergedFiles.length, 1);
+
+ List<String> keys = Lists.newArrayList();
+ List<String> values = Lists.newArrayList();
+ readOnDiskMapOutput(jobConf, fs, new Path(mergedFiles[0].toString()), keys, values);
+ List<String> actualKeys = Lists.newArrayList("apple", "banana", "carrot");
+ List<String> actualValues = Lists.newArrayList("disgusting", "pretty good", "delicious");
+ for (int i = 0; i < 3; i++) {
+ assertEquals(keys.get(i), actualKeys.get(i));
+ assertEquals(values.get(i), actualValues.get(i));
+
+ // test final returned values
+ iterator.next();
+ byte[] key = new byte[iterator.getKey().getLength()];
+ byte[] value = new byte[iterator.getValue().getLength()];
+ System.arraycopy(iterator.getKey().getData(), 0, key, 0, key.length);
+ System.arraycopy(iterator.getValue().getData(), 0, value, 0, value.length);
+ assertEquals(new Text(key).toString().trim(), actualKeys.get(i));
+ assertEquals(new Text(value).toString().trim(), actualValues.get(i));
+ }
+ }
+
+ private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
+ throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
+ IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos,
+ Text.class, Text.class, null, null);
+ for (String key : keysToValues.keySet()) {
+ String value = keysToValues.get(key);
+ writer.append(new Text(key), new Text(value));
+ }
+ writer.close();
+ return baos.toByteArray();
+ }
+
+ private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
+ List<String> keys, List<String> values) throws IOException {
+ FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+
+ IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
+ fs.getFileStatus(path).getLen(), null, null);
+ DataInputBuffer keyBuff = new DataInputBuffer();
+ DataInputBuffer valueBuff = new DataInputBuffer();
+ Text key = new Text();
+ Text value = new Text();
+ while (reader.nextRawKey(keyBuff)) {
+ key.readFields(keyBuff);
+ keys.add(key.toString());
+ reader.nextRawValue(valueBuff);
+ value.readFields(valueBuff);
+ values.add(value.toString());
+ }
+ }
+
+}
diff --git a/docs/client_guide.md b/docs/client_guide.md
index ba26f3c..14b914d 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -161,3 +161,21 @@ The important configuration is listed as following.
|---|---|---|
|mapreduce.rss.client.max.buffer.size|3k|The max buffer size in map side|
|mapreduce.rss.client.batch.trigger.num|50|The max batch of buffers to send data in map side|
+
+
+
+
+### Remote Spill (Experimental)
+
+In cloud environments, VMs may have very limited disk space and poor disk performance.
+This experimental feature allows reduce tasks to spill data to remote storage (e.g., HDFS).
+
+|Property Name|Default|Description|
+|---|---|---|
+|mapreduce.rss.reduce.remote.spill.enable|false|Whether to use remote spill|
+|mapreduce.rss.reduce.remote.spill.attempt.inc|1|Extra reduce attempts to allow, since spilling to HDFS fails more often than spilling to local disk|
+|mapreduce.rss.reduce.remote.spill.replication|1|The HDFS replication factor for spilled data|
+|mapreduce.rss.reduce.remote.spill.retries|5|The number of retries when spilling data to HDFS|
+
+Notice: this feature requires the MEMORY_LOCALFILE_HDFS storage type.
+
\ No newline at end of file
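As an illustration of the storage-type requirement above, a client configuration might pair the two settings as sketched below; the storage-type property name is an assumption derived from `RssMRConfig.RSS_STORAGE_TYPE`, so verify it against your deployment:

```java
import org.apache.hadoop.mapred.JobConf;

public class RemoteSpillStorageTypeExample {
  public static JobConf buildConf() {
    JobConf conf = new JobConf();
    // Assumed property name; check RssMRConfig.RSS_STORAGE_TYPE in your build.
    conf.set("mapreduce.rss.storage.type", "MEMORY_LOCALFILE_HDFS");
    conf.setBoolean("mapreduce.rss.reduce.remote.spill.enable", true);
    return conf;
  }
}
```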
diff --git a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
index 5958932..3170fd3 100644
--- a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
+++ b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
@@ -101,6 +101,12 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
runRssApp(appConf);
String rssPath = appConf.get("mapreduce.output.fileoutputformat.outputdir");
verifyResults(originPath, rssPath);
+
+ appConf = new JobConf(mrYarnCluster.getConfig());
+ appConf.set("mapreduce.rss.reduce.remote.spill.enable", "true");
+ runRssApp(appConf);
+ String rssRemoteSpillPath = appConf.get("mapreduce.output.fileoutputformat.outputdir");
+ verifyResults(originPath, rssRemoteSpillPath);
}
private void updateCommonConfiguration(Configuration jobConf) {
@@ -126,6 +132,8 @@ public class MRIntegrationTestBase extends IntegrationTestBase {
jobConf.setInt(MRJobConfig.IO_SORT_MB, 128);
jobConf.set(MRJobConfig.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, "org.apache.hadoop.mapred.RssMapOutputCollector");
jobConf.set(MRConfig.SHUFFLE_CONSUMER_PLUGIN, "org.apache.hadoop.mapreduce.task.reduce.RssShuffle");
+ jobConf.set(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, "true");
+
File file = new File(parentPath, "client-mr/target/shaded");
File[] jars = file.listFiles();
File localFile = null;