You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2014/10/04 02:10:10 UTC

[12/16] git commit: MAPREDUCE-6103.Adding reservation APIs to MR resource manager delegate. Contributed by Subru Krishnan and Carlo Curino. (cherry picked from commit aa92dd45f2d8c89a8a17ad2e4449aa3ff08bc53a)

MAPREDUCE-6103.Adding reservation APIs to MR resource manager delegate. Contributed by Subru Krishnan and Carlo Curino.
(cherry picked from commit aa92dd45f2d8c89a8a17ad2e4449aa3ff08bc53a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3f282762
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3f282762
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3f282762

Branch: refs/heads/trunk
Commit: 3f282762d1afc916de9207d3adeda852ca344853
Parents: 6261f7c
Author: subru <su...@outlook.com>
Authored: Wed Sep 24 18:01:38 2014 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Oct 3 15:42:59 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                           |  3 +++
 .../java/org/apache/hadoop/mapreduce/Job.java   | 21 +++++++++++++++++
 .../apache/hadoop/mapreduce/JobSubmitter.java   |  8 +++++++
 .../apache/hadoop/mapreduce/MRJobConfig.java    |  2 ++
 .../hadoop/mapred/ResourceMgrDelegate.java      | 24 ++++++++++++++++++++
 .../org/apache/hadoop/mapred/YARNRunner.java    | 22 ++++++++++++++++++
 .../hadoop/mapred/TestClientRedirect.java       | 24 ++++++++++++++++++++
 7 files changed, 104 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f282762/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
index c4106b2..6a27197 100644
--- a/YARN-1051-CHANGES.txt
+++ b/YARN-1051-CHANGES.txt
@@ -23,3 +23,6 @@ subsystem with the scheduler. (Subru Krishnan and Carlo Curino  via subru)
 
 YARN-2080. Integrating reservation system with ResourceManager and 
 client-RM protocol. (Subru Krishnan and Carlo Curino  via subru)
+
+MAPREDUCE-6103. Adding reservation APIs to MR resource manager
+delegate. (Subru Krishnan and Carlo Curino  via subru)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f282762/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 3f8d139..cfc3437 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 
 /**
  * The job submitter's view of the Job.
@@ -112,6 +113,7 @@ public class Job extends JobContextImpl implements JobContext {
   private JobStatus status;
   private long statustime;
   private Cluster cluster;
+  private ReservationId reservationId;
 
   /**
    * @deprecated Use {@link #getInstance()}
@@ -1523,5 +1525,24 @@ public class Job extends JobContextImpl implements JobContext {
     updateStatus();
     return status.isUber();
   }
+
+  /**
+   * Get the reservation to which the job is submitted to, if any
+   *
+   * @return the reservationId the identifier of the job's reservation, null if
+   *         the job does not have any reservation associated with it
+   */
+  public ReservationId getReservationId() {
+    return reservationId;
+  }
+
+  /**
+   * Set the reservation to which the job is submitted to
+   *
+   * @param reservationId the reservationId to set
+   */
+  public void setReservationId(ReservationId reservationId) {
+    this.reservationId = reservationId;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f282762/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 6cd569a..d80521c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.QueueACL;
+
 import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
@@ -60,6 +61,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -427,6 +429,12 @@ class JobSubmitter {
             trackingIds.toArray(new String[trackingIds.size()]));
       }
 
+      // Set reservation info if it exists
+      ReservationId reservationId = job.getReservationId();
+      if (reservationId != null) {
+        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
+      }
+
       // Write job file to submit dir
       writeConf(conf, submitJobFile);
       

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f282762/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 562120f..5b623b5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -63,6 +63,8 @@ public interface MRJobConfig {
 
   public static final String QUEUE_NAME = "mapreduce.job.queuename";
 
+  public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
+
   public static final String JOB_TAGS = "mapreduce.job.tags";
 
   public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f282762/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
index b76d0f3..803390f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
@@ -43,6 +43,12 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -406,4 +412,22 @@ public class ResourceMgrDelegate extends YarnClient {
       throws YarnException, IOException {
     client.moveApplicationAcrossQueues(appId, queue);
   }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    return client.submitReservation(request);
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    return client.updateReservation(request);
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    return client.deleteReservation(request);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f282762/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 5120c85..9419d03 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -489,6 +490,26 @@ public class YARNRunner implements ClientProtocol {
     appContext.setQueue(                                       // Queue name
         jobConf.get(JobContext.QUEUE_NAME,
         YarnConfiguration.DEFAULT_QUEUE_NAME));
+    // add reservationID if present
+    ReservationId reservationID = null;
+    try {
+      reservationID =
+          ReservationId.parseReservationId(jobConf
+              .get(JobContext.RESERVATION_ID));
+    } catch (NumberFormatException e) {
+      // throw exception as reservationid as is invalid
+      String errMsg =
+          "Invalid reservationId: " + jobConf.get(JobContext.RESERVATION_ID)
+              + " specified for the app: " + applicationId;
+      LOG.warn(errMsg);
+      throw new IOException(errMsg);
+    }
+    if (reservationID != null) {
+      appContext.setReservationID(reservationID);
+      LOG.info("SUBMITTING ApplicationSubmissionContext app:" + applicationId
+          + " to queue:" + appContext.getQueue() + " with reservationId:"
+          + appContext.getReservationID());
+    }
     appContext.setApplicationName(                             // Job name
         jobConf.get(JobContext.JOB_NAME,
         YarnConfiguration.DEFAULT_APPLICATION_NAME));
@@ -503,6 +524,7 @@ public class YARNRunner implements ClientProtocol {
     if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
       appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
     }
+
     return appContext;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3f282762/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
index 69ede3a..5663a81 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
@@ -102,6 +102,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
 import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -397,6 +403,24 @@ public class TestClientRedirect {
         throws YarnException, IOException {
       return null;
     }
+
+    @Override
+    public ReservationSubmissionResponse submitReservation(
+        ReservationSubmissionRequest request) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public ReservationUpdateResponse updateReservation(
+        ReservationUpdateRequest request) throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public ReservationDeleteResponse deleteReservation(
+        ReservationDeleteRequest request) throws YarnException, IOException {
+      return null;
+    }
   }
 
   class HistoryService extends AMService implements HSClientProtocol {