You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ma...@apache.org on 2011/11/09 02:43:08 UTC
svn commit: r1199594 - in
/incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred:
FrameworkScheduler.java MesosScheduler.java
Author: matei
Date: Wed Nov 9 01:43:08 2011
New Revision: 1199594
URL: http://svn.apache.org/viewvc?rev=1199594&view=rev
Log:
MESOS-13. Port Hadoop framework to new API. Contributed by Charles Reiss.
Modified:
incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java
incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
Modified: incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java?rev=1199594&r1=1199593&r2=1199594&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java (original)
+++ incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java Wed Nov 9 01:43:08 2011
@@ -25,11 +25,12 @@ import org.apache.mesos.Protos.Framework
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.SlaveOffer;
+import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskDescription;
import org.apache.mesos.Protos.TaskState;
import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Protos.Status;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
@@ -176,7 +177,7 @@ public class FrameworkScheduler implemen
throw new IndexOutOfBoundsException(name);
}
- private static double getResource(SlaveOffer offer, String name) {
+ private static double getResource(Offer offer, String name) {
return getResource(offer.getResourcesList(), name);
}
@@ -185,12 +186,9 @@ public class FrameworkScheduler implemen
}
@Override
- public void resourceOffer(SchedulerDriver d, OfferID oid,
- List<SlaveOffer> offers) {
+ public void resourceOffers(SchedulerDriver d, List<Offer> offers) {
try {
synchronized(jobTracker) {
- LOG.info("Got resource offer " + oid);
- List<TaskDescription> tasks = new ArrayList<TaskDescription>();
int numOffers = (int) offers.size();
double[] cpus = new double[numOffers];
@@ -198,7 +196,8 @@ public class FrameworkScheduler implemen
// Count up the amount of free CPUs and memory on each node
for (int i = 0; i < numOffers; i++) {
- SlaveOffer offer = offers.get(i);
+ Offer offer = offers.get(i);
+ LOG.info("Got resource offer " + offer.getId());
cpus[i] = getResource(offer, "cpus");
mem[i] = getResource(offer, "mem");
}
@@ -212,28 +211,35 @@ public class FrameworkScheduler implemen
// because it minimizing the amount of scanning we need to do if we
// get a large set of offered nodes.
List<Integer> indices = new LinkedList<Integer>();
+ List<List<TaskDescription>> replies =
+ new ArrayList<List<TaskDescription>>(numOffers);
for (int i = 0; i < numOffers; i++) {
indices.add(i);
+ replies.add(new ArrayList<TaskDescription>());
}
while (indices.size() > 0) {
for (Iterator<Integer> it = indices.iterator(); it.hasNext();) {
int i = it.next();
- SlaveOffer offer = offers.get(i);
+ Offer offer = offers.get(i);
TaskDescription task = findTask(
offer.getSlaveId(), offer.getHostname(), cpus[i], mem[i]);
if (task != null) {
cpus[i] -= getResource(task, "cpus");
mem[i] -= getResource(task, "mem");
- tasks.add(task);
+ replies.get(i).add(task);
} else {
it.remove();
}
}
}
-
- Map<String, String> params = new HashMap<String, String>();
- params.put("timeout", "1");
- d.replyToOffer(oid, tasks, params);
+
+ for (int i = 0; i < numOffers; i++) {
+ OfferID offerId = offers.get(i).getId();
+ Status status = d.launchTasks(offerId, replies.get(i));
+ if (status != Status.OK) {
+ LOG.warn("SchedulerDriver returned irregular status: " + status);
+ }
+ }
}
} catch(Exception e) {
LOG.error("Error in resourceOffer", e);
@@ -311,8 +317,7 @@ public class FrameworkScheduler implemen
).build();
}
- @Override
- public String getFrameworkName(SchedulerDriver driver) {
+ public String getFrameworkName() {
return "Hadoop: " + jobTracker.getTrackerIdentifier() +
" (RPC port: " + jobTracker.port + "," +
" web UI port: " + jobTracker.infoPort + ")";
@@ -321,8 +326,7 @@ public class FrameworkScheduler implemen
private static final ExecutorID EXECUTOR_ID =
ExecutorID.newBuilder().setValue("default").build();
- @Override
- public ExecutorInfo getExecutorInfo(SchedulerDriver driver) {
+ public ExecutorInfo getExecutorInfo() {
try {
String execPath = new File("bin/mesos-executor").getCanonicalPath();
byte[] initArg = conf.get("mapred.job.tracker").getBytes("US-ASCII");
Modified: incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java?rev=1199594&r1=1199593&r2=1199594&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java (original)
+++ incubator/mesos/trunk/frameworks/hadoop-0.20.2/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java Wed Nov 9 01:43:08 2011
@@ -46,7 +46,10 @@ public class MesosScheduler extends Task
taskTrackerManager.addJobInProgressListener(eagerInitListener);
frameworkScheduler = new FrameworkScheduler(this);
- driver = new MesosSchedulerDriver(frameworkScheduler, master);
+ driver = new MesosSchedulerDriver(frameworkScheduler,
+ frameworkScheduler.getFrameworkName(),
+ frameworkScheduler.getExecutorInfo(),
+ master);
driver.start();
} catch (Exception e) {