Posted to issues@uniffle.apache.org by GitBox <gi...@apache.org> on 2022/07/14 13:40:33 UTC

[GitHub] [incubator-uniffle] frankliee opened a new pull request, #55: [Feature][MR] Support remote spill

frankliee opened a new pull request, #55:
URL: https://github.com/apache/incubator-uniffle/pull/55

   ### What changes were proposed in this pull request?
   Rewrite MapReduce's MergeManager to spill sorted segments to HDFS.
   The rewritten manager returns a merge-sorted iterator to read these HDFS segments.
    
   ### Why are the changes needed?
   In cloud environments, machines may have very limited disk space and disk performance.
   This PR allows data to be spilled to remote storage (e.g., HDFS).
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. 
   rss.reduce.remote.spill.enable (default false)
   
   ### How was this patch tested?
   New UT and IT with remote spill.
   
   Co-authored-by: roryqi <ro...@tencent.com>
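
Note: the "merge-sorted iterator" in the description can be pictured as a k-way merge over already-sorted segments. The sketch below shows that general technique only, assuming each segment is an in-memory sorted list of integers; it is not the PR's code, and `SegmentMergeSketch`, `Cursor`, and `mergeSorted` are hypothetical names. The PR itself works on IFile segments stored on HDFS through Hadoop's `Merger`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in: each "segment" is an already-sorted list of keys.
// In the PR the segments are files on HDFS; here they are in-memory lists.
class SegmentMergeSketch {

  // One cursor per segment: the current head key plus the rest of the segment.
  private static final class Cursor {
    int head;
    final Iterator<Integer> rest;

    Cursor(int head, Iterator<Integer> rest) {
      this.head = head;
      this.rest = rest;
    }
  }

  // Merge sorted segments into one sorted list using a min-heap keyed on each head.
  static List<Integer> mergeSorted(List<List<Integer>> segments) {
    PriorityQueue<Cursor> heap =
        new PriorityQueue<>((a, b) -> Integer.compare(a.head, b.head));
    for (List<Integer> segment : segments) {
      Iterator<Integer> it = segment.iterator();
      if (it.hasNext()) {
        heap.add(new Cursor(it.next(), it));
      }
    }

    List<Integer> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      Cursor smallest = heap.poll();
      merged.add(smallest.head);
      if (smallest.rest.hasNext()) {
        // Advance the cursor and re-insert it so its next key competes again.
        smallest.head = smallest.rest.next();
        heap.add(smallest);
      }
    }
    return merged;
  }
}
```

Only one key per segment is held in the heap at a time, which is why this style of merge suits segments that are streamed from remote storage rather than loaded whole.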
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921777938


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,

Review Comment:
   Why do we get the value from extraConf?





[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921886503


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -201,21 +201,22 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
       // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
-      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+      if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
         RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
 
-        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()
+            || remoteStorage.isEmpty()) {

Review Comment:
   I think the storageType check is also required to avoid an illegal state.
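
Note on the check in the hunk above: in Java, `!=` on two `String` values compares object identity, not characters, so a storage type read from configuration can compare as "different" even when the text matches. A minimal sketch of an `equals`-based check follows, assuming a stand-in enum; `StorageTypeSketch`, `StorageTypeCheckSketch`, and `isHdfsBacked` are hypothetical names and not part of the PR.

```java
// Stand-in for the project's StorageType enum; only the constant used in the diff is listed.
enum StorageTypeSketch {
  MEMORY_LOCALFILE_HDFS
}

class StorageTypeCheckSketch {

  // Returns true when the configured value names the MEMORY_LOCALFILE_HDFS mode.
  // equals() compares characters, so the result does not depend on object identity,
  // and calling it on the enum name keeps a null argument from throwing.
  static boolean isHdfsBacked(String storageType) {
    return StorageTypeSketch.MEMORY_LOCALFILE_HDFS.name().equals(storageType);
  }

  public static void main(String[] args) {
    // A value parsed from configuration is typically a distinct String object.
    String fromConf = new String("MEMORY_LOCALFILE_HDFS");
    // Identity comparison reports "different" although the text is the same.
    System.out.println(fromConf != StorageTypeSketch.MEMORY_LOCALFILE_HDFS.name()); // prints "true"
    System.out.println(isHdfsBacked(fromConf)); // prints "true"
  }
}
```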





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921899951


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -201,21 +201,22 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
       // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
-      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+      if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
         RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
 
-        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()
+            || remoteStorage.isEmpty()) {

Review Comment:
   Take a look at the `requireRemoteStorage` method.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921211301


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+          throw new IllegalArgumentException("Remote spill only supports "
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode");
+        }
+
+        // Use minimized replica, because spilled data can be recomputed by reduce task.
+        // And task re-computation is much cheaper job re-computation
+        conf.setInt("dfs.replication", 1);
+
+        // When remote spill is enabled, reduce task is more easy to crash.
+        // We allow more attempts to avoid recomputing job.
+        conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 5);

Review Comment:
   It's odd.  The logic is wrong.



##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+          throw new IllegalArgumentException("Remote spill only supports "
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode");
+        }
+
+        // Use minimized replica, because spilled data can be recomputed by reduce task.
+        // And task re-computation is much cheaper job re-computation
+        conf.setInt("dfs.replication", 1);

Review Comment:
   We shouldn't modify the configuration. It will affect the replication of output.
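
Note: one way to address this concern is to apply the lower replication to a separate copy of the configuration that is used only for spill files, so the job's own `dfs.replication` (and therefore its output) is unaffected. The sketch below assumes a plain `Map` standing in for a Hadoop `Configuration`; `SpillConfSketch` and `remoteSpillConf` are hypothetical names, not the PR's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a plain Map plays the role of a Hadoop Configuration.
class SpillConfSketch {

  // Build a separate config for the spill file system and leave jobConf untouched.
  static Map<String, String> remoteSpillConf(Map<String, String> jobConf, int spillReplication) {
    // Defensive copy: changes below never leak back into the job configuration.
    Map<String, String> remoteConf = new HashMap<>(jobConf);
    remoteConf.put("dfs.replication", Integer.toString(spillReplication));
    return remoteConf;
  }
}
```

With this shape, job output keeps whatever replication the user configured, while spilled segments, which a reduce task can regenerate, use the cheaper setting.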



##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {

Review Comment:
   Could we check the remote storage path instead?





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921224873


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+          throw new IllegalArgumentException("Remote spill only supports "
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode");
+        }
+
+        // Use minimized replica, because spilled data can be recomputed by reduce task.
+        // And task re-computation is much cheaper job re-computation
+        conf.setInt("dfs.replication", 1);
+
+        // When remote spill is enabled, reduce task is more easy to crash.
+        // We allow more attempts to avoid recomputing job.

Review Comment:
   Are you aware of the config option `dfs.client.block.write.retries`?





[GitHub] [incubator-uniffle] jerqi commented on pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#issuecomment-1184538911

   > ### What changes were proposed in this pull request?
   > Rewrite Mapreduce's MergerManager to spill sorted segments to HDFS, It returns a merge-sorted iterator to read these HDFS segments.
   > 
   > ### Why are the changes needed?
   > In cloud, machines may have very limited disk space and performance. This PR allows to spill data to remote storage (e.g., hdfs)
   > 
   > ### Does this PR introduce _any_ user-facing change?
   > Yes. rss.reduce.remote.spill.enable (default false)
   > 
   > ### How was this patch tested?
   > New UT and IT with remote spill.
   > 
   > Co-authored-by: roryqi [roryqi@tencent.com](mailto:roryqi@tencent.com)
   
   Because this PR introduces a user-facing change, we should update the docs.
   We should also supply performance test results.




[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921220855


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {

Review Comment:
   ok



##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+          throw new IllegalArgumentException("Remote spill only supports "
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode");
+        }
+
+        // Use minimized replica, because spilled data can be recomputed by reduce task.
+        // And task re-computation is much cheaper job re-computation
+        conf.setInt("dfs.replication", 1);

Review Comment:
   ok





[GitHub] [incubator-uniffle] frankliee merged pull request #55: [Experimental Feature] MR Supports Remote Spill

Posted by GitBox <gi...@apache.org>.
frankliee merged PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921884448


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMerger.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+public class RssInMemoryRemoteMerger<K, V> extends MergeThread<InMemoryMapOutput<K,V>, K, V> {
+  private static final Log LOG = LogFactory.getLog(RssInMemoryRemoteMerger.class);
+
+  private static final String SPILL_OUTPUT_PREFIX = "spill";
+  private final RssRemoteMergeManagerImpl<K, V> manager;
+  private final JobConf jobConf;
+  private final FileSystem remoteFs;
+  private final Path spillPath;
+  private final String taskAttemptId;
+  private final CompressionCodec codec;
+  private final Progressable reporter;
+  private final Counters.Counter spilledRecordsCounter;
+  private final Class<? extends Reducer> combinerClass;
+  private final Task.CombineOutputCollector<K,V> combineCollector;
+  private final Counters.Counter reduceCombineInputCounter;
+  private final Counters.Counter mergedMapOutputsCounter;
+
+  public RssInMemoryRemoteMerger(
+      RssRemoteMergeManagerImpl<K, V> manager,
+      JobConf jobConf,
+      FileSystem remoteFs,
+      Path spillPath,
+      String taskId,
+      CompressionCodec codec,
+      Progressable reporter,
+      Counters.Counter spilledRecordsCounter,
+      Class<? extends Reducer> combinerClass,
+      ExceptionReporter exceptionReporter,
+      Task.CombineOutputCollector<K,V> combineCollector,
+      Counters.Counter reduceCombineInputCounter,
+      Counters.Counter mergedMapOutputsCounter) {
+    super(manager, Integer.MAX_VALUE, exceptionReporter);
+    this.setName("RssInMemoryMerger - Thread to merge in-memory map-outputs");
+    this.setDaemon(true);
+    this.manager = manager;
+    this.jobConf = jobConf;
+    this.remoteFs = remoteFs;
+    this.spillPath = spillPath;
+    this.taskAttemptId = taskId;
+    this.codec = codec;
+    this.reporter = reporter;
+    this.spilledRecordsCounter = spilledRecordsCounter;
+    this.combinerClass = combinerClass;
+    this.combineCollector = combineCollector;
+    this.reduceCombineInputCounter = reduceCombineInputCounter;
+    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+  }
+
+  @Override
+  public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
+    if (inputs == null || inputs.size() == 0) {
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+    TaskAttemptID mapId = inputs.get(0).getMapId();
+
+    List<Merger.Segment<K, V>> inMemorySegments = new ArrayList<Merger.Segment<K, V>>();
+    createInMemorySegments(inputs, inMemorySegments);
+    int noInMemorySegments = inMemorySegments.size();
+
+    String filePath = SPILL_OUTPUT_PREFIX + Path.SEPARATOR + taskAttemptId + Path.SEPARATOR + mapId;
+    Path outputPath = new Path(spillPath, filePath);
+
+    FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, remoteFs.create(outputPath));
+    IFile.Writer<K, V> writer = new IFile.Writer<K, V>(jobConf, out,
+        (Class<K>) jobConf.getMapOutputKeyClass(),
+        (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
+
+    RawKeyValueIterator rIter = null;
+    try {
+      LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments...");
+
+      // tmpDir won't be used. tmpDir is used for onDiskMerger
+      rIter = Merger.merge(jobConf, remoteFs,
+          (Class<K>)jobConf.getMapOutputKeyClass(),
+          (Class<V>)jobConf.getMapOutputValueClass(),
+          inMemorySegments, inMemorySegments.size(),
+          new Path(taskAttemptId),
+          (RawComparator<K>)jobConf.getOutputKeyComparator(),
+          reporter, spilledRecordsCounter, null, null);
+
+      if (null == combinerClass) {
+        Merger.writeFile(rIter, writer, reporter, jobConf);
+      } else {
+        combineCollector.setWriter(writer);
+        combineAndSpill(rIter, reduceCombineInputCounter);
+      }
+      writer.close();
+
+      // keep this for final merge
+      manager.closeOnHDFSFile(outputPath);
+
+      LOG.info(taskAttemptId + " Merge of the " + noInMemorySegments
+          + " files in-memory complete."
+          + " Local file is " + outputPath + " of size "
+          + remoteFs.getFileStatus(outputPath).getLen()
+          + " cost time " + (System.currentTimeMillis() - start) + " ms");
+    } catch (IOException e) {
+      //make sure that we delete the ondisk file that we created

Review Comment:
   Add a blank after `//`





[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921972791


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java:
##########
@@ -140,6 +141,14 @@ public RssRemoteMergeManagerImpl(String appId, TaskAttemptID reduceId, JobConf j
     this.mergedMapOutputsCounter = mergedMapOutputsCounter;
 
     try {
+      // Use minimized replica, because spilled data can be recomputed by reduce task.
+      // Instead, we use more retries on HDFS client.
+      int replication = remoteConf.getInt(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,

Review Comment:
   Fixed



##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -201,21 +201,22 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
       // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
-      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+      if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
         RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
 
-        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()
+            || remoteStorage.isEmpty()) {

Review Comment:
   cleaned





[GitHub] [incubator-uniffle] jerqi commented on pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#issuecomment-1185349035

   LGTM




[GitHub] [incubator-uniffle] frankliee commented on pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
frankliee commented on PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#issuecomment-1185361183

   > > ### What changes were proposed in this pull request?
   > > Rewrite Mapreduce's MergerManager to spill sorted segments to HDFS, It returns a merge-sorted iterator to read these HDFS segments.
   > > ### Why are the changes needed?
   > > In cloud, machines may have very limited disk space and performance. This PR allows to spill data to remote storage (e.g., hdfs)
   > > ### Does this PR introduce _any_ user-facing change?
   > > Yes. rss.reduce.remote.spill.enable (default false)
   > > ### How was this patch tested?
   > > New UT and IT with remote spill.
   > > Co-authored-by: roryqi [roryqi@tencent.com](mailto:roryqi@tencent.com)
   > 
   > Update your description and documentation. This PR introduces another configuration option.
   
   Doc is updated




[GitHub] [incubator-uniffle] jerqi commented on pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#issuecomment-1185333925

   > ### What changes were proposed in this pull request?
   > Rewrite Mapreduce's MergerManager to spill sorted segments to HDFS, It returns a merge-sorted iterator to read these HDFS segments.
   > 
   > ### Why are the changes needed?
   > In cloud, machines may have very limited disk space and performance. This PR allows to spill data to remote storage (e.g., hdfs)
   > 
   > ### Does this PR introduce _any_ user-facing change?
   > Yes. rss.reduce.remote.spill.enable (default false)
   > 
   > ### How was this patch tested?
   > New UT and IT with remote spill.
   > 
   > Co-authored-by: roryqi [roryqi@tencent.com](mailto:roryqi@tencent.com)
   
   Update your description and documentation. This PR introduces another configuration option.




[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921972365


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+          throw new IllegalArgumentException("Remote spill only supports "
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode");
+        }
+
+        // Use minimized replica, because spilled data can be recomputed by reduce task.
+        // And task re-computation is much cheaper job re-computation
+        conf.setInt("dfs.replication", 1);
+
+        // When remote spill is enabled, reduce task is more easy to crash.
+        // We allow more attempts to avoid recomputing job.

Review Comment:
   Added





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921861163


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -201,21 +201,22 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
       // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
-      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+      if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
         RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
 
-        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()
+            || remoteStorage.isEmpty()) {
           throw new IllegalArgumentException("Remote spill only supports "
-            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode");
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode with " + remoteStorage);
         }
 
-        // Use minimized replica, because spilled data can be recomputed by reduce task.
-        // And task re-computation is much cheaper job re-computation
-        conf.setInt("dfs.replication", 1);
 
         // When remote spill is enabled, reduce task is more easy to crash.
         // We allow more attempts to avoid recomputing job.
-        conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 5);
+        int originalAttempts = conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
+        int inc = conf.getInt(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC,

Review Comment:
   Please check that the value is >= 0.



##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -201,21 +201,22 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
       // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
-      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+      if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
         RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
 
-        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()
+            || remoteStorage.isEmpty()) {

Review Comment:
   ```
   if (remoteStorage.isEmpty())
   ```
   We can look at the internal implementation of fetchRemoteStorage.
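
A side note on the condition quoted in the hunk above: in Java, `!=` applied to two `String` values compares object references, not contents, so a storage type read from configuration can be rejected even when its text matches. The following is a minimal sketch of a content-based check. The class name, the `isSupported` helper, and the trimmed-down `StorageType` enum are stand-ins invented for illustration; they are not the project's actual code.

```java
// Sketch only: a stand-in enum and helper, not the real Uniffle classes.
final class StorageTypeCheckSketch {
  // Hypothetical, reduced version of the StorageType enum.
  enum StorageType { MEMORY_LOCALFILE, MEMORY_LOCALFILE_HDFS }

  // Compares by content; also returns false for a null storageType.
  static boolean isSupported(String storageType) {
    return StorageType.MEMORY_LOCALFILE_HDFS.name().equals(storageType);
  }

  public static void main(String[] args) {
    // new String(...) simulates a value parsed at runtime (a distinct object).
    String fromConf = new String("MEMORY_LOCALFILE_HDFS");

    // Reference comparison: reports "different" although the text is equal.
    System.out.println(fromConf != StorageType.MEMORY_LOCALFILE_HDFS.name()); // true

    // Content comparison: recognizes the supported mode.
    System.out.println(isSupported(fromConf)); // true
  }
}
```

Putting the constant on the left of `equals` keeps the check null-safe, which matters if the storage type is absent from the configuration.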
   



##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java:
##########
@@ -140,6 +141,14 @@ public RssRemoteMergeManagerImpl(String appId, TaskAttemptID reduceId, JobConf j
     this.mergedMapOutputsCounter = mergedMapOutputsCounter;
 
     try {
+      // Use minimized replica, because spilled data can be recomputed by reduce task.
+      // Instead, we use more retries on HDFS client.
+      int replication = remoteConf.getInt(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,

Review Comment:
   There seems to be a logic error.





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921870100


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java:
##########
@@ -140,6 +141,14 @@ public RssRemoteMergeManagerImpl(String appId, TaskAttemptID reduceId, JobConf j
     this.mergedMapOutputsCounter = mergedMapOutputsCounter;
 
     try {
+      // Use minimized replica, because spilled data can be recomputed by reduce task.
+      // Instead, we use more retries on HDFS client.
+      int replication = remoteConf.getInt(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION,

Review Comment:
   remoteConf only contains jobConf; it doesn't contain rssConf.
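
To illustrate why this matters, here is a minimal sketch of what can happen when an option is read from a configuration object that never received it: the lookup silently falls back to the default and the user's setting is ignored. Plain `Map` objects stand in for Hadoop's configuration classes, and the key name and default value are assumptions made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: Maps stand in for the job/RSS configuration objects.
final class SpillReplicationSketch {
  // Hypothetical key and default for the spill replication option.
  static final String SPILL_REPLICATION = "rss.reduce.remote.spill.replication";
  static final int SPILL_REPLICATION_DEFAULT = 1;

  // Mirrors the usual getInt contract: the default is returned when the key is absent.
  static int getInt(Map<String, String> conf, String key, int defaultValue) {
    String value = conf.get(key);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }

  public static void main(String[] args) {
    Map<String, String> rssConf = new HashMap<>();
    rssConf.put(SPILL_REPLICATION, "2");               // the user's setting lives here

    Map<String, String> remoteConf = new HashMap<>();  // built without the RSS settings

    // Reading from the wrong object quietly yields the default.
    System.out.println(getInt(remoteConf, SPILL_REPLICATION, SPILL_REPLICATION_DEFAULT)); // 1
    // Reading from the object that holds the setting yields the user's value.
    System.out.println(getInt(rssConf, SPILL_REPLICATION, SPILL_REPLICATION_DEFAULT));    // 2
  }
}
```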








[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921891731


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -201,21 +201,22 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
       // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
-      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+      if (conf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
         RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
 
-        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()
+            || remoteStorage.isEmpty()) {
           throw new IllegalArgumentException("Remote spill only supports "
-            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode");
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode with " + remoteStorage);
         }
 
-        // Use minimized replica, because spilled data can be recomputed by reduce task.
-        // And task re-computation is much cheaper job re-computation
-        conf.setInt("dfs.replication", 1);
 
         // When remote spill is enabled, reduce task is more easy to crash.
         // We allow more attempts to avoid recomputing job.
-        conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 5);
+        int originalAttempts = conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
+        int inc = conf.getInt(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC,

Review Comment:
   ok





[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#issuecomment-1184482497

   # [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/55?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#55](https://codecov.io/gh/apache/incubator-uniffle/pull/55?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2066002) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/aa02ee66c2f76a84430b9f9c0ef3b94ab83e30ea?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aa02ee6) will **increase** coverage by `0.65%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master      #55      +/-   ##
   ============================================
   + Coverage     54.89%   55.55%   +0.65%     
   + Complexity     1092      990     -102     
   ============================================
     Files           146      135      -11     
     Lines          7775     6736    -1039     
     Branches        749      647     -102     
   ============================================
   - Hits           4268     3742     -526     
   + Misses         3270     2782     -488     
   + Partials        237      212      -25     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/55?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../java/org/apache/hadoop/mapreduce/RssMRConfig.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL1Jzc01SQ29uZmlnLmphdmE=) | | |
   | [...n/java/org/apache/hadoop/mapreduce/RssMRUtils.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL1Jzc01SVXRpbHMuamF2YQ==) | | |
   | [...pache/hadoop/mapreduce/task/reduce/RssShuffle.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL3Rhc2svcmVkdWNlL1Jzc1NodWZmbGUuamF2YQ==) | | |
   | [...apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL3YyL2FwcC9Sc3NNUkFwcE1hc3Rlci5qYXZh) | | |
   | [.../hadoop/mapreduce/task/reduce/RssEventFetcher.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL3Rhc2svcmVkdWNlL1Jzc0V2ZW50RmV0Y2hlci5qYXZh) | | |
   | [...pache/hadoop/mapreduce/task/reduce/RssFetcher.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL3Rhc2svcmVkdWNlL1Jzc0ZldGNoZXIuamF2YQ==) | | |
   | [...g/apache/hadoop/mapred/SortWriteBufferManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkL1NvcnRXcml0ZUJ1ZmZlck1hbmFnZXIuamF2YQ==) | | |
   | [...rg/apache/hadoop/mapred/RssMapOutputCollector.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkL1Jzc01hcE91dHB1dENvbGxlY3Rvci5qYXZh) | | |
   | [.../hadoop/mapreduce/task/reduce/RssBypassWriter.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL3Rhc2svcmVkdWNlL1Jzc0J5cGFzc1dyaXRlci5qYXZh) | | |
   | [...n/java/org/apache/hadoop/mapreduce/MRIdHelper.java](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL01SSWRIZWxwZXIuamF2YQ==) | | |
   | ... and [1 more](https://codecov.io/gh/apache/incubator-uniffle/pull/55/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/55?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/55?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [aa02ee6...2066002](https://codecov.io/gh/apache/incubator-uniffle/pull/55?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921224873


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+          throw new IllegalArgumentException("Remote spill only supports "
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode");
+        }
+
+        // Use minimized replica, because spilled data can be recomputed by reduce task.
+        // And task re-computation is much cheaper job re-computation
+        conf.setInt("dfs.replication", 1);
+
+        // When remote spill is enabled, reduce task is more easy to crash.
+        // We allow more attempts to avoid recomputing job.

Review Comment:
   Could you take the config option `dfs.client.block.write.retries` into account?





[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921240030


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,
+        RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) {
+
+        if (storageType != StorageType.MEMORY_LOCALFILE_HDFS.name()) {
+          throw new IllegalArgumentException("Remote spill only supports "
+            + StorageType.MEMORY_LOCALFILE_HDFS.name() + " mode");
+        }
+
+        // Use minimized replica, because spilled data can be recomputed by reduce task.
+        // And task re-computation is much cheaper job re-computation
+        conf.setInt("dfs.replication", 1);
+
+        // When remote spill is enabled, reduce task is more easy to crash.
+        // We allow more attempts to avoid recomputing job.
+        conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 5);

Review Comment:
   We should add a configuration option that specifies how many additional attempts to allow.
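
One way such an option could work is sketched below: read the job's existing maximum reduce attempts, add a configurable increment, and reject negative increments. A plain `Map` stands in for Hadoop's `Configuration`; `mapreduce.reduce.maxattempts` is the Hadoop key behind `MRJobConfig.REDUCE_MAX_ATTEMPTS`, while the increment key name and its default are assumptions chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a plain Map stands in for Hadoop's Configuration.
final class ReduceAttemptsSketch {
  // Hadoop key behind MRJobConfig.REDUCE_MAX_ATTEMPTS (Hadoop's default is 4).
  static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts";
  // Hypothetical key name and default for the proposed increment option.
  static final String ATTEMPT_INC = "rss.reduce.remote.spill.attempt.inc";
  static final int ATTEMPT_INC_DEFAULT = 1;

  static int getInt(Map<String, String> conf, String key, int defaultValue) {
    String value = conf.get(key);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }

  // Returns the attempt limit to apply when remote spill is enabled.
  static int effectiveMaxAttempts(Map<String, String> conf) {
    int original = getInt(conf, REDUCE_MAX_ATTEMPTS, 4);
    int inc = getInt(conf, ATTEMPT_INC, ATTEMPT_INC_DEFAULT);
    if (inc < 0) {
      // A negative increment would lower the limit, which defeats the purpose.
      throw new IllegalArgumentException(ATTEMPT_INC + " must be >= 0, got " + inc);
    }
    return original + inc;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(ATTEMPT_INC, "2");
    System.out.println(effectiveMaxAttempts(conf)); // 4 + 2 = 6
  }
}
```

Adding an increment to the existing value, rather than overwriting it with a fixed number, preserves any limit the user has already set for the job.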





[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921787069


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java:
##########
@@ -199,6 +200,24 @@ public Thread newThread(Runnable r) {
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_PATH, remoteStorage.getPath());
       extraConf.set(RssMRConfig.RSS_REMOTE_STORAGE_CONF, remoteStorage.getConfString());
 
+      // When containers have disk with very limited space, reduce is allowed to spill data to hdfs
+      if (extraConf.getBoolean(RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED,

Review Comment:
   fixed





[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #55: [Feature][MR] Support remote spill

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #55:
URL: https://github.com/apache/incubator-uniffle/pull/55#discussion_r921886904


##########
client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssInMemoryRemoteMerger.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Merger;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+public class RssInMemoryRemoteMerger<K, V> extends MergeThread<InMemoryMapOutput<K,V>, K, V> {
+  private static final Log LOG = LogFactory.getLog(RssInMemoryRemoteMerger.class);
+
+  private static final String SPILL_OUTPUT_PREFIX = "spill";
+  private final RssRemoteMergeManagerImpl<K, V> manager;
+  private final JobConf jobConf;
+  private final FileSystem remoteFs;
+  private final Path spillPath;
+  private final String taskAttemptId;
+  private final CompressionCodec codec;
+  private final Progressable reporter;
+  private final Counters.Counter spilledRecordsCounter;
+  private final Class<? extends Reducer> combinerClass;
+  private final Task.CombineOutputCollector<K,V> combineCollector;
+  private final Counters.Counter reduceCombineInputCounter;
+  private final Counters.Counter mergedMapOutputsCounter;
+
+  public RssInMemoryRemoteMerger(
+      RssRemoteMergeManagerImpl<K, V> manager,
+      JobConf jobConf,
+      FileSystem remoteFs,
+      Path spillPath,
+      String taskId,
+      CompressionCodec codec,
+      Progressable reporter,
+      Counters.Counter spilledRecordsCounter,
+      Class<? extends Reducer> combinerClass,
+      ExceptionReporter exceptionReporter,
+      Task.CombineOutputCollector<K,V> combineCollector,
+      Counters.Counter reduceCombineInputCounter,
+      Counters.Counter mergedMapOutputsCounter) {
+    super(manager, Integer.MAX_VALUE, exceptionReporter);
+    this.setName("RssInMemoryMerger - Thread to merge in-memory map-outputs");
+    this.setDaemon(true);
+    this.manager = manager;
+    this.jobConf = jobConf;
+    this.remoteFs = remoteFs;
+    this.spillPath = spillPath;
+    this.taskAttemptId = taskId;
+    this.codec = codec;
+    this.reporter = reporter;
+    this.spilledRecordsCounter = spilledRecordsCounter;
+    this.combinerClass = combinerClass;
+    this.combineCollector = combineCollector;
+    this.reduceCombineInputCounter = reduceCombineInputCounter;
+    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+  }
+
+  @Override
+  public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
+    if (inputs == null || inputs.size() == 0) {
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+    TaskAttemptID mapId = inputs.get(0).getMapId();
+
+    List<Merger.Segment<K, V>> inMemorySegments = new ArrayList<Merger.Segment<K, V>>();
+    createInMemorySegments(inputs, inMemorySegments);
+    int noInMemorySegments = inMemorySegments.size();
+
+    String filePath = SPILL_OUTPUT_PREFIX + Path.SEPARATOR + taskAttemptId + Path.SEPARATOR + mapId;
+    Path outputPath = new Path(spillPath, filePath);
+
+    FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, remoteFs.create(outputPath));
+    IFile.Writer<K, V> writer = new IFile.Writer<K, V>(jobConf, out,
+        (Class<K>) jobConf.getMapOutputKeyClass(),
+        (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
+
+    RawKeyValueIterator rIter = null;
+    try {
+      LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments...");
+
+      // tmpDir won't be used. tmpDir is used for onDiskMerger
+      rIter = Merger.merge(jobConf, remoteFs,
+          (Class<K>)jobConf.getMapOutputKeyClass(),
+          (Class<V>)jobConf.getMapOutputValueClass(),
+          inMemorySegments, inMemorySegments.size(),
+          new Path(taskAttemptId),
+          (RawComparator<K>)jobConf.getOutputKeyComparator(),
+          reporter, spilledRecordsCounter, null, null);
+
+      if (null == combinerClass) {
+        Merger.writeFile(rIter, writer, reporter, jobConf);
+      } else {
+        combineCollector.setWriter(writer);
+        combineAndSpill(rIter, reduceCombineInputCounter);
+      }
+      writer.close();
+
+      // keep this for final merge
+      manager.closeOnHDFSFile(outputPath);
+
+      LOG.info(taskAttemptId + " Merge of the " + noInMemorySegments
+          + " files in-memory complete."
+          + " Local file is " + outputPath + " of size "
+          + remoteFs.getFileStatus(outputPath).getLen()
+          + " cost time " + (System.currentTimeMillis() - start) + " ms");
+    } catch (IOException e) {
+      //make sure that we delete the ondisk file that we created

Review Comment:
   ok


