You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by li...@apache.org on 2020/09/22 18:57:52 UTC
[hadoop] branch branch-2.10 updated: MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache. (#2319)
This is an automated email from the ASF dual-hosted git repository.
liuml07 pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new be42149 MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache. (#2319)
be42149 is described below
commit be421490dadd61c4bcbd23bdb23bed408607ff22
Author: zz <zz...@gmail.com>
AuthorDate: Tue Sep 22 11:57:36 2020 -0700
MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache. (#2319)
Contributed by Zhenzhao Wang <zh...@gmail.com>
Signed-off-by: Mingliang Liu <li...@apache.org>
---
.../hadoop/mapreduce/v2/app/job/impl/JobImpl.java | 3 +-
.../mapreduce/v2/app/job/impl/TestJobImpl.java | 23 ++++++++++++
.../main/java/org/apache/hadoop/mapreduce/Job.java | 41 ++++++++++++----------
3 files changed, 47 insertions(+), 20 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4995120..b688f4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1421,7 +1421,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
* be set up to false. In that way, the NMs that host the task containers
* won't try to upload the resources to shared cache.
*/
- private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
+ @VisibleForTesting
+ static void cleanupSharedCacheUploadPolicies(Configuration conf) {
Map<String, Boolean> emap = Collections.emptyMap();
Job.setArchiveSharedCacheUploadPolicies(conf, emap);
Job.setFileSharedCacheUploadPolicies(conf, emap);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 1827ce4..d342a3f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -39,6 +39,7 @@ import java.util.concurrent.CyclicBarrier;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
@@ -989,6 +990,28 @@ public class TestJobImpl {
Assert.assertEquals(updatedPriority, jobPriority);
}
+ @Test
+ public void testCleanupSharedCacheUploadPolicies() {
+ Configuration config = new Configuration();
+ Map<String, Boolean> archivePolicies = new HashMap<>();
+ archivePolicies.put("archive1", true);
+ archivePolicies.put("archive2", true);
+ Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies);
+ Map<String, Boolean> filePolicies = new HashMap<>();
+ filePolicies.put("file1", true);
+ filePolicies.put("jar1", true);
+ Job.setFileSharedCacheUploadPolicies(config, filePolicies);
+ Assert.assertEquals(
+ 2, Job.getArchiveSharedCacheUploadPolicies(config).size());
+ Assert.assertEquals(
+ 2, Job.getFileSharedCacheUploadPolicies(config).size());
+ JobImpl.cleanupSharedCacheUploadPolicies(config);
+ Assert.assertEquals(
+ 0, Job.getArchiveSharedCacheUploadPolicies(config).size());
+ Assert.assertEquals(
+ 0, Job.getFileSharedCacheUploadPolicies(config).size());
+ }
+
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
final SystemClock clock = SystemClock.getInstance();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 493a221..c276ec0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -1448,26 +1448,29 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
*/
private static void setSharedCacheUploadPolicies(Configuration conf,
Map<String, Boolean> policies, boolean areFiles) {
- if (policies != null) {
- StringBuilder sb = new StringBuilder();
- Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
- Map.Entry<String, Boolean> e;
- if (it.hasNext()) {
- e = it.next();
- sb.append(e.getKey() + DELIM + e.getValue());
- } else {
- // policies is an empty map, just skip setting the parameter
- return;
- }
- while (it.hasNext()) {
- e = it.next();
- sb.append("," + e.getKey() + DELIM + e.getValue());
- }
- String confParam =
- areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
- : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
- conf.set(confParam, sb.toString());
+ String confParam = areFiles ?
+ MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES :
+ MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+ // If no policy is provided, we will reset the config by setting an empty
+ // string value. In other words, cleaning up existing policies. This is
+ // useful when we try to clean up shared cache upload policies for
+ // non-application master tasks. See MAPREDUCE-7294 for details.
+ if (policies == null || policies.size() == 0) {
+ conf.set(confParam, "");
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
+ Map.Entry<String, Boolean> e;
+ if (it.hasNext()) {
+ e = it.next();
+ sb.append(e.getKey() + DELIM + e.getValue());
+ }
+ while (it.hasNext()) {
+ e = it.next();
+ sb.append("," + e.getKey() + DELIM + e.getValue());
}
+ conf.set(confParam, sb.toString());
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org