You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ma...@apache.org on 2011/11/09 02:43:08 UTC

svn commit: r1199594 - in /incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred: FrameworkScheduler.java MesosScheduler.java

Author: matei
Date: Wed Nov  9 01:43:08 2011
New Revision: 1199594

URL: http://svn.apache.org/viewvc?rev=1199594&view=rev
Log:
MESOS-13. Port Hadoop framework to new API. Contributed by Charles Reiss.


Modified:
    incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java
    incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java

Modified: incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java?rev=1199594&r1=1199593&r2=1199594&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java (original)
+++ incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java Wed Nov  9 01:43:08 2011
@@ -25,11 +25,12 @@ import org.apache.mesos.Protos.Framework
 import org.apache.mesos.Protos.OfferID;
 import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.SlaveOffer;
+import org.apache.mesos.Protos.Offer;
 import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.TaskDescription;
 import org.apache.mesos.Protos.TaskState;
 import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Protos.Status;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
 
@@ -176,7 +177,7 @@ public class FrameworkScheduler implemen
     throw new IndexOutOfBoundsException(name);
   }
 
-  private static double getResource(SlaveOffer offer, String name) {
+  private static double getResource(Offer offer, String name) {
     return getResource(offer.getResourcesList(), name);
   }
 
@@ -185,12 +186,9 @@ public class FrameworkScheduler implemen
   }
   
   @Override
-  public void resourceOffer(SchedulerDriver d, OfferID oid,
-      List<SlaveOffer> offers) {
+  public void resourceOffers(SchedulerDriver d, List<Offer> offers) {
     try {
       synchronized(jobTracker) {
-        LOG.info("Got resource offer " + oid);
-        List<TaskDescription> tasks = new ArrayList<TaskDescription>();
         
         int numOffers = (int) offers.size();
         double[] cpus = new double[numOffers];
@@ -198,7 +196,8 @@ public class FrameworkScheduler implemen
 
         // Count up the amount of free CPUs and memory on each node 
         for (int i = 0; i < numOffers; i++) {
-          SlaveOffer offer = offers.get(i);
+          Offer offer = offers.get(i);
+          LOG.info("Got resource offer " + offer.getId());
           cpus[i] = getResource(offer, "cpus");
           mem[i] = getResource(offer, "mem");
         }
@@ -212,28 +211,35 @@ public class FrameworkScheduler implemen
         // because it minimizes the amount of scanning we need to do if we
         // get a large set of offered nodes.
         List<Integer> indices = new LinkedList<Integer>();
+        List<List<TaskDescription>> replies =
+            new ArrayList<List<TaskDescription>>(numOffers);
         for (int i = 0; i < numOffers; i++) {
           indices.add(i);
+          replies.add(new ArrayList<TaskDescription>());
         }
         while (indices.size() > 0) {
           for (Iterator<Integer> it = indices.iterator(); it.hasNext();) {
             int i = it.next();
-            SlaveOffer offer = offers.get(i);
+            Offer offer = offers.get(i);
             TaskDescription task = findTask(
                 offer.getSlaveId(), offer.getHostname(), cpus[i], mem[i]);
             if (task != null) {
               cpus[i] -= getResource(task, "cpus");
               mem[i] -= getResource(task, "mem");
-              tasks.add(task);
+              replies.get(i).add(task);
             } else {
               it.remove();
             }
           }
         }
-        
-        Map<String, String> params = new HashMap<String, String>();
-        params.put("timeout", "1");
-        d.replyToOffer(oid, tasks, params);
+
+        for (int i = 0; i < numOffers; i++) {
+          OfferID offerId = offers.get(i).getId();
+          Status status = d.launchTasks(offerId, replies.get(i));
+          if (status != Status.OK) {
+            LOG.warn("SchedulerDriver returned irregular status: " + status);
+          }
+        }
       }
     } catch(Exception e) {
       LOG.error("Error in resourceOffer", e);
@@ -311,8 +317,7 @@ public class FrameworkScheduler implemen
     ).build();
   }
 
-  @Override
-  public String getFrameworkName(SchedulerDriver driver) {
+  public String getFrameworkName() {
     return "Hadoop: " + jobTracker.getTrackerIdentifier() +
            " (RPC port: " + jobTracker.port + "," +
            " web UI port: " + jobTracker.infoPort + ")";
@@ -321,8 +326,7 @@ public class FrameworkScheduler implemen
   private static final ExecutorID EXECUTOR_ID =
     ExecutorID.newBuilder().setValue("default").build();
 
-  @Override
-  public ExecutorInfo getExecutorInfo(SchedulerDriver driver) {
+  public ExecutorInfo getExecutorInfo() {
     try {
       String execPath = new File("bin/mesos-executor").getCanonicalPath();
       byte[] initArg = conf.get("mapred.job.tracker").getBytes("US-ASCII");

Modified: incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java?rev=1199594&r1=1199593&r2=1199594&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java (original)
+++ incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java Wed Nov  9 01:43:08 2011
@@ -46,7 +46,10 @@ public class MesosScheduler extends Task
       taskTrackerManager.addJobInProgressListener(eagerInitListener);
       
       frameworkScheduler = new FrameworkScheduler(this); 
-      driver = new MesosSchedulerDriver(frameworkScheduler, master);
+      driver = new MesosSchedulerDriver(frameworkScheduler,
+          frameworkScheduler.getFrameworkName(),
+          frameworkScheduler.getExecutorInfo(),
+          master);
       
       driver.start();
     } catch (Exception e) {