You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by li...@apache.org on 2020/09/20 07:07:17 UTC
[hadoop] branch branch-3.1 updated: MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)
This is an automated email from the ASF dual-hosted git repository.
liuml07 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 06ff4d1 MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)
06ff4d1 is described below
commit 06ff4d141670f8bd306da2bdfd3df773e8f5866f
Author: zz <zz...@gmail.com>
AuthorDate: Sat Sep 19 23:10:05 2020 -0700
MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)
Contributed by Zhenzhao Wang <zh...@gmail.com>
Signed-off-by: Mingliang Liu <li...@apache.org>
---
.../hadoop/mapreduce/v2/app/job/impl/JobImpl.java | 3 +-
.../mapreduce/v2/app/job/impl/TestJobImpl.java | 23 +++++++++++++++
.../main/java/org/apache/hadoop/mapreduce/Job.java | 33 +++++++++-------------
3 files changed, 39 insertions(+), 20 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index d2e2492..59320b2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -1423,7 +1423,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
* be set up to false. In that way, the NMs that host the task containers
* won't try to upload the resources to shared cache.
*/
- private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
+ @VisibleForTesting
+ static void cleanupSharedCacheUploadPolicies(Configuration conf) {
Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());
Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 1367ff6..013f74a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -39,6 +39,7 @@ import java.util.concurrent.CyclicBarrier;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
@@ -991,6 +992,28 @@ public class TestJobImpl {
Assert.assertEquals(updatedPriority, jobPriority);
}
+ @Test
+ public void testCleanupSharedCacheUploadPolicies() {
+ Configuration config = new Configuration();
+ Map<String, Boolean> archivePolicies = new HashMap<>();
+ archivePolicies.put("archive1", true);
+ archivePolicies.put("archive2", true);
+ Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies);
+ Map<String, Boolean> filePolicies = new HashMap<>();
+ filePolicies.put("file1", true);
+ filePolicies.put("jar1", true);
+ Job.setFileSharedCacheUploadPolicies(config, filePolicies);
+ Assert.assertEquals(
+ 2, Job.getArchiveSharedCacheUploadPolicies(config).size());
+ Assert.assertEquals(
+ 2, Job.getFileSharedCacheUploadPolicies(config).size());
+ JobImpl.cleanupSharedCacheUploadPolicies(config);
+ Assert.assertEquals(
+ 0, Job.getArchiveSharedCacheUploadPolicies(config).size());
+ Assert.assertEquals(
+ 0, Job.getFileSharedCacheUploadPolicies(config).size());
+ }
+
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
final SystemClock clock = SystemClock.getInstance();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index f164b62..d7fa75d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -1448,26 +1448,21 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
*/
private static void setSharedCacheUploadPolicies(Configuration conf,
Map<String, Boolean> policies, boolean areFiles) {
- if (policies != null) {
- StringBuilder sb = new StringBuilder();
- Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
- Map.Entry<String, Boolean> e;
- if (it.hasNext()) {
- e = it.next();
- sb.append(e.getKey() + DELIM + e.getValue());
- } else {
- // policies is an empty map, just skip setting the parameter
- return;
- }
- while (it.hasNext()) {
- e = it.next();
- sb.append("," + e.getKey() + DELIM + e.getValue());
- }
- String confParam =
- areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
- : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
- conf.set(confParam, sb.toString());
+ String confParam = areFiles ?
+ MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES :
+ MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+ // If no policy is provided, we will reset the config by setting an empty
+ // string value. In other words, cleaning up existing policies. This is
+ // useful when we try to clean up shared cache upload policies for
+ // non-application master tasks. See MAPREDUCE-7294 for details.
+ if (policies == null || policies.size() == 0) {
+ conf.set(confParam, "");
+ return;
}
+ StringBuilder sb = new StringBuilder();
+ policies.forEach((k,v) -> sb.append(k).append(DELIM).append(v).append(","));
+ sb.deleteCharAt(sb.length() - 1);
+ conf.set(confParam, sb.toString());
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org