You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/06 19:39:42 UTC

[27/33] git commit: MAPREDUCE-6103.Adding reservation APIs to MR resource manager delegate. Contributed by Subru Krishnan and Carlo Curino. (cherry picked from commit aa92dd45f2d8c89a8a17ad2e4449aa3ff08bc53a) (cherry picked from commit 3f282762d1afc916de

MAPREDUCE-6103.Adding reservation APIs to MR resource manager delegate. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit aa92dd45f2d8c89a8a17ad2e4449aa3ff08bc53a)
(cherry picked from commit 3f282762d1afc916de9207d3adeda852ca344853)
(cherry picked from commit 30a370e70504a4cc3222da0dc706c871fcebfa78)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/256a9510
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/256a9510
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/256a9510

Branch: refs/heads/branch-2.6
Commit: 256a9510492fa16fd9cbbee7e3892e39181ec3a3
Parents: d244b2a
Author: subru <su...@outlook.com>
Authored: Wed Sep 24 18:01:38 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Mon Oct 6 10:29:13 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                           |  3 +++
 .../java/org/apache/hadoop/mapreduce/Job.java   | 21 +++++++++++++++++
 .../apache/hadoop/mapreduce/JobSubmitter.java   |  8 +++++++
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  2 ++
 .../hadoop/mapred/ResourceMgrDelegate.java      | 24 ++++++++++++++++++++
 .../org/apache/hadoop/mapred/YARNRunner.java    | 22 ++++++++++++++++++
 .../hadoop/mapred/TestClientRedirect.java       | 24 ++++++++++++++++++++
 7 files changed, 104 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/256a9510/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index c4106b2..6a27197 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -23,3 +23,6 @@ subsystem with the scheduler. (Subru Krishnan and Carlo Curino  via subru)
 
 YARN-2080. Integrating reservation system with ResourceManager and 
 client-RM protocol. (Subru Krishnan and Carlo Curino  via subru)
+
+MAPREDUCE-6103. Adding reservation APIs to MR resource manager
+delegate. (Subru Krishnan and Carlo Curino  via subru)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256a9510/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 3f8d139..cfc3437 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 
 /**
  * The job submitter's view of the Job.
@@ -112,6 +113,7 @@ public class Job extends JobContextImpl implements JobContext {
   private JobStatus status;
   private long statustime;
   private Cluster cluster;
+  private ReservationId reservationId;
 
   /**
    * @deprecated Use {@link #getInstance()}
@@ -1523,5 +1525,24 @@ public class Job extends JobContextImpl implements JobContext {
     updateStatus();
     return status.isUber();
   }
+
+  /**
+   * Get the reservation to which the job is submitted to, if any
+   *
+   * @return the reservationId the identifier of the job's reservation, null if
+   *         the job does not have any reservation associated with it
+   */
+  public ReservationId getReservationId() {
+    return reservationId;
+  }
+
+  /**
+   * Set the reservation to which the job is submitted to
+   *
+   * @param reservationId the reservationId to set
+   */
+  public void setReservationId(ReservationId reservationId) {
+    this.reservationId = reservationId;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256a9510/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 6cd569a..d80521c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.QueueACL;
+
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
@@ -60,6 +61,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -427,6 +429,12 @@ class JobSubmitter {
             trackingIds.toArray(new String[trackingIds.size()]));
       }
 
+      // Set reservation info if it exists
+      ReservationId reservationId = job.getReservationId();
+      if (reservationId != null) {
+        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
+      }
+
       // Write job file to submit dir
       writeConf(conf, submitJobFile);
       

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256a9510/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 5c82470..33c0461 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -63,6 +63,8 @@ public interface MRJobConfig {
 
   public static final String QUEUE_NAME = "mapreduce.job.queuename";
 
+  public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
+
   public static final String JOB_TAGS = "mapreduce.job.tags";
 
   public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256a9510/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
index b76d0f3..803390f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
@@ -43,6 +43,12 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -406,4 +412,22 @@ public class ResourceMgrDelegate extends YarnClient {
       throws YarnException, IOException {
     client.moveApplicationAcrossQueues(appId, queue);
   }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    return client.submitReservation(request);
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    return client.updateReservation(request);
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    return client.deleteReservation(request);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256a9510/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 5120c85..9419d03 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -489,6 +490,26 @@ public class YARNRunner implements ClientProtocol {
     appContext.setQueue(                                       // Queue name
         jobConf.get(JobContext.QUEUE_NAME,
         YarnConfiguration.DEFAULT_QUEUE_NAME));
+    // add reservationID if present
+    ReservationId reservationID = null;
+    try {
+      reservationID =
+          ReservationId.parseReservationId(jobConf
+              .get(JobContext.RESERVATION_ID));
+    } catch (NumberFormatException e) {
+      // throw exception as reservationid as is invalid
+      String errMsg =
+          "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
+              + " specified for the app: " + applicationId;
+      LOG.warn(errMsg);
+      throw new IOException(errMsg);
+    }
+    if (reservationID != null) {
+      appContext.setReservationID(reservationID);
+      LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
+          + " to queue:" + appContext.getQueue() + " with reservationId:"
+          + appContext.getReservationID());
+    }
     appContext.setApplicationName(                             // Job name
         jobConf.get(JobContext.JOB_NAME,
         YarnConfiguration.DEFAULT_APPLICATION_NAME));
@@ -503,6 +524,7 @@ public class YARNRunner implements ClientProtocol {
     if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
       appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
     }
+
     return appContext;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/256a9510/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
index 69ede3a..5663a81 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
@@ -102,6 +102,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -397,6 +403,24 @@ public class TestClientRedirect {
         throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ReservationSubmissionResponse submitReservation(
+        ReservationSubmissionRequest request) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public ReservationUpdateResponse updateReservation(
+        ReservationUpdateRequest request) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public ReservationDeleteResponse deleteReservation(
+        ReservationDeleteRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
 
   class HistoryService extends AMService implements HSClientProtocol {