Posted to commits@mesos.apache.org by be...@apache.org on 2012/05/01 01:28:44 UTC

svn commit: r1332469 [2/3] - in /incubator/mesos/trunk: ./ ec2/ hadoop/ hadoop/mesos/ hadoop/mesos/ivy/ hadoop/mesos/src/ hadoop/mesos/src/java/ hadoop/mesos/src/java/org/ hadoop/mesos/src/java/org/apache/ hadoop/mesos/src/java/org/apache/hadoop/ hadoo...

Modified: incubator/mesos/trunk/hadoop/hadoop-0.20.205.0.patch
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/hadoop-0.20.205.0.patch?rev=1332469&r1=1332468&r2=1332469&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/hadoop-0.20.205.0.patch (original)
+++ incubator/mesos/trunk/hadoop/hadoop-0.20.205.0.patch Mon Apr 30 23:28:43 2012
@@ -1,1458 +1,7 @@
-diff --git a/hadoop/hadoop-0.20.205.0/bin/mesos-executor b/hadoop/hadoop-0.20.205.0/bin/mesos-executor
-new file mode 100755
-index 0000000..ce8dccc
---- /dev/null
-+++ b/hadoop/hadoop-0.20.205.0/bin/mesos-executor
-@@ -0,0 +1,3 @@
-+#!/bin/sh
-+
-+exec `dirname ${0}`/hadoop org.apache.hadoop.mapred.FrameworkExecutor
-diff --git a/hadoop/hadoop-0.20.205.0/ivy/libraries.properties b/hadoop/hadoop-0.20.205.0/ivy/libraries.properties
-index b47b4c3..713f0c1 100644
---- a/hadoop/hadoop-0.20.205.0/ivy/libraries.properties
-+++ b/hadoop/hadoop-0.20.205.0/ivy/libraries.properties
-@@ -86,3 +86,5 @@ slf4j-log4j12.version=1.4.3
- wagon-http.version=1.0-beta-2
- xmlenc.version=0.52
- xerces.version=1.4.4
-+
-+protobuf-java.version=2.4.1
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/build-contrib.xml b/hadoop/hadoop-0.20.205.0/src/contrib/build-contrib.xml
-index c4b6aae..2f6e092 100644
---- a/hadoop/hadoop-0.20.205.0/src/contrib/build-contrib.xml
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/build-contrib.xml
-@@ -90,6 +90,13 @@
-   <property name="ivy.artifact.retrieve.pattern"
-     			value="${ant.project.name}/[conf]/[artifact]-[revision].[ext]"/>
- 
-+  <!-- provides a means for contribs to add extra classpath entries -->
-+  <condition property="contrib.extra-classpath" value="">
-+    <not>
-+      <isset property="contrib.extra-classpath" />
-+    </not>
-+  </condition>
-+
-   <!-- the normal classpath -->
-   <path id="contrib-classpath">
-     <pathelement location="${build.classes}"/>
-@@ -100,6 +107,7 @@
-       <include name="**/*.jar" />
-     </fileset>
-     <path refid="${ant.project.name}.common-classpath"/>
-+    <pathelement path="${contrib.extra-classpath}"/>
-     <pathelement path="${clover.jar}"/>
-   </path>
- 
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/build.xml b/hadoop/hadoop-0.20.205.0/src/contrib/build.xml
-index 3c19e25..ecb7198 100644
---- a/hadoop/hadoop-0.20.205.0/src/contrib/build.xml
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/build.xml
-@@ -55,6 +55,7 @@
-       <fileset dir="." includes="fairscheduler/build.xml"/>
-       <fileset dir="." includes="capacity-scheduler/build.xml"/>
-       <fileset dir="." includes="gridmix/build.xml"/>
-+      <fileset dir="." includes="mesos/build.xml"/>
-     </subant>
-      <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
-      <fail if="testsfailed">Tests failed!</fail>
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/mesos/build.xml b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/build.xml
-new file mode 100644
-index 0000000..50becd5
---- /dev/null
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/build.xml
-@@ -0,0 +1,38 @@
-+<?xml version="1.0"?>
-+
-+<!--
-+   Licensed to the Apache Software Foundation (ASF) under one or more
-+   contributor license agreements.  See the NOTICE file distributed with
-+   this work for additional information regarding copyright ownership.
-+   The ASF licenses this file to You under the Apache License, Version 2.0
-+   (the "License"); you may not use this file except in compliance with
-+   the License.  You may obtain a copy of the License at
-+
-+       http://www.apache.org/licenses/LICENSE-2.0
-+
-+   Unless required by applicable law or agreed to in writing, software
-+   distributed under the License is distributed on an "AS IS" BASIS,
-+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+   See the License for the specific language governing permissions and
-+   limitations under the License.
-+-->
-+
-+<!--
-+Before you can run these subtargets directly, you need
-+to call at top-level: ant deploy-contrib compile-core-test
-+-->
-+<project name="mesos" default="jar">
-+
-+  <!-- locate Mesos home -->
-+  <property environment="env" />
-+  <condition property="mesos.jar" value="${env.MESOS_JAR}">
-+    <isset property="env.MESOS_JAR" />
-+  </condition>
-+  <echo>Mesos jar: ${mesos.jar}</echo>
-+
-+  <!-- add it to our Classpath -->
-+  <property name="contrib.extra-classpath" value="${mesos.jar}" />
-+
-+  <import file="../build-contrib.xml"/>
-+
-+</project>
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/mesos/ivy.xml b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/ivy.xml
-new file mode 100644
-index 0000000..3ae7725
---- /dev/null
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/ivy.xml
-@@ -0,0 +1,42 @@
-+<?xml version="1.0" ?>
-+<ivy-module version="1.0">
-+  <info organisation="org.apache.hadoop" module="${ant.project.name}">
-+    <license name="Apache 2.0"/>
-+    <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
-+    <description>
-+        Apache Hadoop contrib
-+    </description>
-+  </info>
-+  <configurations defaultconfmapping="default">
-+    <!--these match the Maven configurations-->
-+    <conf name="default" extends="master,runtime"/>
-+    <conf name="master" description="contains the artifact but no dependencies"/>
-+    <conf name="runtime" description="runtime but not the artifact" />
-+
-+    <conf name="common" visibility="private"
-+      description="artifacts needed to compile/test the application"/>
-+  </configurations>
-+
-+  <publications>
-+    <!--get the artifact from our module name-->
-+    <artifact conf="master"/>
-+  </publications>
-+  <dependencies>
-+    <dependency org="commons-logging"
-+      name="commons-logging"
-+      rev="${commons-logging.version}"
-+      conf="common->default"/>
-+    <dependency org="log4j"
-+      name="log4j"
-+      rev="${log4j.version}"
-+      conf="common->master"/>
-+   <dependency org="junit"
-+      name="junit"
-+      rev="${junit.version}"
-+      conf="common->default"/>
-+   <dependency org="com.google.protobuf"
-+     name="protobuf-java"
-+     rev="${protobuf-java.version}"
-+     conf="common->default"/>
-+  </dependencies>
-+</ivy-module>
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/mesos/ivy/libraries.properties b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/ivy/libraries.properties
-new file mode 100644
-index 0000000..a470b37
---- /dev/null
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/ivy/libraries.properties
-@@ -0,0 +1,5 @@
-+#This properties file lists the versions of the various artifacts used by streaming.
-+#It drives ivy and the generation of a maven POM
-+
-+#Please list the dependencies name with version if they are different from the ones
-+#listed in the global libraries.properties file (in alphabetical order)
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkExecutor.java b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkExecutor.java
-new file mode 100644
-index 0000000..e369f39
---- /dev/null
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkExecutor.java
-@@ -0,0 +1,200 @@
-+package org.apache.hadoop.mapred;
-+
-+import java.io.IOException;
-+import java.util.HashMap;
-+import java.util.HashSet;
-+import java.util.Map;
-+import java.util.Set;
-+import java.util.Map.Entry;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.commons.logging.LogFactory;
-+import org.apache.hadoop.mapred.TaskStatus.State;
-+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
-+
-+
-+import org.apache.mesos.Executor;
-+import org.apache.mesos.ExecutorDriver;
-+import org.apache.mesos.MesosExecutorDriver;
-+import org.apache.mesos.Protos.ExecutorInfo;
-+import org.apache.mesos.Protos.FrameworkID;
-+import org.apache.mesos.Protos.FrameworkInfo;
-+import org.apache.mesos.Protos.SlaveID;
-+import org.apache.mesos.Protos.SlaveInfo;
-+import org.apache.mesos.Protos.Status;
-+import org.apache.mesos.Protos.TaskID;
-+import org.apache.mesos.Protos.TaskInfo;
-+import org.apache.mesos.Protos.TaskState;
-+
-+public class FrameworkExecutor implements Executor {
-+  public static final Log LOG =
-+    LogFactory.getLog(FrameworkExecutor.class);
-+
-+  private static FrameworkExecutor instance;
-+
-+  private ExecutorDriver driver;
-+  private SlaveID slaveId;
-+  private JobConf conf;
-+  private TaskTracker taskTracker;
-+
-+  private Set<TaskID> activeMesosTasks = new HashSet<TaskID>();
-+
-+  @Override
-+  public void registered(ExecutorDriver d,
-+                         ExecutorInfo executorInfo,
-+                         FrameworkInfo frameworkInfo,
-+                         SlaveInfo slaveInfo) {
-+    try {
-+      Thread.currentThread().setContextClassLoader(
-+        TaskTracker.class.getClassLoader());
-+
-+      this.driver = d;
-+      this.slaveId = slaveInfo.getId();
-+
-+      // TODO: initialize all of JobConf from ExecutorArgs (using JT's conf)?
-+      conf = new JobConf();
-+      String jobTracker = executorInfo.getData().toStringUtf8();
-+      LOG.info("Setting JobTracker: " + jobTracker);
-+      conf.set("mapred.job.tracker", jobTracker);
-+
-+      // Attach our TaskTrackerInstrumentation to figure out when tasks end
-+      Class<?>[] instClasses = TaskTracker.getInstrumentationClasses(conf);
-+      String newInstClassList = "";
-+      for (Class<?> cls: instClasses) {
-+        newInstClassList += cls.getName() + ",";
-+      }
-+      newInstClassList += MesosTaskTrackerInstrumentation.class.getName();
-+      conf.set("mapred.tasktracker.instrumentation", newInstClassList);
-+
-+      // Get hostname from Mesos to make sure we match what it reports to the JT
-+      conf.set("slave.host.name", slaveInfo.getHostname());
-+
-+      taskTracker = new TaskTracker(conf);
-+      new Thread("TaskTracker run thread") {
-+        @Override
-+        public void run() {
-+          taskTracker.run();
-+        }
-+      }.start();
-+    } catch (Exception e) {
-+      throw new RuntimeException(
-+          "Failed to initialize FrameworkExecutor", e);
-+    }
-+  }
-+
-+  @Override
-+  public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {}
-+
-+  @Override
-+  public void disconnected(ExecutorDriver driver) {}
-+
-+  @Override
-+  public void launchTask(ExecutorDriver d, TaskInfo task) {
-+    LOG.info("Asked to launch Mesos task " + task.getTaskId().getValue());
-+    activeMesosTasks.add(task.getTaskId());
-+  }
-+
-+  @Override
-+  public void killTask(ExecutorDriver d, TaskID taskId) {
-+    LOG.info("Asked to kill Mesos task " + taskId);
-+    // TODO: Tell the JobTracker about this using an E2S_KILL_REQUEST message!
-+  }
-+
-+  @Override
-+  public void frameworkMessage(ExecutorDriver d, byte[] msg) {
-+    try {
-+      HadoopFrameworkMessage hfm = new HadoopFrameworkMessage(msg);
-+      switch (hfm.type) {
-+        case S2E_SEND_STATUS_UPDATE: {
-+          TaskState s = TaskState.valueOf(hfm.arg1);
-+          LOG.info("Sending status update: " + hfm.arg2 + " is " + s);
-+          d.sendStatusUpdate(org.apache.mesos.Protos.TaskStatus.newBuilder()
-+                             .setTaskId(TaskID.newBuilder()
-+                                        .setValue(hfm.arg2).build())
-+                             .setState(s).build());
-+          break;
-+        }
-+        case S2E_SHUTDOWN_EXECUTOR: {
-+          taskTracker.close();
-+          System.exit(0);
-+        }
-+      }
-+    } catch (Exception e) {
-+      throw new RuntimeException(
-+          "Failed to deserialize HadoopFrameworkMessage", e);
-+    }
-+  }
-+
-+  public void statusUpdate(Task task, TaskStatus status) {
-+    // There are some tasks that get launched implicitly (e.g., the
-+    // setup/cleanup tasks) that don't go through the
-+    // MesosScheduler/FrameworkScheduler.assignTasks and thus don't
-+    // get extraData set properly! This means WE DO NOT EVER SEND
-+    // STATUS UPDATES FOR THESE TASKS. For this reason we also don't
-+    // need to specify any resources for the executor because Mesos
-+    // always assumes these tasks are running.
-+    if (task.extraData.equals("")) {
-+      LOG.info("Ignoring status update for task " + task);
-+      return;
-+    }
-+
-+    // Create a Mesos TaskID from extraData.
-+    TaskID taskId = TaskID.newBuilder()
-+      .setValue(task.extraData)
-+      .build();
-+
-+    // It appears as though we can get multiple duplicate status
-+    // updates for the same task, so check if we still have an active
-+    // task so that we only send the status update once.
-+    if (!activeMesosTasks.contains(taskId)) {
-+      LOG.info("Ignoring (duplicate) status update for task " + task);
-+      return;
-+    }
-+
-+    // Check whether the task has finished (either successfully or
-+    // not), and report to Mesos only if it has.
-+    State state = status.getRunState();
-+    TaskState mesosState = null;
-+    if (state == State.SUCCEEDED || state == State.COMMIT_PENDING)
-+      mesosState = TaskState.TASK_FINISHED;
-+    else if (state == State.FAILED || state == State.FAILED_UNCLEAN)
-+      mesosState = TaskState.TASK_FAILED;
-+    else if (state == State.KILLED || state == State.KILLED_UNCLEAN)
-+      mesosState = TaskState.TASK_KILLED;
-+
-+    if (mesosState == null) {
-+      LOG.info("Not sending status update for task " + task +
-+               " in state " + state);
-+      return;
-+    }
-+
-+    LOG.info("Attempting to send status update for " + task +
-+             " in state " + status.getRunState());
-+
-+    driver.sendStatusUpdate(
-+        org.apache.mesos.Protos.TaskStatus.newBuilder()
-+        .setTaskId(TaskID.newBuilder().setValue(task.extraData).build())
-+        .setState(mesosState)
-+        .build());
-+
-+    activeMesosTasks.remove(taskId);
-+  }
-+
-+  @Override
-+  public void error(ExecutorDriver d, String message) {
-+    LOG.error("FrameworkExecutor.error: " + message);
-+  }
-+
-+  @Override
-+  public void shutdown(ExecutorDriver d) {}
-+
-+  public static void main(String[] args) {
-+    instance = new FrameworkExecutor();
-+    MesosExecutorDriver driver = new MesosExecutorDriver(instance);
-+    System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
-+  }
-+
-+  static FrameworkExecutor getInstance() {
-+    return instance;
-+  }
-+}
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java
-new file mode 100644
-index 0000000..312cfdb
---- /dev/null
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java
-@@ -0,0 +1,894 @@
-+package org.apache.hadoop.mapred;
-+
-+import java.io.File;
-+import java.io.IOException;
-+import java.util.ArrayList;
-+import java.util.Collection;
-+import java.util.Collections;
-+import java.util.HashMap;
-+import java.util.Iterator;
-+import java.util.LinkedList;
-+import java.util.List;
-+import java.util.Map;
-+import java.util.Set;
-+import java.util.concurrent.atomic.AtomicInteger;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.commons.logging.LogFactory;
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.mapred.TaskStatus.State;
-+import org.apache.hadoop.mapred.TaskTrackerStatus;
-+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-+import org.apache.hadoop.net.Node;
-+
-+import org.apache.mesos.Protos.CommandInfo;
-+import org.apache.mesos.Protos.ExecutorID;
-+import org.apache.mesos.Protos.ExecutorInfo;
-+import org.apache.mesos.Protos.FrameworkID;
-+import org.apache.mesos.Protos.FrameworkInfo;
-+import org.apache.mesos.Protos.MasterInfo;
-+import org.apache.mesos.Protos.Offer;
-+import org.apache.mesos.Protos.OfferID;
-+import org.apache.mesos.Protos.Resource;
-+import org.apache.mesos.Scheduler;
-+import org.apache.mesos.SchedulerDriver;
-+import org.apache.mesos.Protos.SlaveID;
-+import org.apache.mesos.Protos.Status;
-+import org.apache.mesos.Protos.TaskID;
-+import org.apache.mesos.Protos.TaskInfo;
-+import org.apache.mesos.Protos.TaskState;
-+import org.apache.mesos.Protos.Value;
-+
-+public class FrameworkScheduler implements Scheduler {
-+  public static final Log LOG =
-+    LogFactory.getLog(FrameworkScheduler.class);
-+  public static final long KILL_UNLAUNCHED_TASKS_SLEEP_TIME = 2000;
-+
-+  private static class MesosTask {
-+    final boolean isMap;
-+    final TaskID mesosId;
-+    final String host;
-+    final long creationTime;
-+
-+    TaskAttemptID hadoopId;
-+
-+    MesosTask(boolean isMap, TaskID mesosId, String host) {
-+      this.isMap = isMap;
-+      this.mesosId = mesosId;
-+      this.host = host;
-+      this.creationTime = System.currentTimeMillis();
-+    }
-+
-+    boolean isAssigned() {
-+      return hadoopId != null;
-+    }
-+
-+    void assign(Task task) {
-+      hadoopId = task.getTaskID();
-+    }
-+  }
-+
-+  private static class TaskTrackerInfo {
-+    SlaveID mesosSlaveId;
-+    List<MesosTask> maps = new LinkedList<MesosTask>();
-+    List<MesosTask> reduces = new LinkedList<MesosTask>();
-+    int maxMaps = 1;
-+    int maxReduces = 1;
-+
-+    public TaskTrackerInfo(SlaveID mesosSlaveId) {
-+      this.mesosSlaveId = mesosSlaveId;
-+    }
-+
-+    void add(MesosTask nt) {
-+      if (nt.isMap)
-+        maps.add(nt);
-+      else
-+        reduces.add(nt);
-+    }
-+
-+    public void remove(MesosTask nt) {
-+      if (nt.isMap)
-+        maps.remove(nt);
-+      else
-+        reduces.remove(nt);
-+    }
-+  }
-+
-+  private class KillTimedOutTasksThread extends Thread {
-+    @Override
-+    public void run() {
-+      while (running) {
-+        killTimedOutTasks();
-+        try { Thread.sleep(KILL_UNLAUNCHED_TASKS_SLEEP_TIME); }
-+        catch (Exception e) {}
-+      }
-+    }
-+  }
-+
-+  private MesosScheduler mesosSched;
-+  private SchedulerDriver driver;
-+  private FrameworkID frameworkId;
-+  private Configuration conf;
-+  private JobTracker jobTracker;
-+  private boolean running;
-+  private AtomicInteger nextMesosTaskId = new AtomicInteger(0);
-+
-+  private int cpusPerTask;
-+  private int memPerTask;
-+  private long localityWait;
-+
-+  private Map<String, TaskTrackerInfo> ttInfos =
-+    new HashMap<String, TaskTrackerInfo>();
-+
-+  private Map<TaskAttemptID, MesosTask> hadoopIdToMesosTask =
-+    new HashMap<TaskAttemptID, MesosTask>();
-+  private Map<TaskID, MesosTask> mesosIdToMesosTask =
-+    new HashMap<TaskID, MesosTask>();
-+
-+  // Counts of various kinds of Mesos tasks
-+  // TODO: Figure out a better way to keep track of these
-+  int unassignedMaps = 0;
-+  int unassignedReduces = 0;
-+  int assignedMaps = 0;
-+  int assignedReduces = 0;
-+
-+  // Variables used for delay scheduling
-+  boolean lastMapWasLocal = true;
-+  long timeWaitedForLocalMap = 0;
-+  long lastCanLaunchMapTime = -1;
-+
-+  public FrameworkScheduler(MesosScheduler mesosSched) {
-+    this.mesosSched = mesosSched;
-+    this.conf = mesosSched.getConf();
-+    this.jobTracker = mesosSched.jobTracker;
-+    cpusPerTask = conf.getInt("mapred.mesos.task.cpus", 1);
-+    memPerTask = conf.getInt("mapred.mesos.task.mem", 1024);
-+    localityWait = conf.getLong("mapred.mesos.localitywait", 5000);
-+  }
-+
-+  @Override
-+  public void registered(SchedulerDriver d,
-+                         FrameworkID frameworkId,
-+                         MasterInfo masterInfo) {
-+    this.driver = d;
-+    this.frameworkId = frameworkId;
-+    LOG.info("Registered with Mesos, with framework ID " + frameworkId);
-+    running = true;
-+    new KillTimedOutTasksThread().start();
-+  }
-+
-+  @Override
-+  public void reregistered(SchedulerDriver driver, MasterInfo masterInfo) {}
-+
-+  @Override
-+  public void disconnected(SchedulerDriver d) {}
-+
-+  public void cleanUp() {
-+    running = false;
-+  }
-+
-+  private static Resource makeResource(String name, double value) {
-+    return Resource.newBuilder().setName(name).setScalar(
-+        Value.Scalar.newBuilder().setValue(value).build()
-+    ).setType(Value.Type.SCALAR).build();
-+  }
-+
-+  private static double getResource(Collection<Resource> resources, String name) {
-+    for (Resource r : resources) {
-+      if (r.getName().equals(name)) {
-+        return r.getScalar().getValue();
-+      }
-+    }
-+    throw new IndexOutOfBoundsException(name);
-+  }
-+
-+  private static double getResource(Offer offer, String name) {
-+    return getResource(offer.getResourcesList(), name);
-+  }
-+
-+  private static double getResource(TaskInfo task, String name) {
-+    return getResource(task.getResourcesList(), name);
-+  }
-+
-+  @Override
-+  public void resourceOffers(SchedulerDriver d, List<Offer> offers) {
-+    try {
-+      synchronized(jobTracker) {
-+
-+        int numOffers = offers.size();
-+        double[] cpus = new double[numOffers];
-+        double[] mem = new double[numOffers];
-+
-+        // Count up the amount of free CPUs and memory on each node
-+        for (int i = 0; i < numOffers; i++) {
-+          Offer offer = offers.get(i);
-+          LOG.info("Got resource offer " + offer.getId());
-+          cpus[i] = getResource(offer, "cpus");
-+          mem[i] = getResource(offer, "mem");
-+        }
-+
-+        // Assign tasks to the nodes in a round-robin manner, and stop when we
-+        // are unable to assign a task to any node.
-+        // We do this by keeping a linked list of indices of nodes for which
-+        // we are still considering assigning tasks. Whenever we can't find a
-+        // new task for a node, we remove it from the list. When the list is
-+        // empty, no further assignments can be made. This algorithm was chosen
-+        // because it minimizes the amount of scanning we need to do if we
-+        // get a large set of offered nodes.
-+        List<Integer> indices = new LinkedList<Integer>();
-+        List<List<TaskInfo>> replies =
-+            new ArrayList<List<TaskInfo>>(numOffers);
-+        for (int i = 0; i < numOffers; i++) {
-+          indices.add(i);
-+          replies.add(new ArrayList<TaskInfo>());
-+        }
-+        while (indices.size() > 0) {
-+          for (Iterator<Integer> it = indices.iterator(); it.hasNext();) {
-+            int i = it.next();
-+            Offer offer = offers.get(i);
-+            TaskInfo task = findTask(
-+                offer.getSlaveId(), offer.getHostname(), cpus[i], mem[i]);
-+            if (task != null) {
-+              cpus[i] -= getResource(task, "cpus");
-+              mem[i] -= getResource(task, "mem");
-+              replies.get(i).add(task);
-+            } else {
-+              it.remove();
-+            }
-+          }
-+        }
-+
-+        for (int i = 0; i < numOffers; i++) {
-+          OfferID offerId = offers.get(i).getId();
-+          Status status = d.launchTasks(offerId, replies.get(i));
-+          if (status != Status.DRIVER_RUNNING) {
-+            LOG.warn("SchedulerDriver returned irregular status: " + status);
-+          }
-+        }
-+      }
-+    } catch(Exception e) {
-+      LOG.error("Error in resourceOffer", e);
-+    }
-+  }
-+
-+  private TaskTrackerInfo getTaskTrackerInfo(String host, SlaveID slaveId) {
-+    if (ttInfos.containsKey(host)) {
-+      return ttInfos.get(host);
-+    } else {
-+      TaskTrackerInfo info = new TaskTrackerInfo(slaveId.toBuilder().build());
-+      ttInfos.put(host, info);
-+      return info;
-+    }
-+  }
-+
-+  // Find a single task for a given node. Assumes JobTracker is locked.
-+  private TaskInfo findTask(
-+      SlaveID slaveId, String host, double cpus, double mem) {
-+    if (cpus < cpusPerTask || mem < memPerTask) {
-+      return null; // Too few resources are left on the node
-+    }
-+
-+    TaskTrackerInfo ttInfo = getTaskTrackerInfo(host, slaveId);
-+
-+    // Pick whether to launch a map or a reduce based on available tasks
-+    String taskType = null;
-+    boolean haveMaps = canLaunchMap(host);
-+    boolean haveReduces = canLaunchReduce(host);
-+    //LOG.info("Looking at " + host + ": haveMaps=" + haveMaps +
-+    //         ", haveReduces=" + haveReduces);
-+    if (!haveMaps && !haveReduces) {
-+      return null;
-+    } else if (haveMaps && !haveReduces) {
-+      taskType = "map";
-+    } else if (haveReduces && !haveMaps) {
-+      taskType = "reduce";
-+    } else {
-+      float mapToReduceRatio = 1;
-+      if (ttInfo.reduces.size() < ttInfo.maps.size() / mapToReduceRatio)
-+        taskType = "reduce";
-+      else
-+        taskType = "map";
-+    }
-+    //LOG.info("Task type chosen: " + taskType);
-+
-+    // Get a Mesos task ID for the new task
-+    TaskID mesosId = newMesosTaskId();
-+
-+    // Remember that it is launched
-+    boolean isMap = taskType.equals("map");
-+    if (isMap) {
-+      unassignedMaps++;
-+    } else {
-+      unassignedReduces++;
-+    }
-+    MesosTask nt = new MesosTask(isMap, mesosId, host);
-+    mesosIdToMesosTask.put(mesosId, nt);
-+    ttInfo.add(nt);
-+
-+    LOG.info("Launching Mesos task " + mesosId.getValue() +
-+             " as " + taskType + " on " + host);
-+
-+    // Create a task description to pass back to Mesos.
-+    return TaskInfo.newBuilder()
-+        .setTaskId(mesosId)
-+        .setSlaveId(slaveId)
-+        .setName("task " + mesosId.getValue() + " (" + taskType + ")")
-+        .addResources(makeResource("cpus", cpusPerTask))
-+        .addResources(makeResource("mem", memPerTask))
-+        .setExecutor(getExecutorInfo())
-+        .build();
-+  }
-+
-+  private TaskID newMesosTaskId() {
-+    return TaskID.newBuilder().setValue(
-+        "" + nextMesosTaskId.getAndIncrement()
-+    ).build();
-+  }
-+
-+  public FrameworkInfo getFrameworkInfo() {
-+    String name = "Hadoop: " + jobTracker.getTrackerIdentifier() +
-+      " (RPC port: " + jobTracker.port + "," +
-+      " web UI port: " + jobTracker.infoPort + ")";
-+
-+    return FrameworkInfo.newBuilder().setUser("").setName(name).build();
-+  }
-+
-+  private static final ExecutorID EXECUTOR_ID =
-+      ExecutorID.newBuilder().setValue("default").build();
-+
-+  public ExecutorInfo getExecutorInfo() {
-+    try {
-+      String execPath = new File("bin/mesos-executor").getCanonicalPath();
-+      byte[] initArg = conf.get("mapred.job.tracker").getBytes("US-ASCII");
-+      return ExecutorInfo.newBuilder()
-+        .setCommand(CommandInfo.newBuilder()
-+                    .setValue(execPath).build())
-+        .setData(com.google.protobuf.ByteString.copyFrom(initArg))
-+        .setExecutorId(EXECUTOR_ID)
-+        .build();
-+    } catch (IOException e) {
-+      throw new RuntimeException(e);
-+    }
-+  }
-+
-+  // TODO: Make this return a count instead of a boolean?
-+  // TODO: Cache result for some time so we don't iterate through all jobs
-+  // and tasks every time we get a resource offer?
-+  private boolean canLaunchMap(String host) {
-+    // Check whether the TT is saturated on maps
-+    TaskTrackerInfo ttInfo = ttInfos.get(host);
-+    if (ttInfo == null) {
-+      throw new RuntimeException("Expecting TaskTrackerInfo for host " + host);
-+    }
-+
-+    if (ttInfo.maps.size() >= ttInfo.maxMaps) {
-+      return false;
-+    }
-+
-+    // Compute the total demand for maps to make sure we don't exceed it
-+    Collection<JobInProgress> jobs = jobTracker.jobs.values();
-+    int neededMaps = 0;
-+    for (JobInProgress job : jobs) {
-+      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-+        neededMaps += job.pendingMaps();
-+      }
-+    }
-+    // TODO (!!!): Count speculatable tasks and add them to neededMaps
-+    // For now, we just add 1
-+    if (jobs.size() > 0)
-+      neededMaps += 1;
-+
-+    if (unassignedMaps < neededMaps) {
-+      /*
-+      // Figure out what locality level to allow using delay scheduling
-+      long now = System.currentTimeMillis();
-+      if (lastCanLaunchMapTime == -1)
-+        lastCanLaunchMapTime = now;
-+      int maxLevel; // Cache level to search for maps in
-+      if (lastMapWasLocal) {
-+        timeWaitedForLocalMap += now - lastCanLaunchMapTime;
-+        if (timeWaitedForLocalMap >= localityWait) {
-+          maxLevel = Integer.MAX_VALUE;
-+        } else {
-+          maxLevel = 1;
-+        }
-+      } else {
-+        maxLevel = Integer.MAX_VALUE;
-+      }
-+      lastCanLaunchMapTime = now;
-+      */
-+      int maxLevel = Integer.MAX_VALUE;
-+      // Look for a map with the required level
-+      for (JobInProgress job: jobs) {
-+        int state = job.getStatus().getRunState();
-+        if (state == JobStatus.RUNNING) {
-+          int availLevel = availableMapLevel(job, host, maxLevel);
-+          if (availLevel != -1) {
-+            lastMapWasLocal = (availLevel == 0);
-+            return true;
-+          }
-+        }
-+      }
-+    }
-+
-+    // If we didn't launch any tasks, but there are pending jobs in the queue,
-+    // ensure that at least one TaskTracker is running to execute setup tasks
-+    int numTrackers = jobTracker.getClusterStatus().getTaskTrackers();
-+    if (jobs.size() > 0 && numTrackers == 0 && totalMesosTasks() == 0) {
-+      LOG.info("Going to launch map task for setup / cleanup");
-+      return true;
-+    }
-+
-+    return false;
-+  }
-+
-+  private int totalMesosTasks() {
-+    return unassignedMaps + unassignedReduces + assignedMaps + assignedReduces;
-+  }
-+
-+  // TODO: Make this return a count instead of a boolean?
-+  // TODO: Cache result for some time so we don't iterate through all jobs
-+  // and tasks every time we get a resource offer?
-+  private boolean canLaunchReduce(String host) {
-+    // Don't launch a reduce if we've only got one "slot"
-+    // available. We approximate this by not launching any reduce
-+    // tasks if there is only one TaskTracker.
-+    if (jobTracker.getClusterStatus().getTaskTrackers() <= 1) {
-+      Collection<JobInProgress> jobs = jobTracker.jobs.values();
-+      for (JobInProgress job : jobs) {
-+        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-+          if (job.pendingMaps() > 0) {
-+            return false;
-+          }
-+        }
-+      }
-+    }
-+
-+    // Check whether the TT is saturated on reduces
-+    TaskTrackerInfo ttInfo = ttInfos.get(host);
-+    if (ttInfo == null) {
-+      throw new RuntimeException("Expecting TaskTrackerInfo for host " + host);
-+    }
-+
-+    if (ttInfo.reduces.size() >= ttInfo.maxReduces) {
-+      return false;
-+    }
-+
-+    // Compute total demand for reduces, to make sure we don't exceed it
-+    Collection<JobInProgress> jobs = jobTracker.jobs.values();
-+    int neededReduces = 0;
-+    for (JobInProgress job : jobs) {
-+      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-+        neededReduces += job.pendingReduces();
-+      }
-+    }
-+    // TODO (!!!): Count speculatable tasks and add them to neededReduces
-+    // For now, we just add 1
-+    if (jobs.size() > 0)
-+      neededReduces += 1;
-+
-+    if (neededReduces > unassignedReduces) {
-+      // Find a reduce to launch
-+      for (JobInProgress job: jobs) {
-+        int state = job.getStatus().getRunState();
-+        if (state == JobStatus.RUNNING && hasReduceToLaunch(job)) {
-+          return true;
-+        }
-+      }
-+    }
-+
-+    return false;
-+  }
-+
-+  @Override
-+  public void statusUpdate(SchedulerDriver d, org.apache.mesos.Protos.TaskStatus status) {
-+    TaskState state = status.getState();
-+    LOG.info("Task " + status.getTaskId().getValue() + " is " + state);
-+    if (state == TaskState.TASK_FINISHED || state == TaskState.TASK_FAILED ||
-+        state == TaskState.TASK_KILLED || state == TaskState.TASK_LOST) {
-+      synchronized (jobTracker) {
-+        TaskID mesosId = status.getTaskId();
-+        MesosTask nt = mesosIdToMesosTask.get(mesosId);
-+        if (nt == null) {
-+          throw new RuntimeException(
-+              "Received status update for unknown task " + status.getTaskId());
-+        }
-+        removeTask(nt);
-+      }
-+    }
-+  }
-+
-+  /**
-+   * Called by JobTracker to ask us to launch tasks on a heartbeat.
-+   *
-+   * This is currently kind of silly; it would be better to grab tasks when
-+   * we respond to the Mesos assignment, but then we'd need to be willing to
-+   * launch TaskTrackers everywhere.
-+   */
-+  public List<Task> assignTasks(TaskTracker tt) {
-+    synchronized (jobTracker) {
-+      try {
-+        Collection<JobInProgress> jobs = jobTracker.jobs.values();
-+
-+        TaskTrackerStatus tts = tt.getStatus();
-+        String host = tts.getHost();
-+
-+        TaskTrackerInfo ttInfo = ttInfos.get(host);
-+        if (ttInfo == null) {
-+          throw new RuntimeException(
-+              "Expecting TaskTrackerInfo for host " + host);
-+        }
-+
-+        ttInfo.maxMaps = tts.getMaxMapSlots();
-+        ttInfo.maxReduces = tts.getMaxReduceSlots();
-+
-+        int clusterSize = jobTracker.getClusterStatus().getTaskTrackers();
-+        int numHosts = jobTracker.getNumberOfUniqueHosts();
-+
-+        // Assigned tasks
-+        List<Task> assignedTasks = new ArrayList<Task>();
-+
-+        // Identify unassigned maps and reduces on this TT
-+        List<MesosTask> assignableMaps = new ArrayList<MesosTask>();
-+        List<MesosTask> assignableReduces = new ArrayList<MesosTask>();
-+        for (MesosTask nt: ttInfo.maps)
-+          if (!nt.isAssigned())
-+            assignableMaps.add(nt);
-+        for (MesosTask nt: ttInfo.reduces)
-+          if (!nt.isAssigned())
-+            assignableReduces.add(nt);
-+
-+        LOG.info("Assigning tasks for " + host + " with " +
-+                 assignableMaps.size() + " map slots and " +
-+                 assignableReduces.size() + " reduce slots");
-+
-+        // Get some iterators for the unassigned tasks
-+        Iterator<MesosTask> mapIter = assignableMaps.iterator();
-+        Iterator<MesosTask> reduceIter = assignableReduces.iterator();
-+
-+        // Go through jobs in FIFO order and look for tasks to launch
-+        for (JobInProgress job: jobs) {
-+          if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-+            // If the node has unassigned maps, try to launch map tasks
-+            while (mapIter.hasNext()) {
-+              Task task = job.obtainNewMapTask(tts, clusterSize, numHosts);
-+              if (task != null) {
-+                MesosTask nt = mapIter.next();
-+                nt.assign(task);
-+                unassignedMaps--;
-+                assignedMaps++;
-+                hadoopIdToMesosTask.put(task.getTaskID(), nt);
-+                assignedTasks.add(task);
-+                task.extraData = "" + nt.mesosId.getValue();
-+              } else {
-+                break;
-+              }
-+            }
-+            // If the node has unassigned reduces, try to launch reduce tasks
-+            while (reduceIter.hasNext()) {
-+              Task task = job.obtainNewReduceTask(tts, clusterSize, numHosts);
-+              if (task != null) {
-+                MesosTask nt = reduceIter.next();
-+                nt.assign(task);
-+                unassignedReduces--;
-+                assignedReduces++;
-+                hadoopIdToMesosTask.put(task.getTaskID(), nt);
-+                assignedTasks.add(task);
-+                task.extraData = "" + nt.mesosId.getValue();
-+              } else {
-+                break;
-+              }
-+            }
-+          }
-+        }
-+
-+        return assignedTasks;
-+      } catch (IOException e) {
-+        LOG.error("IOException in assignTasks", e);
-+        return null;
-+      }
-+    }
-+  }
-+
-+  private void removeTask(MesosTask nt) {
-+    synchronized (jobTracker) {
-+      mesosIdToMesosTask.remove(nt.mesosId);
-+      if (nt.hadoopId != null) {
-+        hadoopIdToMesosTask.remove(nt.hadoopId);
-+      }
-+      TaskTrackerInfo ttInfo = ttInfos.get(nt.host);
-+      if (ttInfo != null) {
-+        ttInfo.remove(nt);
-+      }
-+      if (nt.isMap) {
-+        if (nt.isAssigned())
-+          assignedMaps--;
-+        else
-+          unassignedMaps--;
-+      } else {
-+        if (nt.isAssigned())
-+          assignedReduces--;
-+        else
-+          unassignedReduces--;
-+      }
-+    }
-+  }
-+
-+  private void askExecutorToUpdateStatus(MesosTask nt, TaskState state) {
-+    TaskTrackerInfo ttInfo = ttInfos.get(nt.host);
-+    if (ttInfo != null) {
-+      HadoopFrameworkMessage message = new HadoopFrameworkMessage(
-+          HadoopFrameworkMessage.Type.S2E_SEND_STATUS_UPDATE,
-+          state.toString(),
-+          nt.mesosId.getValue());
-+      try {
-+        LOG.info("Asking slave " + ttInfo.mesosSlaveId.getValue() +
-+                 " to update status for task " + nt.mesosId.getValue() +
-+                 " to " + state);
-+        driver.sendFrameworkMessage(
-+            EXECUTOR_ID, ttInfo.mesosSlaveId, message.serialize());
-+      } catch (IOException e) {
-+        // This exception would only get thrown if we couldn't
-+        // serialize the HadoopFrameworkMessage, which is a serious
-+        // problem; crash the JT.
-+        throw new RuntimeException(
-+            "Failed to serialize HadoopFrameworkMessage", e);
-+      }
-+    }
-+  }
-+
-+  // Kill any unlaunched tasks that have timed out
-+  public void killTimedOutTasks() {
-+    synchronized (jobTracker) {
-+      long curTime = System.currentTimeMillis();
-+      long timeout = 20000;
-+      long minCreationTime = curTime - timeout;
-+      LOG.info("Killing tasks that started " + timeout + " milliseconds ago");
-+      for (TaskTrackerInfo tt: ttInfos.values()) {
-+        killTimedOutTasks(tt.maps, minCreationTime);
-+        killTimedOutTasks(tt.reduces, minCreationTime);
-+      }
-+      driver.reviveOffers();
-+    }
-+  }
-+
-+  private void killTimedOutTasks(List<MesosTask> tasks, long minCreationTime) {
-+    List<MesosTask> toRemove = new ArrayList<MesosTask>();
-+    for (MesosTask nt: tasks) {
-+      if (!nt.isAssigned() && nt.creationTime < minCreationTime) {
-+        toRemove.add(nt);
-+      }
-+    }
-+    for (MesosTask nt: toRemove) {
-+      LOG.info("Killing timedout task " + nt.mesosId.getValue() +
-+               " created at " + nt.creationTime);
-+      askExecutorToUpdateStatus(nt, TaskState.TASK_KILLED);
-+    }
-+  }
-+
-+  @Override
-+  public void frameworkMessage(SchedulerDriver d, ExecutorID eId, SlaveID sId, byte[] message) {
-+    // TODO: Respond to E2S_KILL_REQUEST message by killing a task
-+  }
-+
-+  @Override
-+  public void slaveLost(SchedulerDriver d, SlaveID slaveId) {}
-+
-+  @Override
-+  public void executorLost(SchedulerDriver d,
-+                           ExecutorID executorId,
-+                           SlaveID slaveId,
-+                           int status) {}
-+
-+  public void error(SchedulerDriver d, String message) {
-+    LOG.error("FrameworkScheduler.error: " + message);
-+  }
-+
-+  @Override
-+  public void offerRescinded(SchedulerDriver d, OfferID oId) {}
-+
-+  // Methods to check whether a job has runnable tasks
-+
-+  /**
-+   * Check whether the job can launch a map task on a given node, with a given
-+   * level of locality (maximum cache level). Also includes job setup and
-+   * cleanup tasks, as well as map cleanup tasks. Returns the locality level of
-+   * the launchable map if one exists, or -1 otherwise.
-+   *
-+   * This is currently fairly long because it replicates a lot of the logic
-+   * in findNewMapTask. Unfortunately, it's not easy to just use findNewMapTask
-+   * directly, because that requires a TaskTracker. One way to avoid requiring
-+   * this method would be to just launch TaskTrackers on every node, without
-+   * first checking for locality.
-+   */
-+  int availableMapLevel(JobInProgress job, String host, int maxCacheLevel) {
-+    synchronized (job) {
-+      // For scheduling a map task, we have two caches and a list (optional)
-+      //  I)   one for non-running task
-+      //  II)  one for running task (this is for handling speculation)
-+      //  III) a list of TIPs that have empty locations (e.g., dummy splits),
-+      //       the list is empty if all TIPs have associated locations
-+
-+      // First a look up is done on the non-running cache and on a miss, a look
-+      // up is done on the running cache. The order for lookup within the cache:
-+      //   1. from local node to root [bottom up]
-+      //   2. breadth wise for all the parent nodes at max level
-+
-+      //if (canLaunchJobCleanupTask()) return true;
-+      //if (canLaunchSetupTask()) return true;
-+      if (!job.mapCleanupTasks.isEmpty()) return 0;
-+
-+      // Return -1 right away if the task cache isn't ready, either because
-+      // we are still initializing or because we are cleaning up
-+      if (job.nonRunningMapCache == null) return -1;
-+
-+      // We fall back to a linear scan of the list (III above) if we miss in
-+      // the caches above
-+
-+      Node node = jobTracker.getNode(host);
-+
-+      int maxLevel = job.getMaxCacheLevel();
-+
-+      //
-+      // I) Non-running TIP :
-+      //
-+
-+      // 1. check from local node to the root [bottom up cache lookup]
-+      //    i.e if the cache is available and the host has been resolved
-+      //    (node!=null)
-+      if (node != null) {
-+        Node key = node;
-+        int level = 0;
-+        // maxCacheLevel might be greater than this.maxLevel if findNewMapTask
-+        // is called to schedule any task (local, rack-local, off-switch or
-+        // speculative), or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
-+        // findNewMapTask is to only schedule off-switch/speculative
-+        // tasks
-+        int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
-+        for (level = 0;level < maxLevelToSchedule; ++level) {
-+          List <TaskInProgress> cacheForLevel = job.nonRunningMapCache.get(key);
-+          if (hasUnlaunchedTask(cacheForLevel)) {
-+            return level;
-+          }
-+          key = key.getParent();
-+        }
-+
-+        // Check if we need to only schedule a local task (node-local/rack-local)
-+        if (level == maxCacheLevel) {
-+          return -1;
-+        }
-+      }
-+
-+      //2. Search breadth-wise across parents at max level for non-running
-+      //   TIP if
-+      //     - cache exists and there is a cache miss
-+      //     - node information for the tracker is missing (tracker's topology
-+      //       info not obtained yet)
-+
-+      // collection of node at max level in the cache structure
-+      Collection<Node> nodesAtMaxLevel = jobTracker.getNodesAtMaxLevel();
-+
-+      // get the node parent at max level
-+      Node nodeParentAtMaxLevel =
-+        (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
-+
-+      for (Node parent : nodesAtMaxLevel) {
-+
-+        // skip the parent that has already been scanned
-+        if (parent == nodeParentAtMaxLevel) {
-+          continue;
-+        }
-+
-+        List<TaskInProgress> cache = job.nonRunningMapCache.get(parent);
-+        if (hasUnlaunchedTask(cache)) {
-+          return maxLevel-1;
-+        }
-+      }
-+
-+      // 3. Search non-local tips for a new task
-+      if (hasUnlaunchedTask(job.nonLocalMaps))
-+        return 0;
-+
-+      //
-+      // II) Running TIP :
-+      //
-+
-+      if (job.getMapSpeculativeExecution()) {
-+        long time = System.currentTimeMillis();
-+        float avgProg = job.status.mapProgress();
-+
-+        // 1. Check bottom up for speculative tasks from the running cache
-+        if (node != null) {
-+          Node key = node;
-+          for (int level = 0; level < maxLevel; ++level) {
-+            Set<TaskInProgress> cacheForLevel = job.runningMapCache.get(key);
-+            if (cacheForLevel != null) {
-+              for (TaskInProgress tip: cacheForLevel) {
-+                if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProg)) {
-+                  return level;
-+                }
-+              }
-+            }
-+            key = key.getParent();
-+          }
-+        }
-+
-+        // 2. Check breadth-wise for speculative tasks
-+
-+        for (Node parent : nodesAtMaxLevel) {
-+          // ignore the parent which is already scanned
-+          if (parent == nodeParentAtMaxLevel) {
-+            continue;
-+          }
-+
-+          Set<TaskInProgress> cache = job.runningMapCache.get(parent);
-+          if (cache != null) {
-+            for (TaskInProgress tip: cache) {
-+              if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProg)) {
-+                return maxLevel-1;
-+              }
-+            }
-+          }
-+        }
-+
-+        // 3. Check non-local tips for speculation
-+        for (TaskInProgress tip: job.nonLocalRunningMaps) {
-+          if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProg)) {
-+            return 0;
-+          }
-+        }
-+      }
-+
-+      return -1;
-+    }
-+  }
-+
-+  /**
-+   * Check whether a task list (from the non-running map cache) contains any
-+   * unlaunched tasks.
-+   */
-+  boolean hasUnlaunchedTask(Collection<TaskInProgress> cache) {
-+    if (cache != null)
-+      for (TaskInProgress tip: cache)
-+        if (tip.isRunnable() && !tip.isRunning())
-+          return true;
-+    return false;
-+  }
-+
-+  /**
-+   * Check whether a job can launch a reduce task. Also includes reduce
-+   * cleanup tasks.
-+   *
-+   * As with availableMapLevel, this duplicates the logic inside
-+   * findNewReduceTask. Please see the comment there for an explanation.
-+   */
-+  boolean hasReduceToLaunch(JobInProgress job) {
-+    synchronized (job) {
-+      // Return false if not enough maps have finished to launch reduces
-+      if (!job.scheduleReduces()) return false;
-+
-+      // Check for a reduce cleanup task
-+      if (!job.reduceCleanupTasks.isEmpty()) return true;
-+
-+      // Return false right away if the task cache isn't ready, either because
-+      // we are still initializing or because we are cleaning up
-+      if (job.nonRunningReduces == null) return false;
-+
-+      // Check for an unlaunched reduce
-+      if (job.nonRunningReduces.size() > 0) return true;
-+
-+      // Check for a reduce to be speculated
-+      if (job.getReduceSpeculativeExecution()) {
-+        long time = System.currentTimeMillis();
-+        float avgProg = job.status.reduceProgress();
-+        for (TaskInProgress tip: job.runningReduces) {
-+          if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProg)) {
-+            return true;
-+          }
-+        }
-+      }
-+
-+      return false;
-+    }
-+  }
-+}
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/HadoopFrameworkMessage.java b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/HadoopFrameworkMessage.java
-new file mode 100644
-index 0000000..810402d
---- /dev/null
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/HadoopFrameworkMessage.java
-@@ -0,0 +1,53 @@
-+package org.apache.hadoop.mapred;
-+
-+import java.io.ByteArrayInputStream;
-+import java.io.ByteArrayOutputStream;
-+import java.io.DataInputStream;
-+import java.io.DataOutputStream;
-+import java.io.IOException;
-+
-+public class HadoopFrameworkMessage {
-+  enum Type {
-+    S2E_SEND_STATUS_UPDATE, // Used by scheduler to ask executor to send a Mesos
-+                            // status update for a given task
-+    S2E_SHUTDOWN_EXECUTOR,  // Used by the scheduler to ask executor to shutdown
-+                            // (so that we can clean up TaskTrackers when idle)
-+    E2S_KILL_REQUEST,       // Used by executor to report a killTask from Mesos
-+  }
-+
-+  Type type;
-+  String arg1;
-+  String arg2;
-+
-+
-+  public HadoopFrameworkMessage(Type type, String arg1, String arg2) {
-+    this.type = type;
-+    this.arg1 = arg1;
-+    this.arg2 = arg2;
-+  }
-+
-+  public HadoopFrameworkMessage(Type type, String arg1) {
-+    this(type, arg1, "");
-+  }
-+
-+  public HadoopFrameworkMessage(byte[] bytes) throws IOException {
-+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
-+    String typeStr = in.readUTF();
-+    try {
-+      type = Type.valueOf(typeStr);
-+    } catch(IllegalArgumentException e) {
-+      throw new IOException("Unknown message type: " + typeStr);
-+    }
-+    arg1 = in.readUTF();
-+    arg2 = in.readUTF();
-+  }
-+
-+  public byte[] serialize() throws IOException {
-+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-+    DataOutputStream dos = new DataOutputStream(bos);
-+    dos.writeUTF(type.toString());
-+    dos.writeUTF(arg1);
-+    dos.writeUTF(arg2);
-+    return bos.toByteArray();
-+  }
-+}
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-new file mode 100644
-index 0000000..a16af72
---- /dev/null
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-@@ -0,0 +1,86 @@
-+package org.apache.hadoop.mapred;
-+
-+import java.io.IOException;
-+import java.util.ArrayList;
-+import java.util.Collection;
-+import java.util.List;
-+
-+import org.apache.mesos.MesosSchedulerDriver;
-+import org.apache.mesos.SchedulerDriver;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.commons.logging.LogFactory;
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-+
-+public class MesosScheduler extends TaskScheduler {
-+  public static final Log LOG =
-+    LogFactory.getLog(MesosScheduler.class);
-+
-+  private boolean running = false;
-+  private FrameworkScheduler frameworkScheduler;
-+  private SchedulerDriver driver;
-+  JobTracker jobTracker;
-+
-+  private EagerTaskInitializationListener eagerInitListener;
-+
-+  public MesosScheduler() { 
-+  }
-+
-+  @Override
-+  public void start() throws IOException {
-+    try {
-+      LOG.info("Starting MesosScheduler");
-+      jobTracker = (JobTracker) super.taskTrackerManager;
-+
-+      Configuration conf = getConf();
-+      String master = conf.get("mapred.mesos.master", "local");
-+
-+      this.eagerInitListener = new EagerTaskInitializationListener(conf);
-+      eagerInitListener.setTaskTrackerManager(taskTrackerManager);
-+      eagerInitListener.start();
-+      taskTrackerManager.addJobInProgressListener(eagerInitListener);
-+
-+      frameworkScheduler = new FrameworkScheduler(this);
-+      driver = new MesosSchedulerDriver(
-+          frameworkScheduler, frameworkScheduler.getFrameworkInfo(), master);
-+
-+      driver.start(); running = true;
-+    } catch (Exception e) {
-+      // If the MesosScheduler can't be loaded, the JT won't be useful at all,
-+      // so crash it now so that the user notices.
-+      LOG.fatal("Failed to start MesosScheduler", e);
-+      // TODO: Use System.exit(1) instead of RuntimeException?
-+      throw new RuntimeException("Failed to start MesosScheduler", e);
-+    }
-+  }
-+
-+  @Override
-+  public void terminate() throws IOException {
-+    try {
-+      if (running) {
-+        LOG.info("Stopping MesosScheduler");
-+        driver.stop();
-+        frameworkScheduler.cleanUp();
-+      }
-+      if (eagerInitListener != null) {
-+        taskTrackerManager.removeJobInProgressListener(eagerInitListener);
-+      }
-+    } catch (Exception e) {
-+      e.printStackTrace();
-+    }
-+  }
-+
-+  @Override
-+  public List<Task> assignTasks(TaskTracker taskTracker) throws IOException {
-+    return frameworkScheduler.assignTasks(taskTracker);
-+  }
-+
-+  @Override
-+  public Collection<JobInProgress> getJobs(String queueName) {
-+    // TODO Actually return some jobs
-+    ArrayList<JobInProgress> list = new ArrayList<JobInProgress>();
-+    return list;
-+  }
-+
-+}
-diff --git a/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosTaskTrackerInstrumentation.java b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosTaskTrackerInstrumentation.java
-new file mode 100644
-index 0000000..a773f12
---- /dev/null
-+++ b/hadoop/hadoop-0.20.205.0/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosTaskTrackerInstrumentation.java
-@@ -0,0 +1,28 @@
-+package org.apache.hadoop.mapred;
-+
-+import java.io.File;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.commons.logging.LogFactory;
-+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
-+
-+public class MesosTaskTrackerInstrumentation extends TaskTrackerInstrumentation {
-+  public static final Log LOG =
-+    LogFactory.getLog(MesosTaskTrackerInstrumentation.class);
-+
-+  private FrameworkExecutor executor;
-+
-+  public MesosTaskTrackerInstrumentation(TaskTracker t) {
-+    super(t);
-+    executor = FrameworkExecutor.getInstance();
-+    if (executor == null) {
-+      throw new IllegalArgumentException("MesosTaskTrackerInstrumentation " +
-+          "is being used without an active FrameworkExecutor");
-+    }
-+  }
-+
-+  @Override
-+  public void statusUpdate(Task task, TaskStatus taskStatus) {
-+    executor.statusUpdate(task, taskStatus);
-+  }
-+}
-diff --git a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/JobInProgress.java b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
+diff --git a/src/mapred/org/apache/hadoop/mapred/JobInProgress.java b/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
 index 5dfbe88..9bd5769 100644
---- a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
-+++ b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
+--- a/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
++++ b/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
 @@ -3556,4 +3556,8 @@ public class JobInProgress {
      }
      return level;
@@ -1462,10 +11,10 @@ index 5dfbe88..9bd5769 100644
 +    return maxLevel;
 +  }
  }
-diff --git a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/Task.java b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/Task.java
+diff --git a/src/mapred/org/apache/hadoop/mapred/Task.java b/src/mapred/org/apache/hadoop/mapred/Task.java
 index 08a56a4..80fa25b 100644
---- a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/Task.java
-+++ b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/Task.java
+--- a/src/mapred/org/apache/hadoop/mapred/Task.java
++++ b/src/mapred/org/apache/hadoop/mapred/Task.java
 @@ -165,6 +165,7 @@ abstract public class Task implements Writable, Configurable {
    protected TaskUmbilicalProtocol umbilical;
    protected SecretKey tokenSecret;
@@ -1490,10 +39,10 @@ index 08a56a4..80fa25b 100644
    }
  
    @Override
-diff --git a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskRunner.java b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
+diff --git a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
 index ca54508..92ae18b 100644
---- a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
-+++ b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
+--- a/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
++++ b/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
 @@ -228,9 +228,12 @@ abstract class TaskRunner extends Thread {
        File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
        File stdout = logFiles[0];
@@ -1528,10 +77,10 @@ index ca54508..92ae18b 100644
            }
            throw new IOException("Task process exit with nonzero status of " +
                exitCode + ".");
-diff --git a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
+diff --git a/src/mapred/org/apache/hadoop/mapred/TaskTracker.java b/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
 index 76d2f5f..5f43c40 100644
---- a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
-+++ b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
+--- a/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
++++ b/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
 @@ -399,10 +399,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
  
    private ShuffleServerInstrumentation shuffleServerMetrics;
@@ -1683,10 +232,10 @@ index 76d2f5f..5f43c40 100644
        return true;
      } else {
        LOG.warn("Progress from unknown child task: "+taskid);
-diff --git a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
+diff --git a/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java b/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
 index 2d1543f..33d7502 100644
---- a/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
-+++ b/hadoop/hadoop-0.20.205.0/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
+--- a/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
++++ b/src/mapred/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
 @@ -64,6 +64,13 @@ class TaskTrackerInstrumentation  {
     */
    public void reportTaskEnd(TaskAttemptID t) {}

Added: incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch?rev=1332469&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch (added)
+++ incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch Mon Apr 30 23:28:43 2012
@@ -0,0 +1,13 @@
+diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
+index ada5bef..76aaf48 100644
+--- a/conf/hadoop-env.sh
++++ b/conf/hadoop-env.sh
+@@ -9,7 +9,7 @@
+ # export JAVA_HOME=/usr/lib/j2sdk1.5-sun
+ 
+ # Extra Java CLASSPATH elements.  Optional.
+-# export HADOOP_CLASSPATH=
++export HADOOP_CLASSPATH=${HADOOP_HOME}/build/contrib/mesos/classes
+ 
+ # The maximum amount of heap to use, in MB. Default is 1000.
+ # export HADOOP_HEAPSIZE=2000
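
The HADOOP_CLASSPATH export above is what lets the stock Hadoop scripts pick up
the Mesos contrib classes at runtime. As a quick sanity check that the export
took effect, a hypothetical helper like the following (not part of this commit)
should resolve the instrumentation class:

    // Hypothetical check: Class.forName succeeds only when
    // ${HADOOP_HOME}/build/contrib/mesos/classes is on the classpath.
    public class MesosClasspathCheck {
      public static void main(String[] args) throws ClassNotFoundException {
        Class<?> cls = Class.forName(
            "org.apache.hadoop.mapred.MesosTaskTrackerInstrumentation");
        System.out.println("Found " + cls.getName());
      }
    }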

Added: incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_mesos.patch
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_mesos.patch?rev=1332469&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_mesos.patch (added)
+++ incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_mesos.patch Mon Apr 30 23:28:43 2012
@@ -0,0 +1,22 @@
+diff --git a/ivy/libraries.properties b/ivy/libraries.properties
+index b47b4c3..713f0c1 100644
+--- a/ivy/libraries.properties
++++ b/ivy/libraries.properties
+@@ -86,3 +86,5 @@ slf4j-log4j12.version=1.4.3
+ wagon-http.version=1.0-beta-2
+ xmlenc.version=0.52
+ xerces.version=1.4.4
++
++protobuf-java.version=2.4.1
+diff --git a/src/contrib/build.xml b/src/contrib/build.xml
+index 3c19e25..ecb7198 100644
+--- a/src/contrib/build.xml
++++ b/src/contrib/build.xml
+@@ -55,6 +55,7 @@
+       <fileset dir="." includes="fairscheduler/build.xml"/>
+       <fileset dir="." includes="capacity-scheduler/build.xml"/>
+       <fileset dir="." includes="gridmix/build.xml"/>
++      <fileset dir="." includes="mesos/build.xml"/>
+     </subant>
+      <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
+      <fail if="testsfailed">Tests failed!</fail>

Copied: incubator/mesos/trunk/hadoop/mapred-site.xml.patch (from r1332468, incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_conf_mapred-site.xml.patch)
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mapred-site.xml.patch?p2=incubator/mesos/trunk/hadoop/mapred-site.xml.patch&p1=incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_conf_mapred-site.xml.patch&r1=1332468&r2=1332469&rev=1332469&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_conf_mapred-site.xml.patch (original)
+++ incubator/mesos/trunk/hadoop/mapred-site.xml.patch Mon Apr 30 23:28:43 2012
@@ -1,7 +1,7 @@
-diff --git a/hadoop/hadoop-0.20.205.0/conf/mapred-site.xml b/hadoop/hadoop-0.20.205.0/conf/mapred-site.xml
+diff --git a/conf/mapred-site.xml b/conf/mapred-site.xml
 index 970c8fe..f9f272d 100644
---- a/hadoop/hadoop-0.20.205.0/conf/mapred-site.xml
-+++ b/hadoop/hadoop-0.20.205.0/conf/mapred-site.xml
+--- a/conf/mapred-site.xml
++++ b/conf/mapred-site.xml
 @@ -4,5 +4,16 @@
  <!-- Put site-specific property overrides in this file. -->
  

Added: incubator/mesos/trunk/hadoop/mesos-executor
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos-executor?rev=1332469&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos-executor (added)
+++ incubator/mesos/trunk/hadoop/mesos-executor Mon Apr 30 23:28:43 2012
@@ -0,0 +1,3 @@
+#!/bin/sh
+
+exec `dirname ${0}`/hadoop org.apache.hadoop.mapred.FrameworkExecutor

Propchange: incubator/mesos/trunk/hadoop/mesos-executor
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/mesos/trunk/hadoop/mesos/build.xml
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos/build.xml?rev=1332469&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos/build.xml (added)
+++ incubator/mesos/trunk/hadoop/mesos/build.xml Mon Apr 30 23:28:43 2012
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!--
+Before you can run these subtargets directly, you need
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="mesos" default="jar">
+
+  <import file="../build-contrib.xml"/>
+
+</project>

Added: incubator/mesos/trunk/hadoop/mesos/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos/ivy.xml?rev=1332469&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos/ivy.xml (added)
+++ incubator/mesos/trunk/hadoop/mesos/ivy.xml Mon Apr 30 23:28:43 2012
@@ -0,0 +1,42 @@
+<?xml version="1.0" ?>
+<ivy-module version="1.0">
+  <info organisation="org.apache.hadoop" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
+    <description>
+        Apache Hadoop contrib
+    </description>
+  </info>
+  <configurations defaultconfmapping="default">
+    <!--these match the Maven configurations-->
+    <conf name="default" extends="master,runtime"/>
+    <conf name="master" description="contains the artifact but no dependencies"/>
+    <conf name="runtime" description="runtime but not the artifact" />
+
+    <conf name="common" visibility="private"
+      description="artifacts needed to compile/test the application"/>
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+  <dependencies>
+    <dependency org="commons-logging"
+      name="commons-logging"
+      rev="${commons-logging.version}"
+      conf="common->default"/>
+    <dependency org="log4j"
+      name="log4j"
+      rev="${log4j.version}"
+      conf="common->master"/>
+   <dependency org="junit"
+      name="junit"
+      rev="${junit.version}"
+      conf="common->default"/>
+   <dependency org="com.google.protobuf"
+     name="protobuf-java"
+     rev="${protobuf-java.version}"
+     conf="common->default"/>
+  </dependencies>
+</ivy-module>
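
The protobuf-java dependency declared above is needed because the contrib code
assembles Mesos protocol buffer messages directly. A minimal sketch of the
builder pattern involved, using the same org.apache.mesos.Protos classes that
FrameworkExecutor (below) uses; the task ID value is illustrative:

    import org.apache.mesos.Protos.TaskID;
    import org.apache.mesos.Protos.TaskState;
    import org.apache.mesos.Protos.TaskStatus;

    public class ProtobufSketch {
      public static void main(String[] args) {
        // Every Mesos message in the contrib is assembled via generated
        // protobuf builders like these.
        TaskStatus status = TaskStatus.newBuilder()
            .setTaskId(TaskID.newBuilder().setValue("task_001").build())
            .setState(TaskState.TASK_FINISHED)
            .build();
        System.out.println(status);
      }
    }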

Added: incubator/mesos/trunk/hadoop/mesos/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos/ivy/libraries.properties?rev=1332469&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos/ivy/libraries.properties (added)
+++ incubator/mesos/trunk/hadoop/mesos/ivy/libraries.properties Mon Apr 30 23:28:43 2012
@@ -0,0 +1,5 @@
+# This properties file lists the versions of the various artifacts used by
+# the mesos contrib. It drives Ivy and the generation of a Maven POM.
+
+# Please list dependency names and versions here if they differ from the ones
+# listed in the global libraries.properties file (in alphabetical order).

Added: incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/FrameworkExecutor.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/FrameworkExecutor.java?rev=1332469&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/FrameworkExecutor.java (added)
+++ incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/FrameworkExecutor.java Mon Apr 30 23:28:43 2012
@@ -0,0 +1,200 @@
+package org.apache.hadoop.mapred;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+import org.apache.mesos.Executor;
+import org.apache.mesos.ExecutorDriver;
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.SlaveInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+
+public class FrameworkExecutor implements Executor {
+  public static final Log LOG =
+    LogFactory.getLog(FrameworkExecutor.class);
+
+  private static FrameworkExecutor instance;
+
+  private ExecutorDriver driver;
+  private SlaveID slaveId;
+  private JobConf conf;
+  private TaskTracker taskTracker;
+
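+  // Mesos tasks we have been asked to launch but have not yet reported
+  // a terminal status for; used to suppress duplicate status updates.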
+  private Set<TaskID> activeMesosTasks = new HashSet<TaskID>();
+
+  @Override
+  public void registered(ExecutorDriver d,
+                         ExecutorInfo executorInfo,
+                         FrameworkInfo frameworkInfo,
+                         SlaveInfo slaveInfo) {
+    try {
+      Thread.currentThread().setContextClassLoader(
+        TaskTracker.class.getClassLoader());
+
+      this.driver = d;
+      // NOTE: assumes SlaveInfo carries the slave's ID (Protos.SlaveInfo.id).
+      this.slaveId = slaveInfo.getId();
+
+      // TODO: initialize all of JobConf from ExecutorArgs (using JT's conf)?
+      conf = new JobConf();
+      String jobTracker = executorInfo.getData().toStringUtf8();
+      LOG.info("Setting JobTracker: " + jobTracker);
+      conf.set("mapred.job.tracker", jobTracker);
+
+      // Attach our TaskTrackerInstrumentation to figure out when tasks end
+      Class<?>[] instClasses = TaskTracker.getInstrumentationClasses(conf);
+      String newInstClassList = "";
+      for (Class<?> cls: instClasses) {
+        newInstClassList += cls.getName() + ",";
+      }
+      newInstClassList += MesosTaskTrackerInstrumentation.class.getName();
+      conf.set("mapred.tasktracker.instrumentation", newInstClassList);
+
+      // Get hostname from Mesos to make sure we match what it reports to the JT
+      conf.set("slave.host.name", slaveInfo.getHostname());
+
+      taskTracker = new TaskTracker(conf);
+      new Thread("TaskTracker run thread") {
+        @Override
+        public void run() {
+          taskTracker.run();
+        }
+      }.start();
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to initialize FrameworkExecutor", e);
+    }
+  }
+
+  @Override
+  public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {}
+
+  @Override
+  public void disconnected(ExecutorDriver driver) {}
+
+  @Override
+  public void launchTask(ExecutorDriver d, TaskInfo task) {
+    LOG.info("Asked to launch Mesos task " + task.getTaskId().getValue());
+    activeMesosTasks.add(task.getTaskId());
+  }
+
+  @Override
+  public void killTask(ExecutorDriver d, TaskID taskId) {
+    LOG.info("Asked to kill Mesos task " + taskId);
+    // TODO: Tell the JobTracker about this using an E2S_KILL_REQUEST message!
+  }
+
+  @Override
+  public void frameworkMessage(ExecutorDriver d, byte[] msg) {
+    try {
+      HadoopFrameworkMessage hfm = new HadoopFrameworkMessage(msg);
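+      // S2E_* messages flow from the scheduler to this executor; the
+      // reverse direction uses E2S_* messages (see killTask above).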
+      switch (hfm.type) {
+        case S2E_SEND_STATUS_UPDATE: {
+          TaskState s = TaskState.valueOf(hfm.arg1);
+          LOG.info("Sending status update: " + hfm.arg2 + " is " + s);
+          d.sendStatusUpdate(org.apache.mesos.Protos.TaskStatus.newBuilder()
+                             .setTaskId(TaskID.newBuilder()
+                                        .setValue(hfm.arg2).build())
+                             .setState(s).build());
+          break;
+        }
+        case S2E_SHUTDOWN_EXECUTOR: {
+          taskTracker.close();
+          System.exit(0);
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to deserialize HadoopFrameworkMessage", e);
+    }
+  }
+
+  public void statusUpdate(Task task, TaskStatus status) {
+    // Some tasks get launched implicitly (e.g., the setup/cleanup
+    // tasks) and never pass through MesosScheduler/
+    // FrameworkScheduler.assignTasks, so they never get extraData
+    // set. We never send status updates for such tasks, which is
+    // also why we don't need to specify any resources for the
+    // executor: Mesos always assumes these tasks are running.
+    if (task.extraData.equals("")) {
+      LOG.info("Ignoring status update for task " + task);
+      return;
+    }
+
+    // Create a Mesos TaskID from extraData.
+    TaskID taskId = TaskID.newBuilder()
+      .setValue(task.extraData)
+      .build();
+
+    // It appears as though we can get multiple duplicate status
+    // updates for the same task, so check if we still have an active
+    // task so that we only send the status update once.
+    if (!activeMesosTasks.contains(taskId)) {
+      LOG.info("Ignoring (duplicate) status update for task " + task);
+      return;
+    }
+
+    // Check whether the task has finished (either successfully or
+    // not), and report to Mesos only if it has.
+    State state = status.getRunState();
+    TaskState mesosState = null;
+    if (state == State.SUCCEEDED || state == State.COMMIT_PENDING)
+      mesosState = TaskState.TASK_FINISHED;
+    else if (state == State.FAILED || state == State.FAILED_UNCLEAN)
+      mesosState = TaskState.TASK_FAILED;
+    else if (state == State.KILLED || state == State.KILLED_UNCLEAN)
+      mesosState = TaskState.TASK_KILLED;
+
+    if (mesosState == null) {
+      LOG.info("Not sending status update for task " + task +
+               " in state " + state);
+      return;
+    }
+
+    LOG.info("Attempting to send status update for " + task +
+             " in state " + status.getRunState());
+
+    driver.sendStatusUpdate(
+        org.apache.mesos.Protos.TaskStatus.newBuilder()
+        .setTaskId(TaskID.newBuilder().setValue(task.extraData).build())
+        .setState(mesosState)
+        .build());
+
+    activeMesosTasks.remove(taskId);
+  }
+
+  @Override
+  public void error(ExecutorDriver d, String message) {
+    LOG.error("FrameworkExecutor.error: " + message);
+  }
+
+  @Override
+  public void shutdown(ExecutorDriver d) {}
+
+  public static void main(String[] args) {
+    instance = new FrameworkExecutor();
+    MesosExecutorDriver driver = new MesosExecutorDriver(instance);
+    System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
+  }
+
+  static FrameworkExecutor getInstance() {
+    return instance;
+  }
+}
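
The status-update path above relies on a simple convention: the scheduler side
stores the Mesos TaskID's string value in Task.extraData, and the executor
rebuilds the TaskID from it (an empty extraData marks an implicitly launched
task). A minimal sketch of that round trip follows; the ID value is
illustrative, and the real tagging happens in FrameworkScheduler.assignTasks,
which is not shown here:

    import org.apache.mesos.Protos.TaskID;

    public class ExtraDataRoundTrip {
      public static void main(String[] args) {
        // What the scheduler side would store in Task.extraData
        // (illustrative value).
        String extraData = "Task_12";
        if (extraData.isEmpty()) {
          // Implicitly launched setup/cleanup task: no status update.
          System.out.println("Ignoring implicit task");
        } else {
          TaskID taskId = TaskID.newBuilder().setValue(extraData).build();
          System.out.println("Will report status for " + taskId.getValue());
        }
      }
    }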