Posted to common-commits@hadoop.apache.org by li...@apache.org on 2020/09/22 18:57:52 UTC

[hadoop] branch branch-2.10 updated: MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache. (#2319)

This is an automated email from the ASF dual-hosted git repository.

liuml07 pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new be42149  MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache. (#2319)
be42149 is described below

commit be421490dadd61c4bcbd23bdb23bed408607ff22
Author: zz <zz...@gmail.com>
AuthorDate: Tue Sep 22 11:57:36 2020 -0700

    MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache. (#2319)
    
    Contributed by Zhenzhao Wang <zh...@gmail.com>
    
    Signed-off-by: Mingliang Liu <li...@apache.org>
---
 .../hadoop/mapreduce/v2/app/job/impl/JobImpl.java  |  3 +-
 .../mapreduce/v2/app/job/impl/TestJobImpl.java     | 23 ++++++++++++
 .../main/java/org/apache/hadoop/mapreduce/Job.java | 41 ++++++++++++----------
 3 files changed, 47 insertions(+), 20 deletions(-)
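
For context, the shared cache upload policies touched by this change are per-resource booleans that get serialized into the job configuration. Below is a minimal sketch of the client-side round trip using the Job helper methods that appear in the diffs; the resource names and the standalone class are made up for illustration only.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SharedCachePolicySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Opt two hypothetical job resources into the YARN shared cache.
        Map<String, Boolean> filePolicies = new HashMap<>();
        filePolicies.put("lib/util.jar", true);    // upload this one
        filePolicies.put("conf/site.xml", false);  // ship it, but do not upload
        Job.setFileSharedCacheUploadPolicies(conf, filePolicies);

        // The policies are serialized into the configuration and can be read
        // back wherever that configuration is available.
        Map<String, Boolean> readBack = Job.getFileSharedCacheUploadPolicies(conf);
        System.out.println(readBack);  // e.g. {lib/util.jar=true, conf/site.xml=false}
      }
    }

A policy of true asks YARN to upload that resource to the shared cache. The point of this patch is that only the application master should do such uploads, so the AM clears these policies from the configuration it hands to task containers, and the NodeManagers hosting those containers skip the upload.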

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4995120..b688f4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1421,7 +1421,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
    * be set up to false. In that way, the NMs that host the task containers
    * won't try to upload the resources to shared cache.
    */
-  private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
+  @VisibleForTesting
+  static void cleanupSharedCacheUploadPolicies(Configuration conf) {
     Map<String, Boolean> emap = Collections.emptyMap();
     Job.setArchiveSharedCacheUploadPolicies(conf, emap);
     Job.setFileSharedCacheUploadPolicies(conf, emap);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 1827ce4..d342a3f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -39,6 +39,7 @@ import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -989,6 +990,28 @@ public class TestJobImpl {
     Assert.assertEquals(updatedPriority, jobPriority);
   }
 
+  @Test
+  public void testCleanupSharedCacheUploadPolicies() {
+    Configuration config = new Configuration();
+    Map<String, Boolean> archivePolicies = new HashMap<>();
+    archivePolicies.put("archive1", true);
+    archivePolicies.put("archive2", true);
+    Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies);
+    Map<String, Boolean> filePolicies = new HashMap<>();
+    filePolicies.put("file1", true);
+    filePolicies.put("jar1", true);
+    Job.setFileSharedCacheUploadPolicies(config, filePolicies);
+    Assert.assertEquals(
+        2, Job.getArchiveSharedCacheUploadPolicies(config).size());
+    Assert.assertEquals(
+        2, Job.getFileSharedCacheUploadPolicies(config).size());
+    JobImpl.cleanupSharedCacheUploadPolicies(config);
+    Assert.assertEquals(
+        0, Job.getArchiveSharedCacheUploadPolicies(config).size());
+    Assert.assertEquals(
+        0, Job.getFileSharedCacheUploadPolicies(config).size());
+  }
+
   private static CommitterEventHandler createCommitterEventHandler(
       Dispatcher dispatcher, OutputCommitter committer) {
     final SystemClock clock = SystemClock.getInstance();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 493a221..c276ec0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -1448,26 +1448,29 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
    */
   private static void setSharedCacheUploadPolicies(Configuration conf,
       Map<String, Boolean> policies, boolean areFiles) {
-    if (policies != null) {
-      StringBuilder sb = new StringBuilder();
-      Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
-      Map.Entry<String, Boolean> e;
-      if (it.hasNext()) {
-        e = it.next();
-        sb.append(e.getKey() + DELIM + e.getValue());
-      } else {
-        // policies is an empty map, just skip setting the parameter
-        return;
-      }
-      while (it.hasNext()) {
-        e = it.next();
-        sb.append("," + e.getKey() + DELIM + e.getValue());
-      }
-      String confParam =
-          areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
-              : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
-      conf.set(confParam, sb.toString());
+    String confParam = areFiles ?
+        MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES :
+        MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+    // If no policy is provided, we will reset the config by setting an empty
+    // string value. In other words, cleaning up existing policies. This is
+    // useful when we try to clean up shared cache upload policies for
+    // non-application master tasks. See MAPREDUCE-7294 for details.
+    if (policies == null || policies.size() == 0) {
+      conf.set(confParam, "");
+      return;
+    }
+    StringBuilder sb = new StringBuilder();
+    Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
+    Map.Entry<String, Boolean> e;
+    if (it.hasNext()) {
+      e = it.next();
+      sb.append(e.getKey() + DELIM + e.getValue());
+    }
+    while (it.hasNext()) {
+      e = it.next();
+      sb.append("," + e.getKey() + DELIM + e.getValue());
     }
+    conf.set(confParam, sb.toString());
   }
 
   /**


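The behavioral change in Job.setSharedCacheUploadPolicies above is that a null or empty policy map now resets the configuration property to an empty string instead of returning early and leaving any previously serialized policies behind; that is what lets JobImpl.cleanupSharedCacheUploadPolicies actually scrub the configuration passed to task containers. A small sketch of the new behavior follows; the resource name and class are made up, and the expectations mirror the new test in TestJobImpl.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CleanupPolicySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        Map<String, Boolean> policies = new HashMap<>();
        policies.put("lib/util.jar", true);
        Job.setFileSharedCacheUploadPolicies(conf, policies);
        // One policy entry is now serialized into the job configuration.
        System.out.println(Job.getFileSharedCacheUploadPolicies(conf).size()); // 1

        // Before this patch, passing an empty map returned early, so the stale
        // entry above survived in the configuration handed to task containers.
        // With the patch, the property is reset to "" and the read-back map is
        // empty, which is what JobImpl.cleanupSharedCacheUploadPolicies relies on.
        Job.setFileSharedCacheUploadPolicies(conf,
            Collections.<String, Boolean>emptyMap());
        System.out.println(Job.getFileSharedCacheUploadPolicies(conf).isEmpty()); // true
      }
    }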