You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/19 06:30:36 UTC

[incubator-uniffle] branch master updated: [#715] fix(mr): The container does not exit because shuffleclient is not closed (#882)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 762df540 [#715] fix(mr): The container does not exit because shuffleclient is not closed (#882)
762df540 is described below

commit 762df540f2d947c29178fd943fae6d31c3dcd042
Author: zhaobing <34...@users.noreply.github.com>
AuthorDate: Fri May 19 14:30:29 2023 +0800

    [#715] fix(mr): The container does not exit because shuffleclient is not closed (#882)
    
    ### What changes were proposed in this pull request?
    Close the ShuffleWriteClient in RssMapOutputCollector#close(). Previously the container did not exit because the shuffle client was never closed.
    
    ### Why are the changes needed?
    
    For #715
    
    1. The process does not exit after the map task or reduce task has finished executing. The reason is that ShuffleWriteClient has a thread pool that is not shut down when the task completes, so closing the ShuffleWriteClient solves this problem.
    
    2. How to reproduce the problem:
    Initialize a small cluster and submit an MR job whose requested resources exceed the total resources in the cluster.
    All tasks complete execution, but their processes do not exit. Once the 60-second timeout (mapreduce.task.exit.timeout) expires, the ApplicationMaster asks the NodeManager to kill the corresponding container.
    
    The NodeManager logs are as follows:
    `2023-03-12 13:56:45,901 INFO [AsyncDispatcher event handler] org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl: Diagnostics report from attempt_1676901654399_1653119_m_000070_0: [2023-03-12 13:56:44.909]Container killed by the ApplicationMaster.
    [2023-03-12 13:56:44.921]Sent signal OUTPUT_THREAD_DUMP (SIGQUIT) to pid 45556 as user tc_infra for container container_e304_1676901654399_1653119_01_000072, result=success
    [2023-03-12 13:56:44.985]Container killed on request. Exit code is 143
    [2023-03-12 13:56:45.403]Container exited with a non-zero exit code 143.
    `
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    existing UTs.
    
    Co-authored-by: zhaobing <zh...@zhihu.com>
---
 .../main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java b/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
index 47b33728..6bd30981 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ByteUnit;
@@ -52,6 +53,7 @@ public class RssMapOutputCollector<K extends Object, V extends Object>
   private Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
   private int partitions;
   private SortWriteBufferManager bufferManager;
+  private ShuffleWriteClient shuffleClient;
 
   @Override
   public void init(Context context) throws IOException, ClassNotFoundException {
@@ -107,6 +109,7 @@ public class RssMapOutputCollector<K extends Object, V extends Object>
         RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM);
     long maxBufferSize = RssMRUtils.getLong(rssJobConf, mrJobConf, RssMRConfig.RSS_WRITER_BUFFER_SIZE,
         RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);
+    shuffleClient = RssMRUtils.createShuffleClient(mrJobConf);
     bufferManager = new SortWriteBufferManager(
         (long)(ByteUnit.MiB.toBytes(sortmb) * sortThreshold),
         taskAttemptId,
@@ -116,7 +119,7 @@ public class RssMapOutputCollector<K extends Object, V extends Object>
         comparator,
         memoryThreshold,
         appId,
-        RssMRUtils.createShuffleClient(mrJobConf),
+        shuffleClient,
         sendCheckInterval,
         sendCheckTimeout,
         partitionToServers,
@@ -180,6 +183,7 @@ public class RssMapOutputCollector<K extends Object, V extends Object>
   public void close() throws IOException, InterruptedException {
     reporter.progress();
     bufferManager.freeAllResources();
+    shuffleClient.close();
   }
 
   @Override