You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by li...@apache.org on 2020/09/20 06:46:25 UTC

[hadoop] branch branch-3.2 updated: MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)

This is an automated email from the ASF dual-hosted git repository.

liuml07 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 0873555  MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)
0873555 is described below

commit 0873555f040e632228a476168047ae99f433d3ba
Author: zz <zz...@gmail.com>
AuthorDate: Sat Sep 19 23:10:05 2020 -0700

    MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)
    
    Contributed by Zhenzhao Wang <zh...@gmail.com>
    
    Signed-off-by: Mingliang Liu <li...@apache.org>
---
 .../hadoop/mapreduce/v2/app/job/impl/JobImpl.java  |  3 +-
 .../mapreduce/v2/app/job/impl/TestJobImpl.java     | 23 +++++++++++++++
 .../main/java/org/apache/hadoop/mapreduce/Job.java | 33 +++++++++-------------
 3 files changed, 39 insertions(+), 20 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index d2e2492..59320b2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1423,7 +1423,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
    * be set up to false. In that way, the NMs that host the task containers
    * won't try to upload the resources to shared cache.
    */
-  private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
+  @VisibleForTesting
+  static void cleanupSharedCacheUploadPolicies(Configuration conf) {
     Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());
     Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
   }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 1367ff6..013f74a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -39,6 +39,7 @@ import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -991,6 +992,28 @@ public class TestJobImpl {
     Assert.assertEquals(updatedPriority, jobPriority);
   }
 
+  @Test
+  public void testCleanupSharedCacheUploadPolicies() {
+    Configuration config = new Configuration();
+    Map<String, Boolean> archivePolicies = new HashMap<>();
+    archivePolicies.put("archive1", true);
+    archivePolicies.put("archive2", true);
+    Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies);
+    Map<String, Boolean> filePolicies = new HashMap<>();
+    filePolicies.put("file1", true);
+    filePolicies.put("jar1", true);
+    Job.setFileSharedCacheUploadPolicies(config, filePolicies);
+    Assert.assertEquals(
+        2, Job.getArchiveSharedCacheUploadPolicies(config).size());
+    Assert.assertEquals(
+        2, Job.getFileSharedCacheUploadPolicies(config).size());
+    JobImpl.cleanupSharedCacheUploadPolicies(config);
+    Assert.assertEquals(
+        0, Job.getArchiveSharedCacheUploadPolicies(config).size());
+    Assert.assertEquals(
+        0, Job.getFileSharedCacheUploadPolicies(config).size());
+  }
+
   private static CommitterEventHandler createCommitterEventHandler(
       Dispatcher dispatcher, OutputCommitter committer) {
     final SystemClock clock = SystemClock.getInstance();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index f164b62..d7fa75d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -1448,26 +1448,21 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
    */
   private static void setSharedCacheUploadPolicies(Configuration conf,
       Map<String, Boolean> policies, boolean areFiles) {
-    if (policies != null) {
-      StringBuilder sb = new StringBuilder();
-      Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
-      Map.Entry<String, Boolean> e;
-      if (it.hasNext()) {
-        e = it.next();
-        sb.append(e.getKey() + DELIM + e.getValue());
-      } else {
-        // policies is an empty map, just skip setting the parameter
-        return;
-      }
-      while (it.hasNext()) {
-        e = it.next();
-        sb.append("," + e.getKey() + DELIM + e.getValue());
-      }
-      String confParam =
-          areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
-              : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
-      conf.set(confParam, sb.toString());
+    String confParam = areFiles ?
+        MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES :
+        MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+    // If no policy is provided, we will reset the config by setting an empty
+    // string value. In other words, cleaning up existing policies. This is
+    // useful when we try to clean up shared cache upload policies for
+    // non-application master tasks. See MAPREDUCE-7294 for details.
+    if (policies == null || policies.size() == 0) {
+      conf.set(confParam, "");
+      return;
     }
+    StringBuilder sb = new StringBuilder();
+    policies.forEach((k,v) -> sb.append(k).append(DELIM).append(v).append(","));
+    sb.deleteCharAt(sb.length() - 1);
+    conf.set(confParam, sb.toString());
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org