You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:29 UTC
[14/33] git commit: MAPREDUCE-6103.Adding reservation APIs to MR
resource manager delegate. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit aa92dd45f2d8c89a8a17ad2e4449aa3ff08bc53a) (cherry
picked from commit 3f282762d1afc916de
MAPREDUCE-6103.Adding reservation APIs to MR resource manager delegate. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit aa92dd45f2d8c89a8a17ad2e4449aa3ff08bc53a)
(cherry picked from commit 3f282762d1afc916de9207d3adeda852ca344853)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/30a370e7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/30a370e7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/30a370e7
Branch: refs/heads/branch-2
Commit: 30a370e70504a4cc3222da0dc706c871fcebfa78
Parents: cbfbdf6
Author: subru <su...@outlook.com>
Authored: Wed Sep 24 18:01:38 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Oct 6 10:21:06 2014 -0700
----------------------------------------------------------------------
YARN-1051-CHANGES.txt | 3 +++
.../java/org/apache/hadoop/mapreduce/Job.java | 21 +++++++++++++++++
.../apache/hadoop/mapreduce/JobSubmitter.java | 8 +++++++
.../apache/hadoop/mapreduce/MRJobConfig.java | 2 ++
.../hadoop/mapred/ResourceMgrDelegate.java | 24 ++++++++++++++++++++
.../org/apache/hadoop/mapred/YARNRunner.java | 22 ++++++++++++++++++
.../hadoop/mapred/TestClientRedirect.java | 24 ++++++++++++++++++++
7 files changed, 104 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30a370e7/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index c4106b2..6a27197 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -23,3 +23,6 @@ subsystem with the scheduler. (Subru Krishnan and Carlo Curino via subru)
YARN-2080. Integrating reservation system with ResourceManager and
client-RM protocol. (Subru Krishnan and Carlo Curino via subru)
+
+MAPREDUCE-6103. Adding reservation APIs to MR resource manager
+delegate. (Subru Krishnan and Carlo Curino via subru)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30a370e7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 3f8d139..cfc3437 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ReservationId;
/**
* The job submitter's view of the Job.
@@ -112,6 +113,7 @@ public class Job extends JobContextImpl implements JobContext {
private JobStatus status;
private long statustime;
private Cluster cluster;
+ private ReservationId reservationId;
/**
* @deprecated Use {@link #getInstance()}
@@ -1523,5 +1525,24 @@ public class Job extends JobContextImpl implements JobContext {
updateStatus();
return status.isUber();
}
+
+ /**
+ * Get the reservation to which the job is submitted to, if any
+ *
+ * @return the reservationId the identifier of the job's reservation, null if
+ * the job does not have any reservation associated with it
+ */
+ public ReservationId getReservationId() {
+ return reservationId;
+ }
+
+ /**
+ * Set the reservation to which the job is submitted to
+ *
+ * @param reservationId the reservationId to set
+ */
+ public void setReservationId(ReservationId reservationId) {
+ this.reservationId = reservationId;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30a370e7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 6cd569a..d80521c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.QueueACL;
+
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
@@ -60,6 +61,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -427,6 +429,12 @@ class JobSubmitter {
trackingIds.toArray(new String[trackingIds.size()]));
}
+ // Set reservation info if it exists
+ ReservationId reservationId = job.getReservationId();
+ if (reservationId != null) {
+ conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
+ }
+
// Write job file to submit dir
writeConf(conf, submitJobFile);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30a370e7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 5c82470..33c0461 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -63,6 +63,8 @@ public interface MRJobConfig {
public static final String QUEUE_NAME = "mapreduce.job.queuename";
+ public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
+
public static final String JOB_TAGS = "mapreduce.job.tags";
public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30a370e7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
index b76d0f3..803390f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
@@ -43,6 +43,12 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -406,4 +412,22 @@ public class ResourceMgrDelegate extends YarnClient {
throws YarnException, IOException {
client.moveApplicationAcrossQueues(appId, queue);
}
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return client.submitReservation(request);
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return client.updateReservation(request);
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return client.deleteReservation(request);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30a370e7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 5120c85..9419d03 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -489,6 +490,26 @@ public class YARNRunner implements ClientProtocol {
appContext.setQueue( // Queue name
jobConf.get(JobContext.QUEUE_NAME,
YarnConfiguration.DEFAULT_QUEUE_NAME));
+ // add reservationID if present
+ ReservationId reservationID = null;
+ try {
+ reservationID =
+ ReservationId.parseReservationId(jobConf
+ .get(JobContext.RESERVATION_ID));
+ } catch (NumberFormatException e) {
+ // throw exception as reservationid as is invalid
+ String errMsg =
+ "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
+ + " specified for the app: " + applicationId;
+ LOG.warn(errMsg);
+ throw new IOException(errMsg);
+ }
+ if (reservationID != null) {
+ appContext.setReservationID(reservationID);
+ LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
+ + " to queue:" + appContext.getQueue() + " with reservationId:"
+ + appContext.getReservationID());
+ }
appContext.setApplicationName( // Job name
jobConf.get(JobContext.JOB_NAME,
YarnConfiguration.DEFAULT_APPLICATION_NAME));
@@ -503,6 +524,7 @@ public class YARNRunner implements ClientProtocol {
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
}
+
return appContext;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30a370e7/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
index 69ede3a..5663a81 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
@@ -102,6 +102,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -397,6 +403,24 @@ public class TestClientRedirect {
throws YarnException, IOException {
return null;
}
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return null;
+ }
}
class HistoryService extends AMService implements HSClientProtocol {