Posted to commits@mesos.apache.org by be...@apache.org on 2013/08/03 00:21:18 UTC

[1/5] Refactored Hadoop on Mesos from contrib to JAR.

Updated Branches:
  refs/heads/master f6d95e89e -> c621ae052


http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
deleted file mode 100644
index 027389d..0000000
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ /dev/null
@@ -1,848 +0,0 @@
-package org.apache.hadoop.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Iterator;
-
-import org.apache.commons.httpclient.HttpHost;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.Value;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import static org.apache.hadoop.util.StringUtils.join;
-
-public class MesosScheduler extends TaskScheduler implements Scheduler {
-  public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
-
-  private SchedulerDriver driver;
-  private TaskScheduler taskScheduler;
-  private JobTracker jobTracker;
-  private Configuration conf;
-
-  // This is the memory overhead for a jvm process. This needs to be added
-  // to a jvm process's resource requirement, in addition to its heap size.
-  private static final double JVM_MEM_OVERHEAD_PERCENT_DEFAULT = 0.1; // 10%.
-
-  // TODO(vinod): Consider parsing the slot memory from the configuration jvm
-  // heap options (e.g: mapred.child.java.opts).
-
-  // NOTE: It appears that there are no real resource requirements for a
-  // map / reduce slot. We therefore define a default slot as:
-  // 0.2 cores.
-  // 256 MB memory.
-  // 1 GB of disk space.
-  private static final double SLOT_CPUS_DEFAULT = 0.2; // 0.2 cores.
-  private static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
-  private static final int SLOT_JVM_HEAP_DEFAULT = 256; // 256MB.
-
-  private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
-  private static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB.
-
-  // The default behavior in Hadoop is to use 4 slots per TaskTracker:
-  private static final int MAP_SLOTS_DEFAULT = 2;
-  private static final int REDUCE_SLOTS_DEFAULT = 2;
-
-  // The amount of time to wait for task trackers to launch before
-  // giving up.
-  private static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes
-
-  // Count of the launched trackers for TaskID generation.
-  private long launchedTrackers = 0;
-
-  // Maintains a mapping from {tracker host:port -> MesosTracker}.
-  // Used for tracking the slots of each TaskTracker and the corresponding
-  // Mesos TaskID.
-  private Map<HttpHost, MesosTracker> mesosTrackers =
-    new HashMap<HttpHost, MesosTracker>();
-
-  private JobInProgressListener jobListener = new JobInProgressListener() {
-    @Override
-    public void jobAdded(JobInProgress job) throws IOException {
-      LOG.info("Added job " + job.getJobID());
-    }
-
-    @Override
-    public void jobRemoved(JobInProgress job) {
-      LOG.info("Removed job " + job.getJobID());
-    }
-
-    @Override
-    public void jobUpdated(JobChangeEvent event) {
-      synchronized (MesosScheduler.this) {
-        JobInProgress job = event.getJobInProgress();
-
-        // If the job is complete, kill all the corresponding idle TaskTrackers.
-        if (!job.isComplete()) {
-          return;
-        }
-
-        LOG.info("Completed job : " + job.getJobID());
-
-        List<TaskInProgress> completed = new ArrayList<TaskInProgress>();
-
-        // Map tasks.
-        completed.addAll(job.reportTasksInProgress(true, true));
-
-        // Reduce tasks.
-        completed.addAll(job.reportTasksInProgress(false, true));
-
-        for (TaskInProgress task : completed) {
-          // Check that this task actually belongs to this job
-          if (task.getJob().getJobID() != job.getJobID()) {
-            continue;
-          }
-
-          for (TaskStatus status : task.getTaskStatuses()) {
-            // Make a copy to iterate over keys and delete values.
-            Set<HttpHost> trackers = new HashSet<HttpHost>(
-                mesosTrackers.keySet());
-
-            // Remove the task from the map.
-            for (HttpHost tracker : trackers) {
-              MesosTracker mesosTracker = mesosTrackers.get(tracker);
-
-              if (!mesosTracker.active) {
-                LOG.warn("Ignoring TaskTracker: " + tracker
-                    + " because it might not have sent a heartbeat");
-                continue;
-              }
-
-              LOG.info("Removing completed task : " + status.getTaskID()
-                  + " of tracker " + status.getTaskTracker());
-
-              mesosTracker.hadoopJobs.remove(job.getJobID());
-
-              // If the TaskTracker doesn't have any running tasks, kill it.
-              if (mesosTracker.hadoopJobs.isEmpty()) {
-                LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
-                    + mesosTracker.host);
-
-                driver.killTask(mesosTracker.taskId);
-                mesosTracker.timer.cancel();
-                mesosTrackers.remove(tracker);
-              }
-            }
-          }
-        }
-      }
-    }
-  };
-
-  public MesosScheduler() {}
-
-  // TaskScheduler methods.
-  @Override
-  public synchronized void start() throws IOException {
-    conf = getConf();
-    String taskSchedulerClass = conf.get("mapred.mesos.taskScheduler",
-        "org.apache.hadoop.mapred.JobQueueTaskScheduler");
-
-    try {
-      taskScheduler =
-        (TaskScheduler) Class.forName(taskSchedulerClass).newInstance();
-      taskScheduler.setConf(conf);
-      taskScheduler.setTaskTrackerManager(taskTrackerManager);
-    } catch (ClassNotFoundException e) {
-      LOG.fatal("Failed to initialize the TaskScheduler", e);
-      System.exit(1);
-    } catch (InstantiationException e) {
-      LOG.fatal("Failed to initialize the TaskScheduler", e);
-      System.exit(1);
-    } catch (IllegalAccessException e) {
-      LOG.fatal("Failed to initialize the TaskScheduler", e);
-      System.exit(1);
-    }
-
-    // Add the job listener to get job related updates.
-    taskTrackerManager.addJobInProgressListener(jobListener);
-
-    LOG.info("Starting MesosScheduler");
-    jobTracker = (JobTracker) super.taskTrackerManager;
-
-    String master = conf.get("mapred.mesos.master", "local");
-
-    try {
-      FrameworkInfo frameworkInfo = FrameworkInfo
-        .newBuilder()
-        .setUser("")
-        .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false))
-        .setName("Hadoop: (RPC port: " + jobTracker.port + ","
-            + " WebUI port: " + jobTracker.infoPort + ")").build();
-
-      driver = new MesosSchedulerDriver(this, frameworkInfo, master);
-      driver.start();
-    } catch (Exception e) {
-      // If the MesosScheduler can't be loaded, the JobTracker won't be useful
-      // at all, so crash it now so that the user notices.
-      LOG.fatal("Failed to start MesosScheduler", e);
-      System.exit(1);
-    }
-
-    taskScheduler.start();
-  }
-
-  @Override
-  public synchronized void terminate() throws IOException {
-    try {
-      LOG.info("Stopping MesosScheduler");
-      driver.stop();
-    } catch (Exception e) {
-      LOG.error("Failed to stop Mesos scheduler", e);
-    }
-
-    taskScheduler.terminate();
-  }
-
-  @Override
-  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
-    throws IOException {
-    HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
-        taskTracker.getStatus().getHttpPort());
-
-    if (!mesosTrackers.containsKey(tracker)) {
-      // TODO(bmahler): Consider allowing non-Mesos TaskTrackers.
-      LOG.info("Unknown/exited TaskTracker: " + tracker + ". ");
-      return null;
-    }
-
-    // Let the underlying task scheduler do the actual task scheduling.
-    List<Task> tasks = taskScheduler.assignTasks(taskTracker);
-
-    // The Hadoop Fair Scheduler is known to return null.
-    if (tasks != null) {
-      // Keep track of which TaskTracker contains which tasks.
-      for (Task task : tasks) {
-        mesosTrackers.get(tracker).hadoopJobs.add(task.getJobID());
-      }
-    }
-
-    return tasks;
-  }
-
-  @Override
-  public synchronized Collection<JobInProgress> getJobs(String queueName) {
-    return taskScheduler.getJobs(queueName);
-  }
-
-  @Override
-  public synchronized void refresh() throws IOException {
-    taskScheduler.refresh();
-  }
-
-  // Mesos Scheduler methods.
-  // These are synchronized, where possible. Some of these methods need to access the
-  // JobTracker, which can lead to a deadlock:
-  // See: https://issues.apache.org/jira/browse/MESOS-429
-  // The workaround employed here is to unsynchronize those methods needing access to
-  // the JobTracker state and use explicit synchronization instead as appropriate.
-  // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to
-  // split up the Scheduler and TaskScheduler implementations in order to break the
-  // locking cycle. This would require a synchronized class to store the shared
-  // state across our Scheduler and TaskScheduler implementations, and provide
-  // atomic operations as needed.
-  @Override
-  public synchronized void registered(SchedulerDriver schedulerDriver,
-      FrameworkID frameworkID, MasterInfo masterInfo) {
-    LOG.info("Registered as " + frameworkID.getValue()
-        + " with master " + masterInfo);
-  }
-
-  @Override
-  public synchronized void reregistered(SchedulerDriver schedulerDriver,
-      MasterInfo masterInfo) {
-    LOG.info("Re-registered with master " + masterInfo);
-  }
-
-  public synchronized void killTracker(MesosTracker tracker) {
-    driver.killTask(tracker.taskId);
-    mesosTrackers.remove(tracker.host);
-  }
-
-  // For some reason, pendingMaps() and pendingReduces() don't return the
-  // values we expect. We observed negative values, which may be related to
-  // https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the
-  // algorithm that is used to calculate the pending tasks within the Hadoop
-  // JobTracker sources (see 'printTaskSummary' in
-  // src/org/apache/hadoop/mapred/jobdetails_jsp.java).
-  private int getPendingTasks(TaskInProgress[] tasks) {
-    int totalTasks = tasks.length;
-    int runningTasks = 0;
-    int finishedTasks = 0;
-    int killedTasks = 0;
-    for (int i = 0; i < totalTasks; ++i) {
-      TaskInProgress task = tasks[i];
-      if (task == null) {
-        continue;
-      }
-      if (task.isComplete()) {
-        finishedTasks += 1;
-      } else if (task.isRunning()) {
-        runningTasks += 1;
-      } else if (task.wasKilled()) {
-        killedTasks += 1;
-      }
-    }
-    int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks;
-    return pendingTasks;
-  }
-
-  // This method uses explicit synchronization in order to avoid deadlocks when
-  // accessing the JobTracker.
-  @Override
-  public void resourceOffers(SchedulerDriver schedulerDriver,
-      List<Offer> offers) {
-    // Before synchronizing, we pull all needed information from the JobTracker.
-    final HttpHost jobTrackerAddress =
-      new HttpHost(jobTracker.getHostname(), jobTracker.getTrackerPort());
-
-    final Collection<TaskTrackerStatus> taskTrackers = jobTracker.taskTrackers();
-
-    final List<JobInProgress> jobsInProgress = new ArrayList<JobInProgress>();
-    for (JobStatus status : jobTracker.jobsToComplete()) {
-      jobsInProgress.add(jobTracker.getJob(status.getJobID()));
-    }
-
-    synchronized (this) {
-      // Compute the number of pending maps and reduces.
-      int pendingMaps = 0;
-      int pendingReduces = 0;
-      for (JobInProgress progress : jobsInProgress) {
-        // JobStatus.pendingMaps/Reduces may return the wrong value on
-        // occasion.  This seems to be safer.
-        pendingMaps += getPendingTasks(progress.getTasks(TaskType.MAP));
-        pendingReduces += getPendingTasks(progress.getTasks(TaskType.REDUCE));
-      }
-
-      // Mark active (heartbeated) TaskTrackers and compute idle slots.
-      int idleMapSlots = 0;
-      int idleReduceSlots = 0;
-      int unhealthyTrackers = 0;
-
-      for (TaskTrackerStatus status : taskTrackers) {
-        if (!status.getHealthStatus().isNodeHealthy()) {
-          // Skip this node if it's unhealthy.
-          ++unhealthyTrackers;
-          continue;
-        }
-
-        HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
-        if (mesosTrackers.containsKey(host)) {
-          mesosTrackers.get(host).active = true;
-          mesosTrackers.get(host).timer.cancel();
-          idleMapSlots += status.getAvailableMapSlots();
-          idleReduceSlots += status.getAvailableReduceSlots();
-        }
-      }
-
-      // Consider the TaskTrackers that have yet to become active as being idle,
-      // otherwise we will launch excessive TaskTrackers.
-      int inactiveMapSlots = 0;
-      int inactiveReduceSlots = 0;
-      for (MesosTracker tracker : mesosTrackers.values()) {
-        if (!tracker.active) {
-          inactiveMapSlots += tracker.mapSlots;
-          inactiveReduceSlots += tracker.reduceSlots;
-        }
-      }
-
-      // To ensure Hadoop jobs begin promptly, we can specify a minimum number
-      // of 'hot slots' to be available for use.  This addresses the
-      // TaskTracker spin up delay that exists with Hadoop on Mesos.  This can
-      // be a nuisance with lower latency applications, such as ad-hoc Hive
-      // queries.
-      int minimumMapSlots = conf.getInt("mapred.mesos.total.map.slots.minimum", 0);
-      int minimumReduceSlots =
-        conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0);
-
-      // Compute how many slots we need to allocate.
-      int neededMapSlots = Math.max(
-          minimumMapSlots - (idleMapSlots + inactiveMapSlots),
-          pendingMaps - (idleMapSlots + inactiveMapSlots));
-      int neededReduceSlots = Math.max(
-          minimumReduceSlots - (idleReduceSlots + inactiveReduceSlots),
-          pendingReduces - (idleReduceSlots + inactiveReduceSlots));
-
-      LOG.info(join("\n", Arrays.asList(
-              "JobTracker Status",
-              "      Pending Map Tasks: " + pendingMaps,
-              "   Pending Reduce Tasks: " + pendingReduces,
-              "         Idle Map Slots: " + idleMapSlots,
-              "      Idle Reduce Slots: " + idleReduceSlots,
-              "     Inactive Map Slots: " + inactiveMapSlots
-              + " (launched but no heartbeat yet)",
-              "  Inactive Reduce Slots: " + inactiveReduceSlots
-              + " (launched but no heartbeat yet)",
-              "       Needed Map Slots: " + neededMapSlots,
-              "    Needed Reduce Slots: " + neededReduceSlots,
-              "     Unhealthy Trackers: " + unhealthyTrackers)));
-
-      // Launch TaskTrackers to satisfy the slot requirements.
-      // TODO(bmahler): Consider slotting intelligently.
-      // Ex: If more map slots are needed, but no reduce slots are needed,
-      // launch a map-only TaskTracker to better satisfy the slot needs.
-      for (Offer offer : offers) {
-        if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
-          driver.declineOffer(offer.getId());
-          continue;
-        }
-
-        // Ensure these values aren't < 0.
-        neededMapSlots = Math.max(0, neededMapSlots);
-        neededReduceSlots = Math.max(0, neededReduceSlots);
-
-        double cpus = -1.0;
-        double mem = -1.0;
-        double disk = -1.0;
-        Set<Integer> ports = new HashSet<Integer>();
-
-        // Pull out the cpus, memory, disk, and 2 ports from the offer.
-        for (Resource resource : offer.getResourcesList()) {
-          if (resource.getName().equals("cpus")
-              && resource.getType() == Value.Type.SCALAR) {
-            cpus = resource.getScalar().getValue();
-          } else if (resource.getName().equals("mem")
-              && resource.getType() == Value.Type.SCALAR) {
-            mem = resource.getScalar().getValue();
-          } else if (resource.getName().equals("disk")
-              && resource.getType() == Value.Type.SCALAR) {
-            disk = resource.getScalar().getValue();
-          } else if (resource.getName().equals("ports")
-              && resource.getType() == Value.Type.RANGES) {
-            for (Value.Range range : resource.getRanges().getRangeList()) {
-              Integer begin = (int)range.getBegin();
-              Integer end = (int)range.getEnd();
-              if (end < begin) {
-                LOG.warn("Ignoring invalid port range: begin=" + begin + " end=" + end);
-                continue;
-              }
-              while (begin <= end && ports.size() < 2) {
-                ports.add(begin);
-                begin += 1;
-              }
-            }
-          }
-        }
-
-        int mapSlotsMax = conf.getInt("mapred.tasktracker.map.tasks.maximum",
-            MAP_SLOTS_DEFAULT);
-        int reduceSlotsMax =
-          conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
-              REDUCE_SLOTS_DEFAULT);
-
-        // What's the minimum number of map and reduce slots we should try to
-        // launch?
-        long mapSlots = 0;
-        long reduceSlots = 0;
-
-        double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
-            (float) SLOT_CPUS_DEFAULT);
-        double slotDisk = conf.getInt("mapred.mesos.slot.disk",
-            SLOT_DISK_DEFAULT);
-
-        int slotMem = conf.getInt("mapred.mesos.slot.mem",
-            SLOT_JVM_HEAP_DEFAULT);
-        long slotJVMHeap = Math.round((double)slotMem -
-            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * slotMem));
-
-        int tasktrackerMem = conf.getInt("mapred.mesos.tasktracker.mem",
-              TASKTRACKER_MEM_DEFAULT);
-        long tasktrackerJVMHeap = Math.round((double)tasktrackerMem -
-            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * tasktrackerMem));
-
-        // Minimum resource requirements for the container (TaskTracker + map/red
-        // tasks).
-        double containerCpus = TASKTRACKER_CPUS;
-        double containerMem = tasktrackerMem;
-        double containerDisk = 0;
-
-        // Determine how many slots we can allocate.
-        int slots = mapSlotsMax + reduceSlotsMax;
-        slots = (int)Math.min(slots, (cpus - containerCpus) / slotCpus);
-        slots = (int)Math.min(slots, (mem - containerMem) / slotMem);
-        slots = (int)Math.min(slots, (disk - containerDisk) / slotDisk);
-
-        // Is this offer too small for even the minimum slots?
-        if (slots < 1 || ports.size() < 2) {
-          LOG.info(join("\n", Arrays.asList(
-                  "Declining offer with insufficient resources for a TaskTracker: ",
-                  "  cpus: offered " + cpus + " needed " + containerCpus,
-                  "  mem : offered " + mem + " needed " + containerMem,
-                  "  disk: offered " + disk + " needed " + containerDisk,
-                  "  ports: " + (ports.size() < 2
-                    ? " less than 2 offered"
-                    : " at least 2 (sufficient)"))));
-
-          driver.declineOffer(offer.getId());
-          continue;
-        }
-
-        // Is the number of slots we need sufficiently small? If so, we can
-        // allocate exactly the number we need.
-        if (slots >= neededMapSlots + neededReduceSlots && neededMapSlots <
-            mapSlotsMax && neededReduceSlots < reduceSlotsMax) {
-          mapSlots = neededMapSlots;
-          reduceSlots = neededReduceSlots;
-        } else {
-          // Allocate slots fairly for this resource offer.
-          double mapFactor = (double)neededMapSlots / (neededMapSlots + neededReduceSlots);
-          double reduceFactor = (double)neededReduceSlots / (neededMapSlots + neededReduceSlots);
-          // To avoid map/reduce slot starvation, don't allow more than 50%
-          // spread between map/reduce slots when we need both mappers and
-          // reducers.
-          if (neededMapSlots > 0 && neededReduceSlots > 0) {
-            if (mapFactor < 0.25) {
-              mapFactor = 0.25;
-            } else if (mapFactor > 0.75) {
-              mapFactor = 0.75;
-            }
-            if (reduceFactor < 0.25) {
-              reduceFactor = 0.25;
-            } else if (reduceFactor > 0.75) {
-              reduceFactor = 0.75;
-            }
-          }
-          mapSlots = Math.min(Math.min((long)(mapFactor * slots), mapSlotsMax), neededMapSlots);
-          // The remaining slots are allocated for reduces.
-          slots -= mapSlots;
-          reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
-        }
-
-        Iterator<Integer> portIter = ports.iterator();
-        HttpHost httpAddress = new HttpHost(offer.getHostname(), portIter.next());
-        HttpHost reportAddress = new HttpHost(offer.getHostname(), portIter.next());
-
-        // Check that this tracker is not already launched.  This problem was
-        // observed on a few occasions, but not reliably.  The main symptom was
-        // that entries in `mesosTrackers` were being lost, and task trackers
-        // would be 'lost' mysteriously (probably because the ports were in
-        // use).  This problem has since gone away with a rewrite of the port
-        // selection code, but the check + logging is left here.
-        // TODO(brenden): Diagnose this to determine root cause.
-
-        if (mesosTrackers.containsKey(httpAddress)) {
-          LOG.info(join("\n", Arrays.asList(
-                  "Declining offer because host/port combination is in use: ",
-                  "  cpus: offered " + cpus + " needed " + containerCpus,
-                  "  mem : offered " + mem + " needed " + containerMem,
-                  "  disk: offered " + disk + " needed " + containerDisk,
-                  "  ports: " + ports)));
-
-          driver.declineOffer(offer.getId());
-          continue;
-        }
-
-        TaskID taskId = TaskID.newBuilder()
-          .setValue("Task_Tracker_" + launchedTrackers++).build();
-
-        LOG.info("Launching task " + taskId.getValue() + " on "
-            + httpAddress.toString() + " with mapSlots=" + mapSlots + " reduceSlots=" + reduceSlots);
-
-        // Add this tracker to Mesos tasks.
-        mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
-              mapSlots, reduceSlots, this));
-
-        // Create the environment depending on whether the executor is going to be
-        // run locally.
-        // TODO(vinod): Do not pass the mapred config options as environment
-        // variables.
-        Protos.Environment.Builder envBuilder = Protos.Environment
-          .newBuilder()
-          .addVariables(
-              Protos.Environment.Variable
-              .newBuilder()
-              .setName("mapred.job.tracker")
-              .setValue(jobTrackerAddress.getHostName() + ':'
-                + jobTrackerAddress.getPort()))
-          .addVariables(
-              Protos.Environment.Variable
-              .newBuilder()
-              .setName("mapred.task.tracker.http.address")
-              .setValue(
-                httpAddress.getHostName() + ':' + httpAddress.getPort()))
-          .addVariables(
-              Protos.Environment.Variable
-              .newBuilder()
-              .setName("mapred.task.tracker.report.address")
-              .setValue(reportAddress.getHostName() + ':'
-                + reportAddress.getPort()))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("mapred.map.child.java.opts")
-              .setValue("-Xmx" + slotJVMHeap + "m"))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("mapred.reduce.child.java.opts")
-              .setValue("-Xmx" + slotJVMHeap + "m"))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("HADOOP_HEAPSIZE")
-              .setValue("" + tasktrackerJVMHeap))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("mapred.tasktracker.map.tasks.maximum")
-              .setValue("" + mapSlots))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("mapred.tasktracker.reduce.tasks.maximum")
-              .setValue("" + reduceSlots));
-
-        // Set java specific environment, appropriately.
-        Map<String, String> env = System.getenv();
-        if (env.containsKey("JAVA_HOME")) {
-          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
-              .setName("JAVA_HOME")
-              .setValue(env.get("JAVA_HOME")));
-        }
-
-        if (env.containsKey("JAVA_LIBRARY_PATH")) {
-          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
-              .setName("JAVA_LIBRARY_PATH")
-              .setValue(env.get("JAVA_LIBRARY_PATH")));
-        }
-
-        // Command info differs when performing a local run.
-        CommandInfo commandInfo = null;
-        String master = conf.get("mapred.mesos.master", "local");
-
-        if (master.equals("local")) {
-          try {
-            commandInfo = CommandInfo.newBuilder()
-              .setEnvironment(envBuilder)
-              .setValue(new File("bin/mesos-executor").getCanonicalPath())
-              .build();
-          } catch (IOException e) {
-            LOG.fatal("Failed to find Mesos executor ", e);
-            System.exit(1);
-          }
-        } else {
-          String uri = conf.get("mapred.mesos.executor");
-          commandInfo = CommandInfo.newBuilder()
-            .setEnvironment(envBuilder)
-            .setValue("cd hadoop-* && ./bin/mesos-executor")
-            .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
-        }
-
-        TaskInfo info = TaskInfo
-          .newBuilder()
-          .setName(taskId.getValue())
-          .setTaskId(taskId)
-          .setSlaveId(offer.getSlaveId())
-          .addResources(
-              Resource
-              .newBuilder()
-              .setName("cpus")
-              .setType(Value.Type.SCALAR)
-              .setScalar(Value.Scalar.newBuilder().setValue(
-                  (mapSlots + reduceSlots) * slotCpus)))
-          .addResources(
-              Resource
-              .newBuilder()
-              .setName("mem")
-              .setType(Value.Type.SCALAR)
-              .setScalar(Value.Scalar.newBuilder().setValue(
-                  (mapSlots + reduceSlots) * slotMem)))
-          .addResources(
-              Resource
-              .newBuilder()
-              .setName("disk")
-              .setType(Value.Type.SCALAR)
-              .setScalar(Value.Scalar.newBuilder().setValue(
-                  (mapSlots + reduceSlots) * slotDisk)))
-          .addResources(
-              Resource
-              .newBuilder()
-              .setName("ports")
-              .setType(Value.Type.RANGES)
-              .setRanges(
-                Value.Ranges
-                .newBuilder()
-                .addRange(Value.Range.newBuilder()
-                  .setBegin(httpAddress.getPort())
-                  .setEnd(httpAddress.getPort()))
-                .addRange(Value.Range.newBuilder()
-                  .setBegin(reportAddress.getPort())
-                  .setEnd(reportAddress.getPort()))))
-          .setExecutor(
-              ExecutorInfo
-              .newBuilder()
-              .setExecutorId(ExecutorID.newBuilder().setValue(
-                  "executor_" + taskId.getValue()))
-              .setName("Hadoop TaskTracker")
-              .setSource(taskId.getValue())
-              .addResources(
-                Resource
-                .newBuilder()
-                .setName("cpus")
-                .setType(Value.Type.SCALAR)
-                .setScalar(Value.Scalar.newBuilder().setValue(
-                    (TASKTRACKER_CPUS))))
-              .addResources(
-                Resource
-                .newBuilder()
-                .setName("mem")
-                .setType(Value.Type.SCALAR)
-                .setScalar(Value.Scalar.newBuilder().setValue(
-                    (tasktrackerMem)))).setCommand(commandInfo))
-                    .build();
-
-        driver.launchTasks(offer.getId(), Arrays.asList(info));
-
-        neededMapSlots -= mapSlots;
-        neededReduceSlots -= reduceSlots;
-      }
-
-      if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
-        LOG.info("Satisfied map and reduce slots needed.");
-      } else {
-        LOG.info("Unable to fully satisfy needed map/reduce slots: "
-            + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
-            + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
-            + "remaining");
-      }
-    }
-  }
-
-  @Override
-  public synchronized void offerRescinded(SchedulerDriver schedulerDriver,
-      OfferID offerID) {
-    LOG.warn("Rescinded offer: " + offerID.getValue());
-  }
-
-  @Override
-  public synchronized void statusUpdate(SchedulerDriver schedulerDriver,
-      Protos.TaskStatus taskStatus) {
-    LOG.info("Status update of " + taskStatus.getTaskId().getValue()
-        + " to " + taskStatus.getState().name()
-        + " with message " + taskStatus.getMessage());
-
-    // Remove the TaskTracker if the corresponding Mesos task has reached a
-    // terminal state.
-    switch (taskStatus.getState()) {
-      case TASK_FINISHED:
-      case TASK_FAILED:
-      case TASK_KILLED:
-      case TASK_LOST:
-        // Make a copy to iterate over keys and delete values.
-        Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
-
-        // Remove the task from the map.
-        for (HttpHost tracker : trackers) {
-          if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
-            LOG.info("Removing terminated TaskTracker: " + tracker);
-            mesosTrackers.get(tracker).timer.cancel();
-            mesosTrackers.remove(tracker);
-          }
-        }
-        break;
-      case TASK_STAGING:
-      case TASK_STARTING:
-      case TASK_RUNNING:
-        break;
-      default:
-        LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
-        break;
-    }
-  }
-
-  @Override
-  public synchronized void frameworkMessage(SchedulerDriver schedulerDriver,
-      ExecutorID executorID, SlaveID slaveID, byte[] bytes) {
-    LOG.info("Framework Message of " + bytes.length + " bytes"
-        + " from executor " + executorID.getValue()
-        + " on slave " + slaveID.getValue());
-  }
-
-  @Override
-  public synchronized void disconnected(SchedulerDriver schedulerDriver) {
-    LOG.warn("Disconnected from Mesos master.");
-  }
-
-  @Override
-  public synchronized void slaveLost(SchedulerDriver schedulerDriver,
-      SlaveID slaveID) {
-    LOG.warn("Slave lost: " + slaveID.getValue());
-  }
-
-  @Override
-  public synchronized void executorLost(SchedulerDriver schedulerDriver,
-      ExecutorID executorID, SlaveID slaveID, int status) {
-    LOG.warn("Executor " + executorID.getValue()
-        + " lost with status " + status + " on slave " + slaveID);
-  }
-
-  @Override
-  public synchronized void error(SchedulerDriver schedulerDriver, String s) {
-    LOG.error("Error from scheduler driver: " + s);
-  }
-
-  /**
-   * Used to track our launched TaskTrackers.
-   */
-  private class MesosTracker {
-    public volatile HttpHost host;
-    public TaskID taskId;
-    public long mapSlots;
-    public long reduceSlots;
-    public volatile boolean active = false; // Set once tracked by the JobTracker.
-    public Timer timer;
-    public volatile MesosScheduler scheduler;
-
-    // Tracks Hadoop job tasks running on the tracker.
-    public Set<JobID> hadoopJobs = new HashSet<JobID>();
-
-    public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
-        long reduceSlots, MesosScheduler scheduler) {
-      this.host = host;
-      this.taskId = taskId;
-      this.mapSlots = mapSlots;
-      this.reduceSlots = reduceSlots;
-      this.scheduler = scheduler;
-
-      this.timer = new Timer();
-      timer.schedule(new TimerTask() {
-        @Override
-        public void run() {
-          synchronized (MesosTracker.this.scheduler) {
-            // If the tracker was activated while we were waiting to acquire
-            // the lock, return.
-            if (MesosTracker.this.active) return;
-
-            LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " +
-              LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it");
-            MesosTracker.this.scheduler.killTracker(MesosTracker.this);
-          }
-        }
-      }, LAUNCH_TIMEOUT_MS);
-    }
-  }
-}
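
The slot sizing and map/reduce split performed in resourceOffers() above is
the heart of this scheduler, and it moves to the new location unchanged. The
standalone sketch below restates that arithmetic outside the diff: the
constants mirror MesosScheduler's defaults, while the class and method names
are illustrative only and are not part of this patch.

// Standalone restatement of the slot math in resourceOffers(); constants
// mirror MesosScheduler's defaults, names are illustrative.
public final class SlotMathSketch {
  static final double SLOT_CPUS = 0.2;    // cpus per slot
  static final int SLOT_MEM = 256;        // MB per slot
  static final int SLOT_DISK = 1024;      // MB per slot
  static final double TRACKER_CPUS = 1.0; // TaskTracker container overhead
  static final int TRACKER_MEM = 1024;    // MB

  // Total slots that fit in an offer once the TaskTracker itself is paid for.
  static int slotsForOffer(double cpus, double mem, double disk, int maxSlots) {
    int slots = maxSlots;
    slots = (int) Math.min(slots, (cpus - TRACKER_CPUS) / SLOT_CPUS);
    slots = (int) Math.min(slots, (mem - TRACKER_MEM) / SLOT_MEM);
    slots = (int) Math.min(slots, disk / SLOT_DISK);
    return slots;
  }

  // Split slots between maps and reduces; when both are needed, the map
  // share is clamped to [0.25, 0.75] to avoid starving either side.
  // Callers ensure neededMaps + neededReduces > 0 (offers are declined
  // earlier otherwise).
  static long[] split(int slots, int neededMaps, int neededReduces,
                      int mapMax, int reduceMax) {
    double mapFactor = (double) neededMaps / (neededMaps + neededReduces);
    if (neededMaps > 0 && neededReduces > 0) {
      mapFactor = Math.max(0.25, Math.min(0.75, mapFactor));
    }
    long maps = Math.min(Math.min((long) (mapFactor * slots), mapMax), neededMaps);
    // The slots left over after maps go to reduces, as in the original.
    long reduces = Math.min(Math.min(slots - maps, reduceMax), neededReduces);
    return new long[]{maps, reduces};
  }

  public static void main(String[] args) {
    // A 4-cpu, 8 GB, 20 GB offer with 10 pending maps and 2 pending reduces.
    int slots = slotsForOffer(4.0, 8192, 20480, 4);
    long[] mr = split(slots, 10, 2, 2, 2);
    System.out.println("slots=" + slots + " maps=" + mr[0] + " reduces=" + mr[1]);
  }
}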

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
new file mode 100644
index 0000000..baf3e5b
--- /dev/null
+++ b/hadoop/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.mesos</groupId>
+    <artifactId>hadoop-mesos</artifactId>
+    <version>0.0.1</version>
+
+    <properties>
+        <encoding>UTF-8</encoding>
+
+        <!-- language versions -->
+        <java.abi>1.6</java.abi>
+
+        <!-- runtime deps versions -->
+        <commons-logging.version>1.1.1</commons-logging.version>
+        <hadoop-core.version>1.2.0</hadoop-core.version>
+        <mesos.version>0.14.0</mesos.version>
+        <protobuf.version>2.4.1</protobuf.version>
+
+        <!-- test deps versions -->
+        <junit.version>4.11</junit.version>
+        <mockito.version>1.9.5</mockito.version>
+    </properties>
+
+    <!-- TODO(benh): Remove repository once we've published a JDK6 JAR. -->
+    <repositories>
+        <repository>
+            <id>mesosphere-public-repo</id>
+            <name>Mesosphere Public Repo</name>
+            <url>http://s3.amazonaws.com/mesosphere-maven-public/</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>${commons-logging.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>${hadoop-core.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mesos</groupId>
+            <!-- TODO(benh): Use 'mesos' after we've published a JDK6 JAR. -->
+            <artifactId>mesos_jdk6</artifactId>
+            <version>${mesos.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+
+        <!-- Test scope -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>${java.abi}</source>
+                    <target>${java.abi}</target>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.0</version>
+                <configuration>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+
+</project>
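
The shade plugin above packages the scheduler and its dependencies (minus the
provided hadoop-core) into a single hadoop-mesos JAR. Hooking that JAR into a
Hadoop 1.x JobTracker comes down to setting the configuration keys that
MesosScheduler reads. Below is a hedged sketch of the equivalent programmatic
setup, assuming the stock Hadoop 1.x key mapred.jobtracker.taskScheduler for
swapping in a TaskScheduler; the master address and executor URI are
placeholders, and only the mapred.mesos.* key names come from this patch.

import org.apache.hadoop.mapred.JobConf;

// Illustrative JobTracker-side configuration; values are placeholders.
public class HadoopOnMesosConfigSketch {
  public static JobConf configure() {
    JobConf conf = new JobConf();
    // Have the JobTracker load MesosScheduler as its TaskScheduler.
    conf.set("mapred.jobtracker.taskScheduler",
        "org.apache.hadoop.mapred.MesosScheduler");
    // MesosScheduler delegates real task placement to this scheduler.
    conf.set("mapred.mesos.taskScheduler",
        "org.apache.hadoop.mapred.JobQueueTaskScheduler");
    conf.set("mapred.mesos.master", "localhost:5050");            // placeholder
    conf.set("mapred.mesos.executor",
        "hdfs://namenode/hadoop.tar.gz");                         // placeholder
    // Optional slot sizing; these match the defaults in MesosScheduler.
    conf.setFloat("mapred.mesos.slot.cpus", 0.2f);
    conf.setInt("mapred.mesos.slot.mem", 256);
    conf.setInt("mapred.mesos.slot.disk", 1024);
    return conf;
  }
}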

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
new file mode 100644
index 0000000..ccf0090
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
@@ -0,0 +1,145 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mesos.Executor;
+import org.apache.mesos.ExecutorDriver;
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.Environment.Variable;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.SlaveInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+
+public class MesosExecutor implements Executor {
+  public static final Log LOG = LogFactory.getLog(MesosExecutor.class);
+
+  private JobConf conf;
+  private TaskTracker taskTracker;
+
+  @Override
+  public void registered(ExecutorDriver driver, ExecutorInfo executorInfo,
+      FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
+    LOG.info("Executor registered with the slave");
+
+    conf = new JobConf();
+
+    // Get TaskTracker's config options from environment variables set by the
+    // JobTracker.
+    if (executorInfo.getCommand().hasEnvironment()) {
+      for (Variable variable : executorInfo.getCommand().getEnvironment()
+          .getVariablesList()) {
+        LOG.info("Setting config option : " + variable.getName() + " to "
+            + variable.getValue());
+        conf.set(variable.getName(), variable.getValue());
+      }
+    }
+
+    // Get hostname from Mesos to make sure we match what it reports
+    // to the JobTracker.
+    conf.set("slave.host.name", slaveInfo.getHostname());
+
+    // Set the mapred.local directory inside the executor sandbox, so that
+    // different TaskTrackers on the same host do not step on each other.
+    conf.set("mapred.local.dir", System.getProperty("user.dir") + "/mapred");
+  }
+
+  @Override
+  public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
+    LOG.info("Launching task : " + task.getTaskId().getValue());
+
+    // NOTE: We need to manually set the context class loader here because
+    // the TaskTracker is unable to find the LoginModule class otherwise.
+    Thread.currentThread().setContextClassLoader(
+        TaskTracker.class.getClassLoader());
+
+    try {
+      taskTracker = new TaskTracker(conf);
+    } catch (IOException e) {
+      LOG.fatal("Failed to start TaskTracker", e);
+      System.exit(1);
+    } catch (InterruptedException e) {
+      LOG.fatal("Failed to start TaskTracker", e);
+      System.exit(1);
+    }
+
+    // Spin up a TaskTracker in a new thread.
+    new Thread("TaskTracker Run Thread") {
+      @Override
+      public void run() {
+        taskTracker.run();
+
+        // Send a TASK_FINISHED status update.
+        // We do this here because we want to send it in a separate thread
+        // than was used to call killTask().
+        driver.sendStatusUpdate(TaskStatus.newBuilder()
+            .setTaskId(task.getTaskId())
+            .setState(TaskState.TASK_FINISHED)
+            .build());
+
+        // Give some time for the update to reach the slave.
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.error("Failed to sleep TaskTracker thread", e);
+        }
+
+        // Stop the executor.
+        driver.stop();
+      }
+    }.start();
+
+    driver.sendStatusUpdate(TaskStatus.newBuilder()
+        .setTaskId(task.getTaskId())
+        .setState(TaskState.TASK_RUNNING).build());
+  }
+
+  @Override
+  public void killTask(ExecutorDriver driver, TaskID taskId) {
+    LOG.info("Killing task : " + taskId.getValue());
+    try {
+      taskTracker.shutdown();
+    } catch (IOException e) {
+      LOG.error("Failed to shutdown TaskTracker", e);
+    } catch (InterruptedException e) {
+      LOG.error("Failed to shutdown TaskTracker", e);
+    }
+  }
+
+  @Override
+  public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
+    LOG.info("Executor reregistered with the slave");
+  }
+
+  @Override
+  public void disconnected(ExecutorDriver driver) {
+    LOG.info("Executor disconnected from the slave");
+  }
+
+  @Override
+  public void frameworkMessage(ExecutorDriver d, byte[] msg) {
+    LOG.info("Executor received framework message of length: " + msg.length
+        + " bytes");
+  }
+
+  @Override
+  public void error(ExecutorDriver d, String message) {
+    LOG.error("MesosExecutor.error: " + message);
+  }
+
+  @Override
+  public void shutdown(ExecutorDriver d) {
+    LOG.info("Executor asked to shutdown");
+  }
+
+  public static void main(String[] args) {
+    MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor());
+    System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
+  }
+}
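
MesosExecutor.registered() above pulls its mapred options out of the
Environment that the scheduler attaches to the executor's CommandInfo (built
in the next file), rather than from a config file on the slave. A minimal
sketch of that handoff, using the same protobuf builders; the variable value
is hypothetical.

import org.apache.mesos.Protos;

// Sketch of the scheduler -> executor configuration handoff: the scheduler
// packs mapred options into the CommandInfo environment, and
// MesosExecutor.registered() copies each variable into its JobConf.
public class EnvHandoffSketch {
  public static void main(String[] args) {
    Protos.CommandInfo command = Protos.CommandInfo.newBuilder()
        .setValue("cd hadoop-* && ./bin/mesos-executor")
        .setEnvironment(Protos.Environment.newBuilder()
            .addVariables(Protos.Environment.Variable.newBuilder()
                .setName("mapred.job.tracker")
                .setValue("jobtracker.example.com:9001"))) // hypothetical
        .build();

    // Mirrors the loop in MesosExecutor.registered() above.
    for (Protos.Environment.Variable variable :
         command.getEnvironment().getVariablesList()) {
      System.out.println(variable.getName() + " = " + variable.getValue());
    }
  }
}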

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
new file mode 100644
index 0000000..7a1469d
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -0,0 +1,845 @@
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Iterator;
+
+import org.apache.commons.httpclient.HttpHost;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import static org.apache.hadoop.util.StringUtils.join;
+
+public class MesosScheduler extends TaskScheduler implements Scheduler {
+  public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
+
+  private SchedulerDriver driver;
+  private TaskScheduler taskScheduler;
+  private JobTracker jobTracker;
+  private Configuration conf;
+
+  // This is the memory overhead for a jvm process. This needs to be added
+  // to a jvm process's resource requirement, in addition to its heap size.
+  private static final double JVM_MEM_OVERHEAD_PERCENT_DEFAULT = 0.1; // 10%.
+
+  // TODO(vinod): Consider parsing the slot memory from the configuration jvm
+  // heap options (e.g: mapred.child.java.opts).
+
+  // NOTE: It appears that there are no real resource requirements for a
+  // map / reduce slot. We therefore define a default slot as:
+  // 0.2 cores.
+  // 256 MB memory.
+  // 1 GB of disk space.
+  private static final double SLOT_CPUS_DEFAULT = 0.2; // 0.2 cores.
+  private static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
+  private static final int SLOT_JVM_HEAP_DEFAULT = 256; // 256MB.
+
+  private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
+  private static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB.
+
+  // The default behavior in Hadoop is to use 4 slots per TaskTracker:
+  private static final int MAP_SLOTS_DEFAULT = 2;
+  private static final int REDUCE_SLOTS_DEFAULT = 2;
+
+  // The amount of time to wait for task trackers to launch before
+  // giving up.
+  private static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes
+
+  // Count of the launched trackers for TaskID generation.
+  private long launchedTrackers = 0;
+
+  // Maintains a mapping from {tracker host:port -> MesosTracker}.
+  // Used for tracking the slots of each TaskTracker and the corresponding
+  // Mesos TaskID.
+  private Map<HttpHost, MesosTracker> mesosTrackers =
+    new HashMap<HttpHost, MesosTracker>();
+
+  private JobInProgressListener jobListener = new JobInProgressListener() {
+    @Override
+    public void jobAdded(JobInProgress job) throws IOException {
+      LOG.info("Added job " + job.getJobID());
+    }
+
+    @Override
+    public void jobRemoved(JobInProgress job) {
+      LOG.info("Removed job " + job.getJobID());
+    }
+
+    @Override
+    public void jobUpdated(JobChangeEvent event) {
+      synchronized (MesosScheduler.this) {
+        JobInProgress job = event.getJobInProgress();
+
+        // If the job is complete, kill all the corresponding idle TaskTrackers.
+        if (!job.isComplete()) {
+          return;
+        }
+
+        LOG.info("Completed job : " + job.getJobID());
+
+        List<TaskInProgress> completed = new ArrayList<TaskInProgress>();
+
+        // Map tasks.
+        completed.addAll(job.reportTasksInProgress(true, true));
+
+        // Reduce tasks.
+        completed.addAll(job.reportTasksInProgress(false, true));
+
+        for (TaskInProgress task : completed) {
+          // Check that this task actually belongs to this job
+          if (task.getJob().getJobID() != job.getJobID()) {
+            continue;
+          }
+
+          for (TaskStatus status : task.getTaskStatuses()) {
+            // Make a copy to iterate over keys and delete values.
+            Set<HttpHost> trackers = new HashSet<HttpHost>(
+                mesosTrackers.keySet());
+
+            // Remove the task from the map.
+            for (HttpHost tracker : trackers) {
+              MesosTracker mesosTracker = mesosTrackers.get(tracker);
+
+              if (!mesosTracker.active) {
+                LOG.warn("Ignoring TaskTracker: " + tracker
+                    + " because it might not have sent a heartbeat");
+                continue;
+              }
+
+              LOG.info("Removing completed task : " + status.getTaskID()
+                  + " of tracker " + status.getTaskTracker());
+
+              mesosTracker.hadoopJobs.remove(job.getJobID());
+
+              // If the TaskTracker doesn't have any running tasks, kill it.
+              if (mesosTracker.hadoopJobs.isEmpty()) {
+                LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
+                    + mesosTracker.host);
+
+                driver.killTask(mesosTracker.taskId);
+                mesosTracker.timer.cancel();
+                mesosTrackers.remove(tracker);
+              }
+            }
+          }
+        }
+      }
+    }
+  };
+
+  public MesosScheduler() {}
+
+  // TaskScheduler methods.
+  @Override
+  public synchronized void start() throws IOException {
+    conf = getConf();
+    String taskSchedulerClass = conf.get("mapred.mesos.taskScheduler",
+        "org.apache.hadoop.mapred.JobQueueTaskScheduler");
+
+    try {
+      taskScheduler =
+        (TaskScheduler) Class.forName(taskSchedulerClass).newInstance();
+      taskScheduler.setConf(conf);
+      taskScheduler.setTaskTrackerManager(taskTrackerManager);
+    } catch (ClassNotFoundException e) {
+      LOG.fatal("Failed to initialize the TaskScheduler", e);
+      System.exit(1);
+    } catch (InstantiationException e) {
+      LOG.fatal("Failed to initialize the TaskScheduler", e);
+      System.exit(1);
+    } catch (IllegalAccessException e) {
+      LOG.fatal("Failed to initialize the TaskScheduler", e);
+      System.exit(1);
+    }
+
+    // Add the job listener to get job related updates.
+    taskTrackerManager.addJobInProgressListener(jobListener);
+
+    LOG.info("Starting MesosScheduler");
+    jobTracker = (JobTracker) super.taskTrackerManager;
+
+    String master = conf.get("mapred.mesos.master", "local");
+
+    try {
+      FrameworkInfo frameworkInfo = FrameworkInfo
+        .newBuilder()
+        .setUser("")
+        .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false))
+        .setName("Hadoop: (RPC port: " + jobTracker.port + ","
+            + " WebUI port: " + jobTracker.infoPort + ")").build();
+
+      driver = new MesosSchedulerDriver(this, frameworkInfo, master);
+      driver.start();
+    } catch (Exception e) {
+      // If the MesosScheduler can't be loaded, the JobTracker won't be useful
+      // at all, so crash it now so that the user notices.
+      LOG.fatal("Failed to start MesosScheduler", e);
+      System.exit(1);
+    }
+
+    taskScheduler.start();
+  }
+
+  @Override
+  public synchronized void terminate() throws IOException {
+    try {
+      LOG.info("Stopping MesosScheduler");
+      driver.stop();
+    } catch (Exception e) {
+      LOG.error("Failed to stop Mesos scheduler", e);
+    }
+
+    taskScheduler.terminate();
+  }
+
+  @Override
+  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
+    throws IOException {
+    HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
+        taskTracker.getStatus().getHttpPort());
+
+    if (!mesosTrackers.containsKey(tracker)) {
+      // TODO(bmahler): Consider allowing non-Mesos TaskTrackers.
+      LOG.info("Unknown/exited TaskTracker: " + tracker + ". ");
+      return null;
+    }
+
+    // Let the underlying task scheduler do the actual task scheduling.
+    List<Task> tasks = taskScheduler.assignTasks(taskTracker);
+
+    // The Hadoop Fair Scheduler is known to return null.
+    if (tasks != null) {
+      // Keep track of which TaskTracker contains which tasks.
+      for (Task task : tasks) {
+        mesosTrackers.get(tracker).hadoopJobs.add(task.getJobID());
+      }
+    }
+
+    return tasks;
+  }
+
+  @Override
+  public synchronized Collection<JobInProgress> getJobs(String queueName) {
+    return taskScheduler.getJobs(queueName);
+  }
+
+  @Override
+  public synchronized void refresh() throws IOException {
+    taskScheduler.refresh();
+  }
+
+  // Mesos Scheduler methods.
+  // These are synchronized, where possible. Some of these methods need to access the
+  // JobTracker, which can lead to a deadlock:
+  // See: https://issues.apache.org/jira/browse/MESOS-429
+  // The workaround employed here is to unsynchronize those methods needing access to
+  // the JobTracker state and use explicit synchronization instead as appropriate.
+  // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to
+  // split up the Scheduler and TaskScheduler implementations in order to break the
+  // locking cycle. This would require a synchronized class to store the shared
+  // state across our Scheduler and TaskScheduler implementations, and provide
+  // atomic operations as needed.
+  @Override
+  public synchronized void registered(SchedulerDriver schedulerDriver,
+      FrameworkID frameworkID, MasterInfo masterInfo) {
+    LOG.info("Registered as " + frameworkID.getValue()
+        + " with master " + masterInfo);
+  }
+
+  @Override
+  public synchronized void reregistered(SchedulerDriver schedulerDriver,
+      MasterInfo masterInfo) {
+    LOG.info("Re-registered with master " + masterInfo);
+  }
+
+  public synchronized void killTracker(MesosTracker tracker) {
+    driver.killTask(tracker.taskId);
+    mesosTrackers.remove(tracker.host);
+  }
+
+  // For some reason, pendingMaps() and pendingReduces() don't return the
+  // values we expect. We observed negative values, which may be related to
+  // https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the
+  // algorithm that is used to calculate the pending tasks within the Hadoop
+  // JobTracker sources (see 'printTaskSummary' in
+  // src/org/apache/hadoop/mapred/jobdetails_jsp.java).
+  private int getPendingTasks(TaskInProgress[] tasks) {
+    int totalTasks = tasks.length;
+    int runningTasks = 0;
+    int finishedTasks = 0;
+    int killedTasks = 0;
+    for (int i = 0; i < totalTasks; ++i) {
+      TaskInProgress task = tasks[i];
+      if (task == null) {
+        continue;
+      }
+      if (task.isComplete()) {
+        finishedTasks += 1;
+      } else if (task.isRunning()) {
+        runningTasks += 1;
+      } else if (task.wasKilled()) {
+        killedTasks += 1;
+      }
+    }
+    int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks;
+    return pendingTasks;
+  }
+
+  // This method uses explicit synchronization in order to avoid deadlocks when
+  // accessing the JobTracker.
+  @Override
+  public void resourceOffers(SchedulerDriver schedulerDriver,
+      List<Offer> offers) {
+    // Before synchronizing, we pull all needed information from the JobTracker.
+    final HttpHost jobTrackerAddress =
+      new HttpHost(jobTracker.getHostname(), jobTracker.getTrackerPort());
+
+    final Collection<TaskTrackerStatus> taskTrackers = jobTracker.taskTrackers();
+
+    final List<JobInProgress> jobsInProgress = new ArrayList<JobInProgress>();
+    for (JobStatus status : jobTracker.jobsToComplete()) {
+      jobsInProgress.add(jobTracker.getJob(status.getJobID()));
+    }
+
+    synchronized (this) {
+      // Compute the number of pending maps and reduces.
+      int pendingMaps = 0;
+      int pendingReduces = 0;
+      for (JobInProgress progress : jobsInProgress) {
+        // JobStatus.pendingMaps/Reduces may return the wrong value on
+        // occasion.  This seems to be safer.
+        pendingMaps += getPendingTasks(progress.getTasks(TaskType.MAP));
+        pendingReduces += getPendingTasks(progress.getTasks(TaskType.REDUCE));
+      }
+
+      // Mark active (heartbeated) TaskTrackers and compute idle slots.
+      int idleMapSlots = 0;
+      int idleReduceSlots = 0;
+      int unhealthyTrackers = 0;
+
+      for (TaskTrackerStatus status : taskTrackers) {
+        if (!status.getHealthStatus().isNodeHealthy()) {
+          // Skip this node if it's unhealthy.
+          ++unhealthyTrackers;
+          continue;
+        }
+
+        HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
+        if (mesosTrackers.containsKey(host)) {
+          mesosTrackers.get(host).active = true;
+          mesosTrackers.get(host).timer.cancel();
+          idleMapSlots += status.getAvailableMapSlots();
+          idleReduceSlots += status.getAvailableReduceSlots();
+        }
+      }
+
+      // Consider the TaskTrackers that have yet to become active as being idle,
+      // otherwise we will launch excessive TaskTrackers.
+      int inactiveMapSlots = 0;
+      int inactiveReduceSlots = 0;
+      for (MesosTracker tracker : mesosTrackers.values()) {
+        if (!tracker.active) {
+          inactiveMapSlots += tracker.mapSlots;
+          inactiveReduceSlots += tracker.reduceSlots;
+        }
+      }
+
+      // To ensure Hadoop jobs begin promptly, we can specify a minimum number
+      // of 'hot slots' to be available for use.  This addresses the
+      // TaskTracker spin up delay that exists with Hadoop on Mesos.  This can
+      // be a nuisance with lower latency applications, such as ad-hoc Hive
+      // queries.
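+      // For example, setting mapred.mesos.total.map.slots.minimum=10 keeps
+      // the combined idle and not-yet-active map slots at 10 or more, even
+      // when no maps are pending.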
+      int minimumMapSlots = conf.getInt("mapred.mesos.total.map.slots.minimum", 0);
+      int minimumReduceSlots =
+        conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0);
+
+      // Compute how many slots we need to allocate.
+      int neededMapSlots = Math.max(
+          minimumMapSlots - (idleMapSlots + inactiveMapSlots),
+          pendingMaps - (idleMapSlots + inactiveMapSlots));
+      int neededReduceSlots = Math.max(
+          minimumReduceSlots  - (idleReduceSlots + inactiveReduceSlots),
+          pendingReduces - (idleReduceSlots + inactiveReduceSlots));
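+      // Example: with a minimum of 10 map slots, 2 idle + 3 inactive slots,
+      // and 20 pending maps, neededMapSlots = max(10 - 5, 20 - 5) = 15.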
+
+      LOG.info(join("\n", Arrays.asList(
+              "JobTracker Status",
+              "      Pending Map Tasks: " + pendingMaps,
+              "   Pending Reduce Tasks: " + pendingReduces,
+              "         Idle Map Slots: " + idleMapSlots,
+              "      Idle Reduce Slots: " + idleReduceSlots,
+              "     Inactive Map Slots: " + inactiveMapSlots
+              + " (launched but no hearbeat yet)",
+              "  Inactive Reduce Slots: " + inactiveReduceSlots
+              + " (launched but no hearbeat yet)",
+              "       Needed Map Slots: " + neededMapSlots,
+              "    Needed Reduce Slots: " + neededReduceSlots,
+              "     Unhealthy Trackers: " + unhealthyTrackers)));
+
+      // Launch TaskTrackers to satisfy the slot requirements.
+      // TODO(bmahler): Consider slotting intelligently.
+      // Ex: If more map slots are needed, but no reduce slots are needed,
+      // launch a map-only TaskTracker to better satisfy the slot needs.
+      for (Offer offer : offers) {
+        if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+          driver.declineOffer(offer.getId());
+          continue;
+        }
+
+        // Ensure these values aren't < 0.
+        neededMapSlots = Math.max(0, neededMapSlots);
+        neededReduceSlots = Math.max(0, neededReduceSlots);
+
+        double cpus = -1.0;
+        double mem = -1.0;
+        double disk = -1.0;
+        Set<Integer> ports = new HashSet<Integer>();
+
+        // Pull out the cpus, memory, disk, and 2 ports from the offer.
+        for (Resource resource : offer.getResourcesList()) {
+          if (resource.getName().equals("cpus")
+              && resource.getType() == Value.Type.SCALAR) {
+            cpus = resource.getScalar().getValue();
+          } else if (resource.getName().equals("mem")
+              && resource.getType() == Value.Type.SCALAR) {
+            mem = resource.getScalar().getValue();
+          } else if (resource.getName().equals("disk")
+              && resource.getType() == Value.Type.SCALAR) {
+            disk = resource.getScalar().getValue();
+          } else if (resource.getName().equals("ports")
+              && resource.getType() == Value.Type.RANGES) {
+            for (Value.Range range : resource.getRanges().getRangeList()) {
+              int begin = (int) range.getBegin();
+              int end = (int) range.getEnd();
+              if (end < begin) {
+                LOG.warn("Ignoring invalid port range: begin=" + begin + " end=" + end);
+                continue;
+              }
+              while (begin <= end && ports.size() < 2) {
+                ports.add(begin);
+                begin += 1;
+              }
+            }
+          }
+        }
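+        // Example: an offered port range [31000-31005] yields the two ports
+        // {31000, 31001}; any remaining ports in the offer go unused.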
+
+        int mapSlotsMax = conf.getInt("mapred.tasktracker.map.tasks.maximum",
+            MAP_SLOTS_DEFAULT);
+        int reduceSlotsMax =
+          conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
+              REDUCE_SLOTS_DEFAULT);
+
+        // What's the minimum number of map and reduce slots we should try to
+        // launch?
+        long mapSlots = 0;
+        long reduceSlots = 0;
+
+        double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
+            (float) SLOT_CPUS_DEFAULT);
+        double slotDisk = conf.getInt("mapred.mesos.slot.disk",
+            SLOT_DISK_DEFAULT);
+
+        int slotMem = conf.getInt("mapred.mesos.slot.mem",
+            SLOT_JVM_HEAP_DEFAULT);
+        long slotJVMHeap = Math.round((double)slotMem -
+            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * slotMem));
+
+        int tasktrackerMem = conf.getInt("mapred.mesos.tasktracker.mem",
+              TASKTRACKER_MEM_DEFAULT);
+        long tasktrackerJVMHeap = Math.round((double)tasktrackerMem -
+            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * tasktrackerMem));
+
+        // Minimum resource requirements for the container (TaskTracker and
+        // its map/reduce tasks).
+        double containerCpus = TASKTRACKER_CPUS;
+        double containerMem = tasktrackerMem;
+        double containerDisk = 0;
+
+        // Determine how many slots we can allocate.
+        int slots = mapSlotsMax + reduceSlotsMax;
+        slots = (int)Math.min(slots, (cpus - containerCpus) / slotCpus);
+        slots = (int)Math.min(slots, (mem - containerMem) / slotMem);
+        slots = (int)Math.min(slots, (disk - containerDisk) / slotDisk);
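+        // Example: an offer of cpus=4.0, mem=8192, disk=20480 with the
+        // defaults above (slotCpus=0.2, slotMem=256, slotDisk=1024,
+        // containerCpus=1.0, containerMem=1024) is bounded by
+        // mapSlotsMax + reduceSlotsMax: min(4, 15, 28, 20) = 4 slots.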
+
+        // Is this offer too small for even the minimum slots?
+        if (slots < 1 || ports.size() < 2) {
+          LOG.info(join("\n", Arrays.asList(
+                  "Declining offer with insufficient resources for a TaskTracker: ",
+                  "  cpus: offered " + cpus + " needed " + containerCpus,
+                  "  mem : offered " + mem + " needed " + containerMem,
+                  "  disk: offered " + disk + " needed " + containerDisk,
+                  "  ports: " + (ports.size() < 2
+                    ? " less than 2 offered"
+                    : " at least 2 (sufficient)"))));
+
+          driver.declineOffer(offer.getId());
+          continue;
+        }
+
+        // Is the number of slots we need sufficiently small? If so, we can
+        // allocate exactly the number we need.
+        if (slots >= neededMapSlots + neededReduceSlots && neededMapSlots <
+            mapSlotsMax && neededReduceSlots < reduceSlotsMax) {
+          mapSlots = neededMapSlots;
+          reduceSlots = neededReduceSlots;
+        } else {
+          // Allocate slots fairly for this resource offer.
+          double mapFactor = (double)neededMapSlots / (neededMapSlots + neededReduceSlots);
+          double reduceFactor = (double)neededReduceSlots / (neededMapSlots + neededReduceSlots);
+          // To avoid map/reduce slot starvation, don't allow more than 50%
+          // spread between map/reduce slots when we need both mappers and
+          // reducers.
+          if (neededMapSlots > 0 && neededReduceSlots > 0) {
+            if (mapFactor < 0.25) {
+              mapFactor = 0.25;
+            } else if (mapFactor > 0.75) {
+              mapFactor = 0.75;
+            }
+            if (reduceFactor < 0.25) {
+              reduceFactor = 0.25;
+            } else if (reduceFactor > 0.75) {
+              reduceFactor = 0.75;
+            }
+          }
+          mapSlots = Math.min(Math.min((long)(mapFactor * slots), mapSlotsMax), neededMapSlots);
+          // The remaining slots are allocated for reduces.
+          slots -= mapSlots;
+          reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
+        }
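+        // Example: with slots=4, neededMapSlots=1, neededReduceSlots=9, and
+        // the default max of 2 slots each, mapFactor clamps from 0.1 up to
+        // 0.25, so mapSlots = min((long)(0.25 * 4), 2, 1) = 1; the remaining
+        // 3 slots yield reduceSlots = min(3, 2, 9) = 2.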
+
+        Iterator<Integer> portIter = ports.iterator();
+        HttpHost httpAddress = new HttpHost(offer.getHostname(), portIter.next());
+        HttpHost reportAddress = new HttpHost(offer.getHostname(), portIter.next());
+
+        // Check that this tracker is not already launched.  This problem was
+        // observed on a few occasions, but not reliably.  The main symptom was
+        // that entries in `mesosTrackers` were being lost, and task trackers
+        // would be 'lost' mysteriously (probably because the ports were in
+        // use).  This problem has since gone away with a rewrite of the port
+        // selection code, but the check + logging is left here.
+        // TODO(brenden): Diagnose this to determine root cause.
+
+        if (mesosTrackers.containsKey(httpAddress)) {
+          LOG.info(join("\n", Arrays.asList(
+                  "Declining offer because host/port combination is in use: ",
+                  "  cpus: offered " + cpus + " needed " + containerCpus,
+                  "  mem : offered " + mem + " needed " + containerMem,
+                  "  disk: offered " + disk + " needed " + containerDisk,
+                  "  ports: " + ports)));
+
+          driver.declineOffer(offer.getId());
+          continue;
+        }
+
+        TaskID taskId = TaskID.newBuilder()
+          .setValue("Task_Tracker_" + launchedTrackers++).build();
+
+        LOG.info("Launching task " + taskId.getValue() + " on "
+            + httpAddress.toString() + " with mapSlots=" + mapSlots + " reduceSlots=" + reduceSlots);
+
+        // Add this tracker to Mesos tasks.
+        mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
+              mapSlots, reduceSlots, this));
+
+        // Create the environment depending on whether the executor is going to be
+        // run locally.
+        // TODO(vinod): Do not pass the mapred config options as environment
+        // variables.
+        Protos.Environment.Builder envBuilder = Protos.Environment
+          .newBuilder()
+          .addVariables(
+              Protos.Environment.Variable
+              .newBuilder()
+              .setName("mapred.job.tracker")
+              .setValue(jobTrackerAddress.getHostName() + ':'
+                + jobTrackerAddress.getPort()))
+          .addVariables(
+              Protos.Environment.Variable
+              .newBuilder()
+              .setName("mapred.task.tracker.http.address")
+              .setValue(
+                httpAddress.getHostName() + ':' + httpAddress.getPort()))
+          .addVariables(
+              Protos.Environment.Variable
+              .newBuilder()
+              .setName("mapred.task.tracker.report.address")
+              .setValue(reportAddress.getHostName() + ':'
+                + reportAddress.getPort()))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("mapred.map.child.java.opts")
+              .setValue("-Xmx" + slotJVMHeap + "m"))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("mapred.reduce.child.java.opts")
+              .setValue("-Xmx" + slotJVMHeap + "m"))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("HADOOP_HEAPSIZE")
+              .setValue("" + tasktrackerJVMHeap))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("mapred.tasktracker.map.tasks.maximum")
+              .setValue("" + mapSlots))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+              .setName("mapred.tasktracker.reduce.tasks.maximum")
+              .setValue("" + reduceSlots));
+
+        // Set Java-specific environment variables, where available.
+        Map<String, String> env = System.getenv();
+        if (env.containsKey("JAVA_HOME")) {
+          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+              .setName("JAVA_HOME")
+              .setValue(env.get("JAVA_HOME")));
+        }
+
+        if (env.containsKey("JAVA_LIBRARY_PATH")) {
+          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+              .setName("JAVA_LIBRARY_PATH")
+              .setValue(env.get("JAVA_LIBRARY_PATH")));
+        }
+
+        // Command info differs when performing a local run.
+        CommandInfo commandInfo = null;
+        String master = conf.get("mapred.mesos.master", "local");
+
+        if (master.equals("local")) {
+          commandInfo = CommandInfo.newBuilder()
+            .setEnvironment(envBuilder)
+            .setValue(new File("bin/hadoop").getCanonicalPath() +
+                      " org.apache.hadoop.mapred.MesosExecutor")
+            .build();
+        } else {
+          String uri = conf.get("mapred.mesos.executor");
+          commandInfo = CommandInfo.newBuilder()
+            .setEnvironment(envBuilder)
+            .setValue("cd hadoop-* && " +
+                      "./bin/hadoop org.apache.hadoop.mapred.MesosExecutor")
+            .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
+        }
+
+        TaskInfo info = TaskInfo
+          .newBuilder()
+          .setName(taskId.getValue())
+          .setTaskId(taskId)
+          .setSlaveId(offer.getSlaveId())
+          .addResources(
+              Resource
+              .newBuilder()
+              .setName("cpus")
+              .setType(Value.Type.SCALAR)
+              .setScalar(Value.Scalar.newBuilder().setValue(
+                  (mapSlots + reduceSlots) * slotCpus)))
+          .addResources(
+              Resource
+              .newBuilder()
+              .setName("mem")
+              .setType(Value.Type.SCALAR)
+              .setScalar(Value.Scalar.newBuilder().setValue(
+                  (mapSlots + reduceSlots) * slotMem)))
+          .addResources(
+              Resource
+              .newBuilder()
+              .setName("disk")
+              .setType(Value.Type.SCALAR)
+              .setScalar(Value.Scalar.newBuilder().setValue(
+                  (mapSlots + reduceSlots) * slotDisk)))
+          .addResources(
+              Resource
+              .newBuilder()
+              .setName("ports")
+              .setType(Value.Type.RANGES)
+              .setRanges(
+                Value.Ranges
+                .newBuilder()
+                .addRange(Value.Range.newBuilder()
+                  .setBegin(httpAddress.getPort())
+                  .setEnd(httpAddress.getPort()))
+                .addRange(Value.Range.newBuilder()
+                  .setBegin(reportAddress.getPort())
+                  .setEnd(reportAddress.getPort()))))
+          .setExecutor(
+              ExecutorInfo
+              .newBuilder()
+              .setExecutorId(ExecutorID.newBuilder().setValue(
+                  "executor_" + taskId.getValue()))
+              .setName("Hadoop TaskTracker")
+              .setSource(taskId.getValue())
+              .addResources(
+                Resource
+                .newBuilder()
+                .setName("cpus")
+                .setType(Value.Type.SCALAR)
+                .setScalar(Value.Scalar.newBuilder().setValue(
+                    (TASKTRACKER_CPUS))))
+              .addResources(
+                Resource
+                .newBuilder()
+                .setName("mem")
+                .setType(Value.Type.SCALAR)
+                .setScalar(Value.Scalar.newBuilder().setValue(
+                    tasktrackerMem)))
+              .setCommand(commandInfo))
+          .build();
+
+        driver.launchTasks(offer.getId(), Arrays.asList(info));
+
+        neededMapSlots -= mapSlots;
+        neededReduceSlots -= reduceSlots;
+      }
+
+      if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+        LOG.info("Satisfied map and reduce slots needed.");
+      } else {
+        LOG.info("Unable to fully satisfy needed map/reduce slots: "
+            + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
+            + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
+            + "remaining");
+      }
+    }
+  }
+
+  @Override
+  public synchronized void offerRescinded(SchedulerDriver schedulerDriver,
+      OfferID offerID) {
+    LOG.warn("Rescinded offer: " + offerID.getValue());
+  }
+
+  @Override
+  public synchronized void statusUpdate(SchedulerDriver schedulerDriver,
+      Protos.TaskStatus taskStatus) {
+    LOG.info("Status update of " + taskStatus.getTaskId().getValue()
+        + " to " + taskStatus.getState().name()
+        + " with message " + taskStatus.getMessage());
+
+    // Remove the TaskTracker if the corresponding Mesos task has reached a
+    // terminal state.
+    switch (taskStatus.getState()) {
+      case TASK_FINISHED:
+      case TASK_FAILED:
+      case TASK_KILLED:
+      case TASK_LOST:
+        // Make a copy to iterate over keys and delete values.
+        Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
+
+        // Remove the task from the map.
+        for (HttpHost tracker : trackers) {
+          if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
+            LOG.info("Removing terminated TaskTracker: " + tracker);
+            mesosTrackers.get(tracker).timer.cancel();
+            mesosTrackers.remove(tracker);
+          }
+        }
+        break;
+      case TASK_STAGING:
+      case TASK_STARTING:
+      case TASK_RUNNING:
+        break;
+      default:
+        LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
+        break;
+    }
+  }
+
+  @Override
+  public synchronized void frameworkMessage(SchedulerDriver schedulerDriver,
+      ExecutorID executorID, SlaveID slaveID, byte[] bytes) {
+    LOG.info("Framework Message of " + bytes.length + " bytes"
+        + " from executor " + executorID.getValue()
+        + " on slave " + slaveID.getValue());
+  }
+
+  @Override
+  public synchronized void disconnected(SchedulerDriver schedulerDriver) {
+    LOG.warn("Disconnected from Mesos master.");
+  }
+
+  @Override
+  public synchronized void slaveLost(SchedulerDriver schedulerDriver,
+      SlaveID slaveID) {
+    LOG.warn("Slave lost: " + slaveID.getValue());
+  }
+
+  @Override
+  public synchronized void executorLost(SchedulerDriver schedulerDriver,
+      ExecutorID executorID, SlaveID slaveID, int status) {
+    LOG.warn("Executor " + executorID.getValue()
+        + " lost with status " + status + " on slave " + slaveID);
+  }
+
+  @Override
+  public synchronized void error(SchedulerDriver schedulerDriver, String s) {
+    LOG.error("Error from scheduler driver: " + s);
+  }
+
+  /**
+   * Used to track our launched TaskTrackers.
+   */
+  private class MesosTracker {
+    public volatile HttpHost host;
+    public TaskID taskId;
+    public long mapSlots;
+    public long reduceSlots;
+    public volatile boolean active = false; // Set once tracked by the JobTracker.
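+    // Fires after LAUNCH_TIMEOUT_MS to kill this tracker if it never
+    // becomes active (i.e., never heartbeats the JobTracker).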
+    public Timer timer;
+    public volatile MesosScheduler scheduler;
+
+    // Tracks the Hadoop jobs that have tasks assigned to this tracker.
+    public Set<JobID> hadoopJobs = new HashSet<JobID>();
+
+    public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
+        long reduceSlots, MesosScheduler scheduler) {
+      this.host = host;
+      this.taskId = taskId;
+      this.mapSlots = mapSlots;
+      this.reduceSlots = reduceSlots;
+      this.scheduler = scheduler;
+
+      this.timer = new Timer();
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          synchronized (MesosTracker.this.scheduler) {
+            // If the tracker became active while we were waiting to acquire
+            // the lock, return.
+            if (MesosTracker.this.active) return;
+
+            LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " +
+              LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it");
+            MesosTracker.this.scheduler.killTracker(MesosTracker.this);
+          }
+        }
+      }, LAUNCH_TIMEOUT_MS);
+    }
+  }
+}
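
The deadlock note above reduces to a small, reusable pattern: snapshot the state guarded by the other lock first, then take your own lock. A minimal self-contained sketch, with a hypothetical TrackerService interface standing in for the JobTracker (none of these names appear in the patch):

```
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the JobTracker: a service whose methods take
// a lock we must never hold while also holding our own.
interface TrackerService {
  List<String> currentTrackers();
}

class SnapshotThenSynchronize {
  private final TrackerService service;
  private final List<String> bookkeeping = new ArrayList<String>();

  SnapshotThenSynchronize(TrackerService service) {
    this.service = service;
  }

  // Deliberately unsynchronized, mirroring resourceOffers() above.
  void onEvent() {
    // 1. Snapshot the other lock domain's state while holding no locks.
    final List<String> snapshot = service.currentTrackers();

    // 2. Only then take our own lock and work on the snapshot.
    synchronized (this) {
      bookkeeping.clear();
      bookkeeping.addAll(snapshot);
    }
  }
}
```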


[5/5] git commit: Removed Hadoop from repository.

Posted by be...@apache.org.
Removed Hadoop from repository.

Review: https://reviews.apache.org/r/13245


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c621ae05
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c621ae05
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c621ae05

Branch: refs/heads/master
Commit: c621ae052f15ce6f18d6e08a7c9064b6a7a74711
Parents: 3a7b926
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 2 14:30:12 2013 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 2 15:02:31 2013 -0700

----------------------------------------------------------------------
 Makefile.am                                     |   2 +-
 README                                          |  12 -
 configure.ac                                    |   1 -
 hadoop/README.md                                | 128 ---
 hadoop/pom.xml                                  | 110 ---
 .../org/apache/hadoop/mapred/MesosExecutor.java | 145 ----
 .../apache/hadoop/mapred/MesosScheduler.java    | 845 -------------------
 7 files changed, 1 insertion(+), 1242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/Makefile.am
----------------------------------------------------------------------
diff --git a/Makefile.am b/Makefile.am
index fc1c1d0..260486b 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -18,7 +18,7 @@ ACLOCAL_AMFLAGS = -I m4
 
 AUTOMAKE_OPTIONS = foreign
 
-SUBDIRS = . 3rdparty src ec2 hadoop jenkins
+SUBDIRS = . 3rdparty src ec2 jenkins
 
 EXTRA_DIST =
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/README
----------------------------------------------------------------------
diff --git a/README b/README
index 7fb6cba..e94531d 100644
--- a/README
+++ b/README
@@ -95,18 +95,6 @@ addition, see [build]/bin for scripts that can run the tests and also
 launch the tests in GDB.
 
 
-Hadoop
-======
-
-Included in the distribution is a runnable tutorial on using Hadoop on
-Mesos (./hadoop/TUTORIAL.sh). Try it out!
-
-You can also "build" a self-contained distribution of Hadoop with the
-necessary Mesos components by doing 'make hadoop-0.20.205.0' or 'make
-hadoop-0.20.2-cdh3u3' from within [build]/hadoop (this uses the
-tutorial mentioned above).
-
-
 Installing
 ==========
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 4dc1cc7..7c9adac 100644
--- a/configure.ac
+++ b/configure.ac
@@ -78,7 +78,6 @@ AC_CONFIG_SUBDIRS([3rdparty/libprocess])
 
 AC_CONFIG_FILES([Makefile])
 AC_CONFIG_FILES([ec2/Makefile])
-AC_CONFIG_FILES([hadoop/Makefile])
 AC_CONFIG_FILES([jenkins/Makefile])
 AC_CONFIG_FILES([jenkins/pom.xml])
 AC_CONFIG_FILES([src/Makefile])

http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/hadoop/README.md
----------------------------------------------------------------------
diff --git a/hadoop/README.md b/hadoop/README.md
deleted file mode 100644
index 36d7eb4..0000000
--- a/hadoop/README.md
+++ /dev/null
@@ -1,128 +0,0 @@
-Hadoop on Mesos
----------------
-
-#### Overview ####
-
-To run _Hadoop on Mesos_ you need to add the `hadoop-mesos-0.0.1.jar`
-library to your Hadoop distribution (any distribution that supports
-`hadoop-core-1.2.0` should work) and set some new configuration
-properties. Read on for details.
-
-#### Build ####
-
-You can build `hadoop-mesos-0.0.1.jar` using Maven:
-
-```
-$ mvn package
-```
-
-If successful, the JAR will be at `target/hadoop-mesos-0.0.1.jar`.
-
-> NOTE: If you want to build against a different version of Mesos than
-> the default you'll need to update `mesos-version` in `pom.xml`.
-
-We plan to provide pre-built JARs at http://repository.apache.org
-in the near future!
-
-#### Package ####
-
-You'll need to download an existing Hadoop distribution. For this
-guide, we'll use [CDH4.2.1][CDH4.2.1]. First grab the tar archive and
-extract it.
-
-```
-$ wget http://archive.cloudera.com/cdh4/cdh/4/mr1-2.0.0-mr1-cdh4.2.1.tar.gz
-...
-$ tar zxf mr1-2.0.0-mr1-cdh4.2.1.tar.gz
-```
-
-> **Take note**, the extracted directory is `hadoop-2.0.0-mr1-cdh4.2.1`.
-
-Now copy `hadoop-mesos-0.0.1.jar` into the `lib` folder.
-
-```
-$ cp /path/to/hadoop-mesos-0.0.1.jar hadoop-2.0.0-mr1-cdh4.2.1/lib/
-```
-
-_That's it!_ You now have a _Hadoop on Mesos_ distribution!
-
-[CDH4.2.1]: http://www.cloudera.com/content/support/en/documentation/cdh4-documentation/cdh4-documentation-v4-2-1.html
-
-#### Upload ####
-
-You'll want to upload your _Hadoop on Mesos_ distribution somewhere
-that Mesos can access in order to launch each `TaskTracker`. For
-example, if you're already running HDFS:
-
-```
-$ tar czf hadoop-2.0.0-mr1-cdh4.2.1.tar.gz hadoop-2.0.0-mr1-cdh4.2.1
-$ hadoop fs -put hadoop-2.0.0-mr1-cdh4.2.1.tar.gz /hadoop-2.0.0-mr1-cdh4.2.1.tar.gz
-```
-
-> **Consider** any permissions issues with your uploaded location
-> (i.e., on HDFS you'll probably want to make the file world
-> readable).
-
-Now you'll need to configure your `JobTracker` to launch each
-`TaskTracker` on Mesos!
-
-#### Configure ####
-
-Along with the normal configuration properties you might want to set
-to launch a `JobTracker`, you'll need to set some Mesos specific ones
-too.
-
-Here are the mandatory configuration properties for
-`conf/mapred-site.xml` (initialized to values representative of
-running in [pseudo-distributed
-operation](http://hadoop.apache.org/docs/stable/single_node_setup.html#PseudoDistributed)):
-
-```
-<property>
-  <name>mapred.job.tracker</name>
-  <value>localhost:9001</value>
-</property>
-<property>
-  <name>mapred.jobtracker.taskScheduler</name>
-  <value>org.apache.hadoop.mapred.MesosScheduler</value>
-</property>
-<property>
-  <name>mapred.mesos.taskScheduler</name>
-  <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
-</property>
-<property>
-  <name>mapred.mesos.master</name>
-  <value>localhost:5050</value>
-</property>
-<property>
-  <name>mapred.mesos.executor</name>
-  <value>hdfs://localhost:9000/hadoop-2.0.0-mr1-cdh4.2.1.tar.gz</value>
-</property>
-```
-
-#### Start ####
-
-Now you can start the `JobTracker` but you'll need to include the path
-to the Mesos native library.
-
-On Linux:
-
-```
-$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.so hadoop jobtracker
-```
-
-And on OS X:
-
-```
-$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.dylib hadoop jobtracker
-```
-
-> **NOTE: You do not need to worry about distributing your Hadoop
-> configuration! All of the configuration properties read by the**
-> `JobTracker` **along with any necessary** `TaskTracker` **specific
-> _overrides_ will get serialized and passed to each** `TaskTracker`
-> **on startup.**
-
-_Please email user@mesos.apache.org with questions!_
-
-----------

http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
deleted file mode 100644
index baf3e5b..0000000
--- a/hadoop/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>org.apache.mesos</groupId>
-    <artifactId>hadoop-mesos</artifactId>
-    <version>0.0.1</version>
-
-    <properties>
-        <encoding>UTF-8</encoding>
-
-        <!-- language versions -->
-        <java.abi>1.6</java.abi>
-
-        <!-- runtime deps versions -->
-        <commons-logging.version>1.1.1</commons-logging.version>
-        <hadoop-core.version>1.2.0</hadoop-core.version>
-        <mesos.version>0.14.0</mesos.version>
-        <protobuf.version>2.4.1</protobuf.version>
-
-        <!-- test deps versions -->
-        <junit.version>4.11</junit.version>
-        <mockito.version>1.9.5</mockito.version>
-    </properties>
-
-    <!-- TODO(benh): Remove repository once we've published a JDK6 JAR. -->
-    <repositories>
-        <repository>
-            <id>mesosphere-public-repo</id>
-            <name>Mesosphere Public Repo</name>
-            <url>http://s3.amazonaws.com/mesosphere-maven-public/</url>
-        </repository>
-    </repositories>
-
-    <dependencies>
-        <dependency>
-            <groupId>commons-logging</groupId>
-            <artifactId>commons-logging</artifactId>
-            <version>${commons-logging.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-core</artifactId>
-            <version>${hadoop-core.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.mesos</groupId>
-            <!-- TODO(benh): Use 'mesos' after we've published a JDK6 JAR. -->
-            <artifactId>mesos_jdk6</artifactId>
-            <version>${mesos.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
-
-        <!-- Test scope -->
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>${junit.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.1</version>
-                <configuration>
-                    <source>${java.abi}</source>
-                    <target>${java.abi}</target>
-                    <showDeprecation>true</showDeprecation>
-                    <showWarnings>true</showWarnings>
-                </configuration>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>2.0</version>
-                <configuration>
-                    <filters>
-                        <filter>
-                            <artifact>*:*</artifact>
-                            <excludes>
-                                <exclude>META-INF/*.SF</exclude>
-                                <exclude>META-INF/*.DSA</exclude>
-                                <exclude>META-INF/*.RSA</exclude>
-                            </excludes>
-                        </filter>
-                    </filters>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
- 
-        </plugins>
-    </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
deleted file mode 100644
index ccf0090..0000000
--- a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.mesos.Executor;
-import org.apache.mesos.ExecutorDriver;
-import org.apache.mesos.MesosExecutorDriver;
-import org.apache.mesos.Protos.Environment.Variable;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.SlaveInfo;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.TaskStatus;
-
-public class MesosExecutor implements Executor {
-  public static final Log LOG = LogFactory.getLog(MesosExecutor.class);
-
-  private JobConf conf;
-  private TaskTracker taskTracker;
-
-  @Override
-  public void registered(ExecutorDriver driver, ExecutorInfo executorInfo,
-      FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
-    LOG.info("Executor registered with the slave");
-
-    conf = new JobConf();
-
-    // Get TaskTracker's config options from environment variables set by the
-    // JobTracker.
-    if (executorInfo.getCommand().hasEnvironment()) {
-      for (Variable variable : executorInfo.getCommand().getEnvironment()
-          .getVariablesList()) {
-        LOG.info("Setting config option : " + variable.getName() + " to "
-            + variable.getValue());
-        conf.set(variable.getName(), variable.getValue());
-      }
-    }
-
-    // Get hostname from Mesos to make sure we match what it reports
-    // to the JobTracker.
-    conf.set("slave.host.name", slaveInfo.getHostname());
-
-    // Set the mapred.local directory inside the executor sandbox, so that
-    // different TaskTrackers on the same host do not step on each other.
-    conf.set("mapred.local.dir", System.getProperty("user.dir") + "/mapred");
-  }
-
-  @Override
-  public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
-    LOG.info("Launching task : " + task.getTaskId().getValue());
-
-    // NOTE: We need to manually set the context class loader here because
-    // the TaskTracker is unable to find the LoginModule class otherwise.
-    Thread.currentThread().setContextClassLoader(
-        TaskTracker.class.getClassLoader());
-
-    try {
-      taskTracker = new TaskTracker(conf);
-    } catch (IOException e) {
-      LOG.fatal("Failed to start TaskTracker", e);
-      System.exit(1);
-    } catch (InterruptedException e) {
-      LOG.fatal("Failed to start TaskTracker", e);
-      System.exit(1);
-    }
-
-    // Spin up a TaskTracker in a new thread.
-    new Thread("TaskTracker Run Thread") {
-      @Override
-      public void run() {
-        taskTracker.run();
-
-        // Send a TASK_FINISHED status update.
-        // We do this here because we want to send it in a separate thread
-        // from the one used to call killTask().
-        driver.sendStatusUpdate(TaskStatus.newBuilder()
-            .setTaskId(task.getTaskId())
-            .setState(TaskState.TASK_FINISHED)
-            .build());
-
-        // Give some time for the update to reach the slave.
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          LOG.error("Failed to sleep TaskTracker thread", e);
-        }
-
-        // Stop the executor.
-        driver.stop();
-      }
-    }.start();
-
-    driver.sendStatusUpdate(TaskStatus.newBuilder()
-        .setTaskId(task.getTaskId())
-        .setState(TaskState.TASK_RUNNING).build());
-  }
-
-  @Override
-  public void killTask(ExecutorDriver driver, TaskID taskId) {
-    LOG.info("Killing task : " + taskId.getValue());
-    try {
-      taskTracker.shutdown();
-    } catch (IOException e) {
-      LOG.error("Failed to shutdown TaskTracker", e);
-    } catch (InterruptedException e) {
-      LOG.error("Failed to shutdown TaskTracker", e);
-    }
-  }
-
-  @Override
-  public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
-    LOG.info("Executor reregistered with the slave");
-  }
-
-  @Override
-  public void disconnected(ExecutorDriver driver) {
-    LOG.info("Executor disconnected from the slave");
-  }
-
-  @Override
-  public void frameworkMessage(ExecutorDriver d, byte[] msg) {
-    LOG.info("Executor received framework message of length: " + msg.length
-        + " bytes");
-  }
-
-  @Override
-  public void error(ExecutorDriver d, String message) {
-    LOG.error("MesosExecutor.error: " + message);
-  }
-
-  @Override
-  public void shutdown(ExecutorDriver d) {
-    LOG.info("Executor asked to shutdown");
-  }
-
-  public static void main(String[] args) {
-    MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor());
-    System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
deleted file mode 100644
index 0332dea..0000000
--- a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ /dev/null
@@ -1,845 +0,0 @@
-package org.apache.hadoop.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Iterator;
-
-import org.apache.commons.httpclient.HttpHost;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.Value;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import static org.apache.hadoop.util.StringUtils.join;
-
-public class MesosScheduler extends TaskScheduler implements Scheduler {
-  public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
-
-  private SchedulerDriver driver;
-  private TaskScheduler taskScheduler;
-  private JobTracker jobTracker;
-  private Configuration conf;
-
-  // This is the memory overhead for a jvm process. This needs to be added
-  // to a jvm process's resource requirement, in addition to its heap size.
-  private static final double JVM_MEM_OVERHEAD_PERCENT_DEFAULT = 0.1; // 10%.
-
-  // TODO(vinod): Consider parsing the slot memory from the configuration jvm
-  // heap options (e.g: mapred.child.java.opts).
-
-  // NOTE: It appears that there's no real resource requirements for a
-  // map / reduce slot. We therefore define a default slot as:
-  // 0.2 cores.
-  // 512 MB memory.
-  // 1 GB of disk space.
-  private static final double SLOT_CPUS_DEFAULT = 0.2; // 0.2 cores.
-  private static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
-  private static final int SLOT_JVM_HEAP_DEFAULT = 256; // 256MB.
-
-  private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
-  private static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB.
-
-  // The default behavior in Hadoop is to use 4 slots per TaskTracker:
-  private static final int MAP_SLOTS_DEFAULT = 2;
-  private static final int REDUCE_SLOTS_DEFAULT = 2;
-
-  // The amount of time to wait for task trackers to launch before
-  // giving up.
-  private static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes
-
-  // Count of the launched trackers for TaskID generation.
-  private long launchedTrackers = 0;
-
-  // Maintains a mapping from {tracker host:port -> MesosTracker}.
-  // Used for tracking the slots of each TaskTracker and the corresponding
-  // Mesos TaskID.
-  private Map<HttpHost, MesosTracker> mesosTrackers =
-    new HashMap<HttpHost, MesosTracker>();
-
-  private JobInProgressListener jobListener = new JobInProgressListener() {
-    @Override
-    public void jobAdded(JobInProgress job) throws IOException {
-      LOG.info("Added job " + job.getJobID());
-    }
-
-    @Override
-    public void jobRemoved(JobInProgress job) {
-      LOG.info("Removed job " + job.getJobID());
-    }
-
-    @Override
-    public void jobUpdated(JobChangeEvent event) {
-      synchronized (MesosScheduler.this) {
-        JobInProgress job = event.getJobInProgress();
-
-        // If the job is complete, kill all the corresponding idle TaskTrackers.
-        if (!job.isComplete()) {
-          return;
-        }
-
-        LOG.info("Completed job : " + job.getJobID());
-
-        List<TaskInProgress> completed = new ArrayList<TaskInProgress>();
-
-        // Map tasks.
-        completed.addAll(job.reportTasksInProgress(true, true));
-
-        // Reduce tasks.
-        completed.addAll(job.reportTasksInProgress(false, true));
-
-        for (TaskInProgress task : completed) {
-          // Check that this task actually belongs to this job
-          if (task.getJob().getJobID() != job.getJobID()) {
-            continue;
-          }
-
-          for (TaskStatus status : task.getTaskStatuses()) {
-            // Make a copy to iterate over keys and delete values.
-            Set<HttpHost> trackers = new HashSet<HttpHost>(
-                mesosTrackers.keySet());
-
-            // Remove the task from the map.
-            for (HttpHost tracker : trackers) {
-              MesosTracker mesosTracker = mesosTrackers.get(tracker);
-
-              if (!mesosTracker.active) {
-                LOG.warn("Ignoring TaskTracker: " + tracker
-                    + " because it might not have sent a hearbeat");
-                continue;
-              }
-
-              LOG.info("Removing completed task : " + status.getTaskID()
-                  + " of tracker " + status.getTaskTracker());
-
-              mesosTracker.hadoopJobs.remove(job.getJobID());
-
-              // If the TaskTracker doesn't have any running tasks, kill it.
-              if (mesosTracker.hadoopJobs.isEmpty()) {
-                LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
-                    + mesosTracker.host);
-
-                driver.killTask(mesosTracker.taskId);
-                mesosTracker.timer.cancel();
-                mesosTrackers.remove(tracker);
-              }
-            }
-          }
-        }
-      }
-    }
-  };
-
-  public MesosScheduler() {}
-
-  // TaskScheduler methods.
-  @Override
-  public synchronized void start() throws IOException {
-    conf = getConf();
-    String taskTrackerClass = conf.get("mapred.mesos.taskScheduler",
-        "org.apache.hadoop.mapred.JobQueueTaskScheduler");
-
-    try {
-      taskScheduler =
-        (TaskScheduler) Class.forName(taskTrackerClass).newInstance();
-      taskScheduler.setConf(conf);
-      taskScheduler.setTaskTrackerManager(taskTrackerManager);
-    } catch (ClassNotFoundException e) {
-      LOG.fatal("Failed to initialize the TaskScheduler", e);
-      System.exit(1);
-    } catch (InstantiationException e) {
-      LOG.fatal("Failed to initialize the TaskScheduler", e);
-      System.exit(1);
-    } catch (IllegalAccessException e) {
-      LOG.fatal("Failed to initialize the TaskScheduler", e);
-      System.exit(1);
-    }
-
-    // Add the job listener to get job related updates.
-    taskTrackerManager.addJobInProgressListener(jobListener);
-
-    LOG.info("Starting MesosScheduler");
-    jobTracker = (JobTracker) super.taskTrackerManager;
-
-    String master = conf.get("mapred.mesos.master", "local");
-
-    try {
-      FrameworkInfo frameworkInfo = FrameworkInfo
-        .newBuilder()
-        .setUser("")
-        .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false))
-        .setName("Hadoop: (RPC port: " + jobTracker.port + ","
-            + " WebUI port: " + jobTracker.infoPort + ")").build();
-
-      driver = new MesosSchedulerDriver(this, frameworkInfo, master);
-      driver.start();
-    } catch (Exception e) {
-      // If the MesosScheduler can't be loaded, the JobTracker won't be useful
-      // at all, so crash it now so that the user notices.
-      LOG.fatal("Failed to start MesosScheduler", e);
-      System.exit(1);
-    }
-
-    taskScheduler.start();
-  }
-
-  @Override
-  public synchronized void terminate() throws IOException {
-    try {
-      LOG.info("Stopping MesosScheduler");
-      driver.stop();
-    } catch (Exception e) {
-      LOG.error("Failed to stop Mesos scheduler", e);
-    }
-
-    taskScheduler.terminate();
-  }
-
-  @Override
-  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
-    throws IOException {
-    HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
-        taskTracker.getStatus().getHttpPort());
-
-    if (!mesosTrackers.containsKey(tracker)) {
-      // TODO(bmahler): Consider allowing non-Mesos TaskTrackers.
-      LOG.info("Unknown/exited TaskTracker: " + tracker + ". ");
-      return null;
-    }
-
-    // Let the underlying task scheduler do the actual task scheduling.
-    List<Task> tasks = taskScheduler.assignTasks(taskTracker);
-
-    // The Hadoop Fair Scheduler is known to return null.
-    if (tasks != null) {
-      // Keep track of which TaskTracker contains which tasks.
-      for (Task task : tasks) {
-        mesosTrackers.get(tracker).hadoopJobs.add(task.getJobID());
-      }
-    }
-
-    return tasks;
-  }
-
-  @Override
-  public synchronized Collection<JobInProgress> getJobs(String queueName) {
-    return taskScheduler.getJobs(queueName);
-  }
-
-  @Override
-  public synchronized void refresh() throws IOException {
-    taskScheduler.refresh();
-  }
-
-  // Mesos Scheduler methods.
-  // These are synchronized, where possible. Some of these methods need to access the
-  // JobTracker, which can lead to a deadlock:
-  // See: https://issues.apache.org/jira/browse/MESOS-429
-  // The workaround employed here is to unsynchronize those methods needing access to
-  // the JobTracker state and use explicit synchronization instead as appropriate.
-  // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to
-  // split up the Scheduler and TaskScheduler implementations in order to break the
-  // locking cycle. This would require a synchronized class to store the shared
-  // state across our Scheduler and TaskScheduler implementations, and provide
-  // atomic operations as needed.
-  @Override
-  public synchronized void registered(SchedulerDriver schedulerDriver,
-      FrameworkID frameworkID, MasterInfo masterInfo) {
-    LOG.info("Registered as " + frameworkID.getValue()
-        + " with master " + masterInfo);
-  }
-
-  @Override
-  public synchronized void reregistered(SchedulerDriver schedulerDriver,
-      MasterInfo masterInfo) {
-    LOG.info("Re-registered with master " + masterInfo);
-  }
-
-  public synchronized void killTracker(MesosTracker tracker) {
-    driver.killTask(tracker.taskId);
-    mesosTrackers.remove(tracker.host);
-  }
-
-  // For some reason, pendingMaps() and pendingReduces() don't return the
-  // values we expect. We observed negative values, which may be related to
-  // https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the
-  // algorithm that is used to calculate the pending tasks within the Hadoop
-  // JobTracker sources (see 'printTaskSummary' in
-  // src/org/apache/hadoop/mapred/jobdetails_jsp.java).
-  private int getPendingTasks(TaskInProgress[] tasks) {
-    int totalTasks = tasks.length;
-    int runningTasks = 0;
-    int finishedTasks = 0;
-    int killedTasks = 0;
-    for (int i = 0; i < totalTasks; ++i) {
-      TaskInProgress task = tasks[i];
-      if (task == null) {
-        continue;
-      }
-      if (task.isComplete()) {
-        finishedTasks += 1;
-      } else if (task.isRunning()) {
-        runningTasks += 1;
-      } else if (task.wasKilled()) {
-        killedTasks += 1;
-      }
-    }
-    int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks;
-    return pendingTasks;
-  }
-
-  // This method uses explicit synchronization in order to avoid deadlocks when
-  // accessing the JobTracker.
-  @Override
-  public void resourceOffers(SchedulerDriver schedulerDriver,
-      List<Offer> offers) {
-    // Before synchronizing, we pull all needed information from the JobTracker.
-    final HttpHost jobTrackerAddress =
-      new HttpHost(jobTracker.getHostname(), jobTracker.getTrackerPort());
-
-    final Collection<TaskTrackerStatus> taskTrackers = jobTracker.taskTrackers();
-
-    final List<JobInProgress> jobsInProgress = new ArrayList<JobInProgress>();
-    for (JobStatus status : jobTracker.jobsToComplete()) {
-      jobsInProgress.add(jobTracker.getJob(status.getJobID()));
-    }
-
-    synchronized (this) {
-      // Compute the number of pending maps and reduces.
-      int pendingMaps = 0;
-      int pendingReduces = 0;
-      for (JobInProgress progress : jobsInProgress) {
-        // JobStatus.pendingMaps/Reduces may return the wrong value on
-        // occasion.  This seems to be safer.
-        pendingMaps += getPendingTasks(progress.getTasks(TaskType.MAP));
-        pendingReduces += getPendingTasks(progress.getTasks(TaskType.REDUCE));
-      }
-
-      // Mark active (heartbeated) TaskTrackers and compute idle slots.
-      int idleMapSlots = 0;
-      int idleReduceSlots = 0;
-      int unhealthyTrackers = 0;
-
-      for (TaskTrackerStatus status : taskTrackers) {
-        if (!status.getHealthStatus().isNodeHealthy()) {
-          // Skip this node if it's unhealthy.
-          ++unhealthyTrackers;
-          continue;
-        }
-
-        HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
-        if (mesosTrackers.containsKey(host)) {
-          mesosTrackers.get(host).active = true;
-          mesosTrackers.get(host).timer.cancel();
-          idleMapSlots += status.getAvailableMapSlots();
-          idleReduceSlots += status.getAvailableReduceSlots();
-        }
-      }
-
-      // Consider the TaskTrackers that have yet to become active as being idle,
-      // otherwise we will launch excessive TaskTrackers.
-      int inactiveMapSlots = 0;
-      int inactiveReduceSlots = 0;
-      for (MesosTracker tracker : mesosTrackers.values()) {
-        if (!tracker.active) {
-          inactiveMapSlots += tracker.mapSlots;
-          inactiveReduceSlots += tracker.reduceSlots;
-        }
-      }
-
-      // To ensure Hadoop jobs begin promptly, we can specify a minimum number
-      // of 'hot slots' to be available for use.  This addresses the
-      // TaskTracker spin up delay that exists with Hadoop on Mesos.  This can
-      // be a nuisance with lower latency applications, such as ad-hoc Hive
-      // queries.
-      int minimumMapSlots = conf.getInt("mapred.mesos.total.map.slots.minimum", 0);
-      int minimumReduceSlots =
-        conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0);
-
-      // Compute how many slots we need to allocate.
-      int neededMapSlots = Math.max(
-          minimumMapSlots - (idleMapSlots + inactiveMapSlots),
-          pendingMaps - (idleMapSlots + inactiveMapSlots));
-      int neededReduceSlots = Math.max(
-          minimumReduceSlots  - (idleReduceSlots + inactiveReduceSlots),
-          pendingReduces - (idleReduceSlots + inactiveReduceSlots));
-
-      LOG.info(join("\n", Arrays.asList(
-              "JobTracker Status",
-              "      Pending Map Tasks: " + pendingMaps,
-              "   Pending Reduce Tasks: " + pendingReduces,
-              "         Idle Map Slots: " + idleMapSlots,
-              "      Idle Reduce Slots: " + idleReduceSlots,
-              "     Inactive Map Slots: " + inactiveMapSlots
-              + " (launched but no hearbeat yet)",
-              "  Inactive Reduce Slots: " + inactiveReduceSlots
-              + " (launched but no hearbeat yet)",
-              "       Needed Map Slots: " + neededMapSlots,
-              "    Needed Reduce Slots: " + neededReduceSlots,
-              "     Unhealthy Trackers: " + unhealthyTrackers)));
-
-      // Launch TaskTrackers to satisfy the slot requirements.
-      // TODO(bmahler): Consider slotting intelligently.
-      // Ex: If more map slots are needed, but no reduce slots are needed,
-      // launch a map-only TaskTracker to better satisfy the slot needs.
-      for (Offer offer : offers) {
-        if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
-          driver.declineOffer(offer.getId());
-          continue;
-        }
-
-        // Ensure these values aren't < 0.
-        neededMapSlots = Math.max(0, neededMapSlots);
-        neededReduceSlots = Math.max(0, neededReduceSlots);
-
-        double cpus = -1.0;
-        double mem = -1.0;
-        double disk = -1.0;
-        Set<Integer> ports = new HashSet<Integer>();
-
-        // Pull out the cpus, memory, disk, and 2 ports from the offer.
-        for (Resource resource : offer.getResourcesList()) {
-          if (resource.getName().equals("cpus")
-              && resource.getType() == Value.Type.SCALAR) {
-            cpus = resource.getScalar().getValue();
-          } else if (resource.getName().equals("mem")
-              && resource.getType() == Value.Type.SCALAR) {
-            mem = resource.getScalar().getValue();
-          } else if (resource.getName().equals("disk")
-              && resource.getType() == Value.Type.SCALAR) {
-            disk = resource.getScalar().getValue();
-          } else if (resource.getName().equals("ports")
-              && resource.getType() == Value.Type.RANGES) {
-            for (Value.Range range : resource.getRanges().getRangeList()) {
-              Integer begin = (int)range.getBegin();
-              Integer end = (int)range.getEnd();
-              if (end < begin) {
-                LOG.warn("Ignoring invalid port range: begin=" + begin + " end=" + end);
-                continue;
-              }
-              while (begin <= end && ports.size() < 2) {
-                ports.add(begin);
-                begin += 1;
-              }
-            }
-          }
-        }
-
-        int mapSlotsMax = conf.getInt("mapred.tasktracker.map.tasks.maximum",
-            MAP_SLOTS_DEFAULT);
-        int reduceSlotsMax =
-          conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
-              REDUCE_SLOTS_DEFAULT);
-
-        // What's the minimum number of map and reduce slots we should try to
-        // launch?
-        long mapSlots = 0;
-        long reduceSlots = 0;
-
-        double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
-            (float) SLOT_CPUS_DEFAULT);
-        double slotDisk = conf.getInt("mapred.mesos.slot.disk",
-            SLOT_DISK_DEFAULT);
-
-        int slotMem = conf.getInt("mapred.mesos.slot.mem",
-            SLOT_JVM_HEAP_DEFAULT);
-        long slotJVMHeap = Math.round((double)slotMem -
-            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * slotMem));
-
-        int tasktrackerMem = conf.getInt("mapred.mesos.tasktracker.mem",
-              TASKTRACKER_MEM_DEFAULT);
-        long tasktrackerJVMHeap = Math.round((double)tasktrackerMem -
-            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * tasktrackerMem));
-
-        // Minimum resource requirements for the container (TaskTracker +
-        // map/reduce tasks).
-        double containerCpus = TASKTRACKER_CPUS;
-        double containerMem = tasktrackerMem;
-        double containerDisk = 0;
-
-        // Determine how many slots we can allocate.
-        int slots = mapSlotsMax + reduceSlotsMax;
-        slots = (int)Math.min(slots, (cpus - containerCpus) / slotCpus);
-        slots = (int)Math.min(slots, (mem - containerMem) / slotMem);
-        slots = (int)Math.min(slots, (disk - containerDisk) / slotDisk);
-
-        // Is this offer too small for even the minimum slots?
-        if (slots < 1 || ports.size() < 2) {
-          LOG.info(join("\n", Arrays.asList(
-                  "Declining offer with insufficient resources for a TaskTracker: ",
-                  "  cpus: offered " + cpus + " needed " + containerCpus,
-                  "  mem : offered " + mem + " needed " + containerMem,
-                  "  disk: offered " + disk + " needed " + containerDisk,
-                  "  ports: " + (ports.size() < 2
-                    ? "less than 2 offered"
-                    : "at least 2 (sufficient)"))));
-
-          driver.declineOffer(offer.getId());
-          continue;
-        }
-
-        // Is the number of slots we need sufficiently small? If so, we can
-        // allocate exactly the number we need.
-        if (slots >= neededMapSlots + neededReduceSlots && neededMapSlots <
-            mapSlotsMax && neededReduceSlots < reduceSlotsMax) {
-          mapSlots = neededMapSlots;
-          reduceSlots = neededReduceSlots;
-        } else {
-          // Allocate slots fairly for this resource offer.
-          double mapFactor = (double)neededMapSlots / (neededMapSlots + neededReduceSlots);
-          double reduceFactor = (double)neededReduceSlots / (neededMapSlots + neededReduceSlots);
-          // To avoid map/reduce slot starvation, don't allow more than 50%
-          // spread between map/reduce slots when we need both mappers and
-          // reducers.
-          if (neededMapSlots > 0 && neededReduceSlots > 0) {
-            if (mapFactor < 0.25) {
-              mapFactor = 0.25;
-            } else if (mapFactor > 0.75) {
-              mapFactor = 0.75;
-            }
-            if (reduceFactor < 0.25) {
-              reduceFactor = 0.25;
-            } else if (reduceFactor > 0.75) {
-              reduceFactor = 0.75;
-            }
-          }
-          mapSlots = Math.min(Math.min((long)(mapFactor * slots), mapSlotsMax), neededMapSlots);
-          // The remaining slots are allocated for reduces.
-          slots -= mapSlots;
-          reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
-        }
-
-        Iterator<Integer> portIter = ports.iterator();
-        HttpHost httpAddress = new HttpHost(offer.getHostname(), portIter.next());
-        HttpHost reportAddress = new HttpHost(offer.getHostname(), portIter.next());
-
-        // Check that this tracker is not already launched.  This problem was
-        // observed on a few occasions, but not reliably.  The main symptom was
-        // that entries in `mesosTrackers` were being lost, and task trackers
-        // would be 'lost' mysteriously (probably because the ports were in
-        // use).  This problem has since gone away with a rewrite of the port
-        // selection code, but the check + logging is left here.
-        // TODO(brenden): Diagnose this to determine root cause.
-
-        if (mesosTrackers.containsKey(httpAddress)) {
-          LOG.info(join("\n", Arrays.asList(
-                  "Declining offer because host/port combination is in use: ",
-                  "  cpus: offered " + cpus + " needed " + containerCpus,
-                  "  mem : offered " + mem + " needed " + containerMem,
-                  "  disk: offered " + disk + " needed " + containerDisk,
-                  "  ports: " + ports)));
-
-          driver.declineOffer(offer.getId());
-          continue;
-        }
-
-        TaskID taskId = TaskID.newBuilder()
-          .setValue("Task_Tracker_" + launchedTrackers++).build();
-
-        LOG.info("Launching task " + taskId.getValue() + " on "
-            + httpAddress.toString() + " with mapSlots=" + mapSlots + " reduceSlots=" + reduceSlots);
-
-        // Add this tracker to Mesos tasks.
-        mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
-              mapSlots, reduceSlots, this));
-
-        // Create the environment depending on whether the executor is going to be
-        // run locally.
-        // TODO(vinod): Do not pass the mapred config options as environment
-        // variables.
-        Protos.Environment.Builder envBuilder = Protos.Environment
-          .newBuilder()
-          .addVariables(
-              Protos.Environment.Variable
-              .newBuilder()
-              .setName("mapred.job.tracker")
-              .setValue(jobTrackerAddress.getHostName() + ':'
-                + jobTrackerAddress.getPort()))
-          .addVariables(
-              Protos.Environment.Variable
-              .newBuilder()
-              .setName("mapred.task.tracker.http.address")
-              .setValue(
-                httpAddress.getHostName() + ':' + httpAddress.getPort()))
-          .addVariables(
-              Protos.Environment.Variable
-              .newBuilder()
-              .setName("mapred.task.tracker.report.address")
-              .setValue(reportAddress.getHostName() + ':'
-                + reportAddress.getPort()))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("mapred.map.child.java.opts")
-              .setValue("-Xmx" + slotJVMHeap + "m"))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("mapred.reduce.child.java.opts")
-              .setValue("-Xmx" + slotJVMHeap + "m"))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("HADOOP_HEAPSIZE")
-              .setValue("" + tasktrackerJVMHeap))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("mapred.tasktracker.map.tasks.maximum")
-              .setValue("" + mapSlots))
-          .addVariables(
-              Protos.Environment.Variable.newBuilder()
-              .setName("mapred.tasktracker.reduce.tasks.maximum")
-              .setValue("" + reduceSlots));
-
-        // Set Java-specific environment variables, as appropriate.
-        Map<String, String> env = System.getenv();
-        if (env.containsKey("JAVA_HOME")) {
-          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
-              .setName("JAVA_HOME")
-              .setValue(env.get("JAVA_HOME")));
-        }
-
-        if (env.containsKey("JAVA_LIBRARY_PATH")) {
-          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
-              .setName("JAVA_LIBRARY_PATH")
-              .setValue(env.get("JAVA_LIBRARY_PATH")));
-        }
-
-        // Command info differs when performing a local run.
-        CommandInfo commandInfo = null;
-        String master = conf.get("mapred.mesos.master", "local");
-
-        if (master.equals("local")) {
-          commandInfo = CommandInfo.newBuilder()
-            .setEnvironment(envBuilder)
-            .setValue(new File("bin/hadoop").getCanonicalPath() +
-                      " org.apache.hadoop.mapred.MesosExecutor")
-            .build();
-        } else {
-          String uri = conf.get("mapred.mesos.executor");
-          commandInfo = CommandInfo.newBuilder()
-            .setEnvironment(envBuilder)
-            .setValue("cd hadoop-* && " +
-                      "./bin/hadoop org.apache.hadoop.mapred.MesosExecutor")
-            .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
-        }
-
-        TaskInfo info = TaskInfo
-          .newBuilder()
-          .setName(taskId.getValue())
-          .setTaskId(taskId)
-          .setSlaveId(offer.getSlaveId())
-          .addResources(
-              Resource
-              .newBuilder()
-              .setName("cpus")
-              .setType(Value.Type.SCALAR)
-              .setScalar(Value.Scalar.newBuilder().setValue(
-                  (mapSlots + reduceSlots) * slotCpus)))
-          .addResources(
-              Resource
-              .newBuilder()
-              .setName("mem")
-              .setType(Value.Type.SCALAR)
-              .setScalar(Value.Scalar.newBuilder().setValue(
-                  (mapSlots + reduceSlots) * slotMem)))
-          .addResources(
-              Resource
-              .newBuilder()
-              .setName("disk")
-              .setType(Value.Type.SCALAR)
-              .setScalar(Value.Scalar.newBuilder().setValue(
-                  (mapSlots + reduceSlots) * slotDisk)))
-          .addResources(
-              Resource
-              .newBuilder()
-              .setName("ports")
-              .setType(Value.Type.RANGES)
-              .setRanges(
-                Value.Ranges
-                .newBuilder()
-                .addRange(Value.Range.newBuilder()
-                  .setBegin(httpAddress.getPort())
-                  .setEnd(httpAddress.getPort()))
-                .addRange(Value.Range.newBuilder()
-                  .setBegin(reportAddress.getPort())
-                  .setEnd(reportAddress.getPort()))))
-          .setExecutor(
-              ExecutorInfo
-              .newBuilder()
-              .setExecutorId(ExecutorID.newBuilder().setValue(
-                  "executor_" + taskId.getValue()))
-              .setName("Hadoop TaskTracker")
-              .setSource(taskId.getValue())
-              .addResources(
-                Resource
-                .newBuilder()
-                .setName("cpus")
-                .setType(Value.Type.SCALAR)
-                .setScalar(Value.Scalar.newBuilder().setValue(
-                    (TASKTRACKER_CPUS))))
-              .addResources(
-                Resource
-                .newBuilder()
-                .setName("mem")
-                .setType(Value.Type.SCALAR)
-                .setScalar(Value.Scalar.newBuilder().setValue(
-                    (tasktrackerMem)))).setCommand(commandInfo))
-                    .build();
-
-        driver.launchTasks(offer.getId(), Arrays.asList(info));
-
-        neededMapSlots -= mapSlots;
-        neededReduceSlots -= reduceSlots;
-      }
-
-      if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
-        LOG.info("Satisfied map and reduce slots needed.");
-      } else {
-        LOG.info("Unable to fully satisfy needed map/reduce slots: "
-            + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
-            + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
-            + "remaining");
-      }
-    }
-  }
-
-  @Override
-  public synchronized void offerRescinded(SchedulerDriver schedulerDriver,
-      OfferID offerID) {
-    LOG.warn("Rescinded offer: " + offerID.getValue());
-  }
-
-  @Override
-  public synchronized void statusUpdate(SchedulerDriver schedulerDriver,
-      Protos.TaskStatus taskStatus) {
-    LOG.info("Status update of " + taskStatus.getTaskId().getValue()
-        + " to " + taskStatus.getState().name()
-        + " with message " + taskStatus.getMessage());
-
-    // Remove the TaskTracker if the corresponding Mesos task has reached a
-    // terminal state.
-    switch (taskStatus.getState()) {
-      case TASK_FINISHED:
-      case TASK_FAILED:
-      case TASK_KILLED:
-      case TASK_LOST:
-        // Make a copy to iterate over keys and delete values.
-        Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
-
-        // Remove the task from the map.
-        for (HttpHost tracker : trackers) {
-          if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
-            LOG.info("Removing terminated TaskTracker: " + tracker);
-            mesosTrackers.get(tracker).timer.cancel();
-            mesosTrackers.remove(tracker);
-          }
-        }
-        break;
-      case TASK_STAGING:
-      case TASK_STARTING:
-      case TASK_RUNNING:
-        break;
-      default:
-        LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
-        break;
-    }
-  }
-
-  @Override
-  public synchronized void frameworkMessage(SchedulerDriver schedulerDriver,
-      ExecutorID executorID, SlaveID slaveID, byte[] bytes) {
-    LOG.info("Framework Message of " + bytes.length + " bytes"
-        + " from executor " + executorID.getValue()
-        + " on slave " + slaveID.getValue());
-  }
-
-  @Override
-  public synchronized void disconnected(SchedulerDriver schedulerDriver) {
-    LOG.warn("Disconnected from Mesos master.");
-  }
-
-  @Override
-  public synchronized void slaveLost(SchedulerDriver schedulerDriver,
-      SlaveID slaveID) {
-    LOG.warn("Slave lost: " + slaveID.getValue());
-  }
-
-  @Override
-  public synchronized void executorLost(SchedulerDriver schedulerDriver,
-      ExecutorID executorID, SlaveID slaveID, int status) {
-    LOG.warn("Executor " + executorID.getValue()
-        + " lost with status " + status + " on slave " + slaveID);
-  }
-
-  @Override
-  public synchronized void error(SchedulerDriver schedulerDriver, String s) {
-    LOG.error("Error from scheduler driver: " + s);
-  }
-
-  /**
-   * Used to track our launched TaskTrackers.
-   */
-  private class MesosTracker {
-    public volatile HttpHost host;
-    public TaskID taskId;
-    public long mapSlots;
-    public long reduceSlots;
-    public volatile boolean active = false; // Set once tracked by the JobTracker.
-    public Timer timer;
-    public volatile MesosScheduler scheduler;
-
-    // Tracks Hadoop job tasks running on the tracker.
-    public Set<JobID> hadoopJobs = new HashSet<JobID>();
-
-    public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
-        long reduceSlots, MesosScheduler scheduler) {
-      this.host = host;
-      this.taskId = taskId;
-      this.mapSlots = mapSlots;
-      this.reduceSlots = reduceSlots;
-      this.scheduler = scheduler;
-
-      this.timer = new Timer();
-      timer.schedule(new TimerTask() {
-        @Override
-        public void run() {
-          synchronized (MesosTracker.this.scheduler) {
-            // If the tracker became active while we were waiting to acquire
-            // the lock, return.
-            if (MesosTracker.this.active) return;
-
-            LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " +
-              LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it");
-            MesosTracker.this.scheduler.killTracker(MesosTracker.this);
-          }
-        }
-      }, LAUNCH_TIMEOUT_MS);
-    }
-  }
-}
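
The slot arithmetic in the resourceOffers() logic above is dense, so here is a
standalone sketch that traces the same min/clamp steps with concrete numbers.
The class name and all input values below are hypothetical and illustrative;
they are not part of this patch.

// SlotMathSketch: an illustrative trace of the offer-to-slots math above.
public class SlotMathSketch {
  public static void main(String[] args) {
    // Hypothetical offer: 8 cpus, 8192 MB mem, 100 GB disk.
    double cpus = 8.0, mem = 8192.0, disk = 102400.0;

    // Defaults taken from the scheduler above.
    double slotCpus = 0.2;        // SLOT_CPUS_DEFAULT
    int slotMem = 256;            // SLOT_JVM_HEAP_DEFAULT
    int slotDisk = 1024;          // SLOT_DISK_DEFAULT
    double containerCpus = 1.0;   // TASKTRACKER_CPUS
    double containerMem = 1024.0; // TASKTRACKER_MEM_DEFAULT
    int mapSlotsMax = 2, reduceSlotsMax = 2;

    // Step 1: how many slots fit in this offer once the TaskTracker
    // container itself has been accounted for?
    int slots = mapSlotsMax + reduceSlotsMax;
    slots = (int) Math.min(slots, (cpus - containerCpus) / slotCpus);
    slots = (int) Math.min(slots, (mem - containerMem) / slotMem);
    slots = (int) Math.min(slots, disk / slotDisk);
    System.out.println("allocatable slots: " + slots); // 4

    // Step 2: split fairly between map and reduce slots. When both kinds
    // are needed, the factor is clamped to [0.25, 0.75] so neither side
    // is starved.
    long neededMapSlots = 10, neededReduceSlots = 1;
    double mapFactor =
        (double) neededMapSlots / (neededMapSlots + neededReduceSlots); // ~0.91
    mapFactor = Math.max(0.25, Math.min(0.75, mapFactor));              // 0.75
    long mapSlots = Math.min(
        Math.min((long) (mapFactor * slots), mapSlotsMax), neededMapSlots);
    long reduceSlots = Math.min(
        Math.min(slots - mapSlots, reduceSlotsMax), neededReduceSlots);
    System.out.println("map=" + mapSlots + " reduce=" + reduceSlots); // map=2 reduce=1
  }
}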


[2/5] git commit: Refactored Hadoop on Mesos from contrib to JAR.

Posted by be...@apache.org.
Refactored Hadoop on Mesos from contrib to JAR.

Review: https://reviews.apache.org/r/13225


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/25cb6f91
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/25cb6f91
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/25cb6f91

Branch: refs/heads/master
Commit: 25cb6f91862b34d1f471e9ef027a2b7123b10d20
Parents: f6d95e8
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Aug 1 21:40:11 2013 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 2 14:25:30 2013 -0700

----------------------------------------------------------------------
 hadoop/HadoopPipes.cc.patch                     |  10 -
 hadoop/Makefile.am                              | 139 ---
 hadoop/NOTES                                    |   6 -
 hadoop/README.md                                |  54 ++
 hadoop/TUTORIAL.sh                              | 710 ----------------
 hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch |  14 -
 hadoop/hadoop-0.20.2-cdh3u3_mesos.patch         |  38 -
 hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch |  14 -
 hadoop/hadoop-0.20.2-cdh3u5_mesos.patch         |  38 -
 hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch    |  14 -
 hadoop/hadoop-0.20.205.0_mesos.patch            |  22 -
 ...adoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch |  14 -
 hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch    |  22 -
 ...adoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch |  14 -
 hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch    |  22 -
 hadoop/hadoop-7698-1.patch                      |  27 -
 hadoop/hadoop-gridmix.patch                     |  17 -
 hadoop/mapred-site.xml.patch                    |  54 --
 hadoop/mesos-executor                           |   3 -
 hadoop/mesos/build.xml                          |  28 -
 hadoop/mesos/ivy.xml                            |  42 -
 hadoop/mesos/ivy/libraries.properties           |   5 -
 .../org/apache/hadoop/mapred/MesosExecutor.java | 145 ----
 .../apache/hadoop/mapred/MesosScheduler.java    | 848 -------------------
 hadoop/pom.xml                                  | 110 +++
 .../org/apache/hadoop/mapred/MesosExecutor.java | 145 ++++
 .../apache/hadoop/mapred/MesosScheduler.java    | 845 ++++++++++++++++++
 27 files changed, 1154 insertions(+), 2246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/HadoopPipes.cc.patch
----------------------------------------------------------------------
diff --git a/hadoop/HadoopPipes.cc.patch b/hadoop/HadoopPipes.cc.patch
deleted file mode 100644
index e0ca33a..0000000
--- a/hadoop/HadoopPipes.cc.patch
+++ /dev/null
@@ -1,10 +0,0 @@
---- hadoop-2.0.0-mr1-cdh4.2.1/src/c++/pipes/impl/HadoopPipes.cc.old	2013-04-16 20:18:22.681061322 +0000
-+++ hadoop-2.0.0-mr1-cdh4.2.1/src/c++/pipes/impl/HadoopPipes.cc	2013-04-16 20:18:44.005060961 +0000
-@@ -34,6 +34,7 @@
- #include <pthread.h>
- #include <iostream>
- #include <fstream>
-+#include <unistd.h>
- 
- #include <openssl/hmac.h>
- #include <openssl/buffer.h>

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/Makefile.am
----------------------------------------------------------------------
diff --git a/hadoop/Makefile.am b/hadoop/Makefile.am
deleted file mode 100644
index 2702924..0000000
--- a/hadoop/Makefile.am
+++ /dev/null
@@ -1,139 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
-EXTRA_DIST = TUTORIAL.sh hadoop-gridmix.patch				\
-  hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch				\
-  hadoop-0.20.2-cdh3u3_mesos.patch					\
-  hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch				\
-  hadoop-0.20.2-cdh3u5_mesos.patch					\
-  hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch				\
-  hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch					\
-  hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch				\
-  hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch					\
-  hadoop-7698-1.patch							\
-  hadoop-0.20.205.0_hadoop-env.sh.patch hadoop-0.20.205.0_mesos.patch	\
-  HadoopPipes.cc.patch                                                  \
-  mapred-site.xml.patch mesos-executor mesos/build.xml			\
-  mesos/ivy/libraries.properties mesos/ivy.xml				\
-  mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java		\
-  mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-
-# Defines some targets to run the Hadoop tutorial using a specified
-# distribution. At some point we might want to do this automagically
-# (i.e., as part of 'make check'). Note that we set the environment
-# variable TMOUT to 1 so that each prompt in the tutorial will return
-# after 1 second so no interaction from the user is required.
-hadoop-0.20.205.0:
-	if test "$(top_srcdir)" != "$(top_builddir)"; then \
-          cp -p $(srcdir)/TUTORIAL.sh .; \
-          cp -p $(srcdir)/hadoop-gridmix.patch .; \
-          cp -p $(srcdir)/hadoop-7698-1.patch .; \
-          cp -p $(srcdir)/hadoop-0.20.205.0_hadoop-env.sh.patch .; \
-          cp -p $(srcdir)/hadoop-0.20.205.0_mesos.patch .; \
-          cp -p $(srcdir)/mapred-site.xml.patch .; \
-          cp -rp $(srcdir)/mesos .; \
-          cp -p $(srcdir)/mesos-executor .; \
-        fi
-	rm -rf hadoop-0.20.205.0
-	@TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh
-
-hadoop-0.20.2-cdh3u3:
-	if test "$(top_srcdir)" != "$(top_builddir)"; then \
-          cp -p $(srcdir)/TUTORIAL.sh .; \
-          cp -p $(srcdir)/hadoop-gridmix.patch .; \
-          cp -p $(srcdir)/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch .; \
-          cp -p $(srcdir)/hadoop-0.20.2-cdh3u3_mesos.patch .; \
-          cp -p $(srcdir)/mapred-site.xml.patch .; \
-          cp -rp $(srcdir)/mesos .; \
-          cp -p $(srcdir)/mesos-executor .; \
-        fi
-	rm -rf hadoop-0.20.2-cdh3u3
-	@TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh 0.20.2-cdh3u3
-
-hadoop-0.20.2-cdh3u5:
-	if test "$(top_srcdir)" != "$(top_builddir)"; then \
-          cp -p $(srcdir)/TUTORIAL.sh .; \
-          cp -p $(srcdir)/hadoop-gridmix.patch .; \
-          cp -p $(srcdir)/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch .; \
-          cp -p $(srcdir)/hadoop-0.20.2-cdh3u5_mesos.patch .; \
-          cp -p $(srcdir)/mapred-site.xml.patch .; \
-          cp -rp $(srcdir)/mesos .; \
-          cp -p $(srcdir)/mesos-executor .; \
-        fi
-	rm -rf hadoop-0.20.2-cdh3u5
-	@TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh 0.20.2-cdh3u5
-
-hadoop-2.0.0-mr1-cdh4.1.2:
-	if test "$(top_srcdir)" != "$(top_builddir)"; then \
-          cp -p $(srcdir)/TUTORIAL.sh .; \
-          cp -p $(srcdir)/hadoop-gridmix.patch .; \
-          cp -p $(srcdir)/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch .; \
-          cp -p $(srcdir)/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch .; \
-          cp -p $(srcdir)/mapred-site.xml.patch .; \
-          cp -p $(srcdir)/HadoopPipes.cc.patch .; \
-          cp -rp $(srcdir)/mesos .; \
-          cp -p $(srcdir)/mesos-executor .; \
-        fi
-	rm -rf hadoop-2.0.0-mr1-cdh4.1.2
-	@TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh 2.0.0-mr1-cdh4.1.2
-
-hadoop-2.0.0-mr1-cdh4.2.1:
-	if test "$(top_srcdir)" != "$(top_builddir)"; then \
-          cp -p $(srcdir)/TUTORIAL.sh .; \
-          cp -p $(srcdir)/hadoop-gridmix.patch .; \
-          cp -p $(srcdir)/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch .; \
-          cp -p $(srcdir)/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch .; \
-          cp -p $(srcdir)/mapred-site.xml.patch .; \
-          cp -p $(srcdir)/HadoopPipes.cc.patch .; \
-          cp -rp $(srcdir)/mesos .; \
-          cp -p $(srcdir)/mesos-executor .; \
-        fi
-	rm -rf hadoop-2.0.0-mr1-cdh4.2.1
-	@TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh 2.0.0-mr1-cdh4.2.1
-
-
-clean-local:
-	if test "$(top_srcdir)" != "$(top_builddir)"; then \
-          rm -f TUTORIAL.sh; \
-          rm -f hadoop-gridmix.patch; \
-          rm -f hadoop-7698-1.patch; \
-          rm -f hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch; \
-          rm -f hadoop-0.20.2-cdh3u3_mesos.patch; \
-          rm -f hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch; \
-          rm -f hadoop-0.20.2-cdh3u5_mesos.patch; \
-          rm -f hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch; \
-          rm -f hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch; \
-          rm -f hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch; \
-          rm -f hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch; \
-          rm -f hadoop-0.20.205.0_hadoop-env.sh.patch; \
-          rm -f hadoop-0.20.205.0_mesos.patch; \
-          rm -f mapred-site.xml.patch; \
-          rm -f mesos-executor; \
-          rm -rf mesos; \
-        fi
-	rm -rf hadoop-0.20.2-cdh3u3
-	rm -f hadoop-0.20.2-cdh3u3.tar.gz
-	rm -rf hadoop-0.20.2-cdh3u5
-	rm -f hadoop-0.20.2-cdh3u5.tar.gz
-	rm -rf hadoop-2.0.0-mr1-cdh4.1.2
-	rm -f mr1-2.0.0-mr1-cdh4.1.2.tar.gz
-	rm -rf hadoop-2.0.0-mr1-cdh4.2.1
-	rm -f mr1-2.0.0-mr1-cdh4.2.1.tar.gz
-	rm -rf hadoop-0.20.205.0
-	rm -f hadoop-0.20.205.0.tar.gz
-
-
-.PHONY: hadoop-0.20.205.0 hadoop-0.20.2-cdh3u3 hadoop-0.20.2-cdh3u5 hadoop-2.0.0-mr1-cdh4.1.2 hadoop-2.0.0-mr1-cdh4.2.1

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/NOTES
----------------------------------------------------------------------
diff --git a/hadoop/NOTES b/hadoop/NOTES
deleted file mode 100644
index 1c9948c..0000000
--- a/hadoop/NOTES
+++ /dev/null
@@ -1,6 +0,0 @@
-We've patched GridMix (a contribution in Hadoop) because it was broken under Java 7.
-We can stop applying this custom patch when (if?) GridMix is patched in the Hadoop versions
-we support.
-
-We've also patched build.xml of hadoop-0.20.205.0 because its 'jsvc' target is broken.
-The patch is obtained from https://issues.apache.org/jira/browse/HADOOP-7698.

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/README.md
----------------------------------------------------------------------
diff --git a/hadoop/README.md b/hadoop/README.md
new file mode 100644
index 0000000..d5a99f6
--- /dev/null
+++ b/hadoop/README.md
@@ -0,0 +1,54 @@
+diff --git a/conf/mapred-site.xml b/conf/mapred-site.xml
+index 970c8fe..f9f272d 100644
+--- a/conf/mapred-site.xml
++++ b/conf/mapred-site.xml
+@@ -4,5 +4,48 @@
+ <!-- Put site-specific property overrides in this file. -->
+ 
+ <configuration>
+-
++  <property>
++    <name>mapred.job.tracker</name>
++    <value>localhost:54311</value>
++  </property>
++  <property>
++    <name>mapred.jobtracker.taskScheduler</name>
++    <value>org.apache.hadoop.mapred.MesosScheduler</value>
++  </property>
++  <property>
++    <name>mapred.mesos.taskScheduler</name>
++    <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
++  </property>
++  <property>
++    <name>mapred.mesos.master</name>
++    <value>local</value>
++  </property>
++#
++# Make sure to uncomment the 'mapred.mesos.executor' property,
++# when running the Hadoop JobTracker on a real Mesos cluster.
++# NOTE: You need to MANUALLY upload the Mesos executor bundle
++# to the location that is set as the value of this property.
++#  <property>
++#    <name>mapred.mesos.executor</name>
++#    <value>hdfs://hdfs.name.node:port/hadoop.zip</value>
++#  </property>
++#
++# The properties below indicate the amount of resources
++# that are allocated to a Hadoop slot (i.e., map/reduce task) by Mesos.
++  <property>
++    <name>mapred.mesos.slot.cpus</name>
++    <value>1</value>
++  </property>
++  <property>
++    <name>mapred.mesos.slot.disk</name>
++    <!-- The value is in MB. -->
++    <value>1024</value>
++  </property>
++  <property>
++    <name>mapred.mesos.slot.mem</name>
++    <!-- Note that this is the total memory required for
++         JVM overhead (10% of this value) and the heap (-Xmx) of the task.
++         The value is in MB. -->
++    <value>1024</value>
++  </property>
+ </configuration>

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/TUTORIAL.sh
----------------------------------------------------------------------
diff --git a/hadoop/TUTORIAL.sh b/hadoop/TUTORIAL.sh
deleted file mode 100755
index f4285d5..0000000
--- a/hadoop/TUTORIAL.sh
+++ /dev/null
@@ -1,710 +0,0 @@
-#!/bin/bash
-
-# Determine the Hadoop distribution to use.
-if test -z "${1}"; then
-    distribution="0.20.205.0"
-    url="http://archive.apache.org/dist/hadoop/core/hadoop-0.20.205.0"
-    bundle="hadoop-0.20.205.0.tar.gz"
-elif test "${1}" = "0.20.2-cdh3u3"; then
-    distribution="0.20.2-cdh3u3"
-    url="http://archive.cloudera.com/cdh/3"
-    bundle="hadoop-0.20.2-cdh3u3.tar.gz"
-elif test "${1}" = "0.20.2-cdh3u5"; then
-    distribution="0.20.2-cdh3u5"
-    url="http://archive.cloudera.com/cdh/3"
-    bundle="hadoop-0.20.2-cdh3u5.tar.gz"
-elif test "${1}" = "2.0.0-mr1-cdh4.1.2"; then
-    distribution="2.0.0-mr1-cdh4.1.2"
-    url="http://archive.cloudera.com/cdh4/cdh/4"
-    bundle="mr1-2.0.0-mr1-cdh4.1.2.tar.gz"
-elif test "${1}" = "2.0.0-mr1-cdh4.2.1"; then
-    distribution="2.0.0-mr1-cdh4.2.1"
-    url="http://archive.cloudera.com/cdh4/cdh/4"
-    bundle="mr1-2.0.0-mr1-cdh4.2.1.tar.gz"
-fi
-
-hadoop="hadoop-${distribution}"
-
-# The potentially running JobTracker, that we need to kill.
-jobtracker_pid=
-
-# Trap Ctrl-C (signal 2).
-trap 'test ! -z ${jobtracker_pid} && kill ${jobtracker_pid}; echo; exit 1' 2
-
-
-# A helper function to run one or more commands.
-# If any command fails, the tutorial exits with a helpful message.
-function run() {
-  for command in "${@}"; do
-      eval ${command}
-
-      if test "$?" != 0; then
-          cat <<__EOF__
-
-${RED}Oh no! We failed to run '${command}'. If you need help try emailing:
-
-  mesos-dev@incubator.apache.org
-
-(Remember to include as much debug information as possible.)${NORMAL}
-
-__EOF__
-          exit 1
-      fi
-  done
-}
-
-
-# A helper function to execute a step of the tutorial (i.e., one or
-# more commands). Prints the command(s) out before they are run and
-# waits for the user to confirm. In addition, the commands are
-# appended to the summary.
-summary=""
-function execute() {
-  echo
-  for command in "${@}"; do
-      echo "  $ ${command}"
-  done
-  echo
-
-  read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-  echo
-
-  for command in "${@}"; do
-      run "${command}"
-
-      # Append to the summary.
-      summary="${summary}
-$ ${command}"
-  done
-}
-
-
-# Make sure we start out in the right directory.
-cd `dirname ${0}`
-
-# Include wonderful colors for our tutorial!
-test -f ../support/colors.sh && . ../support/colors.sh
-
-# Make sure we have all the files/directories we need.
-resources="TUTORIAL.sh \
-  hadoop-gridmix.patch \
-  ${hadoop}_hadoop-env.sh.patch \
-  ${hadoop}_mesos.patch \
-  mapred-site.xml.patch \
-  mesos \
-  mesos-executor"
-
-if test ${distribution} = "0.20.205.0"; then
-    resources="${resources} hadoop-7698-1.patch"
-fi
-
-if test ${distribution} = "2.0.0-mr1-cdh4.1.2" -o ${distribution} = "2.0.0-mr1-cdh4.2.1"; then
-    resources="${resources} HadoopPipes.cc.patch"
-fi
-
-for resource in `echo ${resources}`; do
-    if test ! -e ${resource}; then
-        cat <<__EOF__
-
-${RED}We seem to be missing ${resource} from the directory containing
-this tutorial and we can't continue without it. If you haven't
-made any modifications to this directory, please report this to:
-
-  mesos-dev@incubator.apache.org
-
-(Remember to include as much debug information as possible.)${NORMAL}
-
-__EOF__
-        exit 1
-    fi
-done
-
-# Make sure we have all the build tools we need.
-programs="mvn \
-  ant"
-
-for program in `echo ${programs}`; do
-    which ${program} > /dev/null
-    if test "$?" != 0; then
-        cat <<__EOF__
-
-${RED}We seem to be missing ${program} from PATH.  Please install
-${program} and re-run this tutorial.  If you still have trouble, please report
-this to:
-
-  mesos-dev@incubator.apache.org
-
-(Remember to include as much debug information as possible.)${NORMAL}
-
-__EOF__
-        exit 1
-    fi
-done
-
-
-# Start the tutorial!
-cat <<__EOF__
-
-Welcome to the tutorial on running Apache Hadoop on top of Mesos!
-During this ${BRIGHT}interactive${NORMAL} guide we'll ask some yes/no
-questions and you should enter your answer via 'Y' or 'y' for yes and
-'N' or 'n' for no.
-
-Let's begin!
-
-__EOF__
-
-
-# Check for JAVA_HOME.
-if test -z ${JAVA_HOME}; then
-    cat <<__EOF__
-
-${RED}You probably need to set JAVA_HOME in order to run this tutorial!${NORMAL}
-
-__EOF__
-    read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-    echo
-fi
-
-
-# Download Hadoop.
-if test ! -e ${bundle}; then
-    cat <<__EOF__
-
-We'll try and grab ${hadoop} from ${url}/${bundle} for you now.
-
-__EOF__
-    execute "wget ${url}/${bundle}"
-else
-    cat <<__EOF__
-
-${RED}It looks like you've already downloaded ${bundle}, so
-we'll skip that step.${NORMAL}
-
-__EOF__
-fi
-
-
-# Extract the archive.
-if test ! -d ${hadoop}; then
-    cat <<__EOF__
-
-Let's start by extracting ${bundle}.
-
-__EOF__
-    execute "tar zxf ${bundle}"
-else
-    cat <<__EOF__
-
-${RED}It looks like you've already extracted ${bundle}, so
-we'll skip that step.${NORMAL}
-
-__EOF__
-fi
-
-
-# Change into Hadoop directory.
-cat <<__EOF__
-
-Okay, now let's change into the ${hadoop} directory in order to apply
-some patches, copy in the Mesos specific code, and build everything.
-
-__EOF__
-
-execute "cd ${hadoop}"
-
-
-# Apply the GridMix patch.
-cat <<__EOF__
-
-To run Hadoop on Mesos under Java 7 we need to apply a rather minor patch
-(hadoop-gridmix.patch) to GridMix, a contribution in Hadoop. See the
-'NOTES' file for more info.
-
-__EOF__
-
-# Check and see if the patch has already been applied.
-grep 'private String getEnumValues' \
-  src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java \
-  >/dev/null
-
-if test ${?} == "0"; then
-    cat <<__EOF__
-
-${RED}It looks like you've already applied the patch, so we'll skip
-applying it now.${NORMAL}
-
-__EOF__
-else
-    execute "patch -p1 <../hadoop-gridmix.patch"
-fi
-
-# Apply the 'jsvc' patch for hadoop-0.20.205.0.
-if test ${distribution} = "0.20.205.0"; then
-  cat <<__EOF__
-
-To build the Mesos executor bundle, we need to apply a patch
-(hadoop-7698-1.patch) for the 'jsvc' target, which is broken in build.xml.
-
-__EOF__
-
-  # Check and see if the patch has already been applied.
-  grep 'os-name' build.xml >/dev/null
-
-  if test ${?} == "0"; then
-      cat <<__EOF__
-
-  ${RED}It looks like you've already applied the patch, so we'll skip
-  applying it now.${NORMAL}
-
-__EOF__
-  else
-      execute "patch -p1 <../hadoop-7698-1.patch"
-  fi
-fi
-
-# Copy over the Mesos contrib components (and mesos-executor).
-cat <<__EOF__
-
-Now, we'll copy over the Mesos contrib components.
-
-__EOF__
-
-execute "cp -r ../mesos src/contrib" \
-  "cp -p ../mesos-executor bin"
-
-
-# Apply the patch to build the contrib.
-cat <<__EOF__
-
-In addition, we will need to edit ivy/libraries.properties and
-src/contrib/build.xml to hook the Mesos contrib component into the
-build. We've included a patch (${hadoop}_mesos.patch) to do that for
-you.
-
-__EOF__
-
-
-# Check and see if the patch has already been applied.
-grep mesos src/contrib/build.xml >/dev/null
-
-if test ${?} == "0"; then
-    cat <<__EOF__
-
-${RED}It looks like you've already applied the patch, so we'll skip
-applying it now.${NORMAL}
-
-__EOF__
-else
-    execute "patch -p1 <../${hadoop}_mesos.patch"
-fi
-
-
-# Determine MESOS_BUILD_DIR.
-cat <<__EOF__
-
-Okay, now we're ready to build and then run Hadoop! There are a couple
-important considerations. First, we need to locate the Mesos JAR and
-native library (i.e., libmesos.so on Linux and libmesos.dylib on Mac
-OS X). The Mesos JAR is used for both building and running, while the
-native library is only used for running. In addition, we need to
-locate the Protobuf JAR (if you don't already have one on your
-default classpath).
-
-This tutorial assumes you've built Mesos already. We'll use the
-environment variable MESOS_BUILD_DIR to denote this directory.
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-MESOS_BUILD_DIR=`cd ../../ && pwd`
-
-while test ! -f `echo ${MESOS_BUILD_DIR}/src/mesos-*.jar`; do
-    cat <<__EOF__
-
-${RED}We couldn't automagically determine MESOS_BUILD_DIR. It doesn't
-look like you used ${MESOS_BUILD_DIR} to build Mesos. Maybe you need
-to go back and run 'make' in that directory before
-continuing?${NORMAL}
-
-__EOF__
-
-    DEFAULT=${MESOS_BUILD_DIR}
-    read -e -p "${BRIGHT}Where is the build directory?${NORMAL} [${DEFAULT}] "
-    echo
-    test -z ${REPLY} && REPLY=${DEFAULT}
-    MESOS_BUILD_DIR=`cd ${REPLY} && pwd`
-done
-
-
-cat <<__EOF__
-
-Using ${BRIGHT}${MESOS_BUILD_DIR}${NORMAL} as the build directory.
-
-__EOF__
-
-LIBRARY=${MESOS_BUILD_DIR}/src/.libs/libmesos.so
-
-if test ! -f ${LIBRARY}; then
-    LIBRARY=${MESOS_BUILD_DIR}/src/.libs/libmesos.dylib
-fi
-
-if test ! -f ${LIBRARY}; then
-    cat <<__EOF__
-
-${RED}We seem to be having trouble locating the native library (it's
-not at ${MESOS_BUILD_DIR}/src/.libs/libmesos.so or
-${MESOS_BUILD_DIR}/src/.libs/libmesos.dylib).
-
-Have you already built Mesos? If you have, please report this to:
-
-  mesos-dev@incubator.apache.org
-
-(Remember to include as much debug information as possible.)${NORMAL}
-
-__EOF__
-    exit 1
-fi
-
-# Determine the "platform name" to copy the native library.
-cat <<__EOF__ >PlatformName.java
-public class PlatformName {
-  public static void main(String[] args) {
-    System.out.println(System.getProperty("os.name") + "-" +
-      System.getProperty("os.arch") + "-" +
-      System.getProperty("sun.arch.data.model"));
-    System.exit(0);
-  }
-}
-__EOF__
-
-run "${JAVA_HOME}/bin/javac PlatformName.java"
-
-PLATFORM=`${JAVA_HOME}/bin/java -Xmx32m PlatformName | sed -e "s/ /_/g"`
-
-run "rm PlatformName.*"
-
-
-# Copy over libraries.
-MESOS_JAR=`echo ${MESOS_BUILD_DIR}/src/mesos-*.jar`
-PROTOBUF_JAR=`echo ${MESOS_BUILD_DIR}/protobuf-*.jar`
-
-cat <<__EOF__
-
-Now we'll copy over the necessary libraries we need from the build
-directory.
-
-__EOF__
-
-execute "cp ${PROTOBUF_JAR} lib" \
-  "cp ${MESOS_JAR} lib" \
-  "mkdir -p lib/native/${PLATFORM}" \
-  "cp ${LIBRARY} lib/native/${PLATFORM}"
-
-if test ${distribution} = "0.20.205.0"; then
-    cat <<__EOF__
-
-The Apache Hadoop distribution requires that we also copy some
-libraries to multiple places. :/
-
-__EOF__
-
-    execute "cp ${PROTOBUF_JAR} share/hadoop/lib" \
-      "cp ${MESOS_JAR} share/hadoop/lib" \
-      "cp ${LIBRARY} lib"
-fi
-
-
-# Apply conf/mapred-site.xml patch.
-cat <<__EOF__
-
-First we need to configure Hadoop appropriately by modifying
-conf/mapred-site.xml (as is always required when running Hadoop).
-In order to run Hadoop on Mesos we need to set at least these four
-properties:
-
-  mapred.job.tracker
-
-  mapred.jobtracker.taskScheduler
-
-  mapred.mesos.master
-
-  mapred.mesos.executor
-
-The 'mapred.job.tracker' property should be set to the host:port where
-you want to launch the JobTracker (e.g., localhost:54321).
-
-The 'mapred.jobtracker.taskScheduler' property must be set to
-'org.apache.hadoop.mapred.MesosScheduler'.
-
-If you've already got a Mesos master running you can use that for
-'mapred.mesos.master', but for this tutorial we'll just use 'local' in
-order to bring up a Mesos "cluster" within the process. To connect to
-a remote master, simply use the same URL that the slaves use to connect
-to the master (e.g., localhost:5050).
-
-The 'mapred.mesos.executor' property must be set to the location
-of the Mesos executor bundle so that Mesos slaves can download
-and run the executor.
-NOTE: You need to MANUALLY upload the Mesos executor bundle to
-the above location.
-
-
-We've got a prepared patch (mapred-site.xml.patch) for
-conf/mapred-site.xml that makes the changes necessary
-to get everything running with a local Mesos cluster.
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-patch --dry-run --silent --force -p1 \
-    <../mapred-site.xml.patch 1>/dev/null 2>&1
-
-if test ${?} == "1"; then
-    cat <<__EOF__
-
-${RED}It looks like conf/mapred-site.xml has been modified. You'll
-need to copy that to something else and restore the file to its
-original contents before we'll be able to apply this patch.${NORMAL}
-
-__EOF__
-    DEFAULT="N"
-else
-    DEFAULT="Y"
-fi
-
-read -e -p "${BRIGHT}Patch conf/mapred-site.xml?${NORMAL} [${DEFAULT}] "
-echo
-test -z ${REPLY} && REPLY=${DEFAULT}
-if test ${REPLY} == "Y" -o ${REPLY} == "y"; then
-    execute "patch -p1 <../mapred-site.xml.patch"
-fi
-
-
-
-# Apply conf/hadoop-env.sh patch.
-cat <<__EOF__
-
-Most users will need to set JAVA_HOME in conf/hadoop-env.sh, but we'll
-also need to set MESOS_NATIVE_LIBRARY and update the HADOOP_CLASSPATH
-to include the Mesos contrib classfiles. We've prepared a patch
-(${hadoop}_hadoop-env.sh.patch) for conf/hadoop-env.sh that makes the
-necessary changes.
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-patch --dry-run --silent --force -p1 \
-    <../${hadoop}_hadoop-env.sh.patch 1>/dev/null 2>&1
-
-if test ${?} == "1"; then
-    cat <<__EOF__
-
-${RED}It looks like conf/hadoop-env.sh has been modified. You'll need
-to copy that to something else and restore the file to its original
-contents before we'll be able to apply this patch.${NORMAL}
-
-__EOF__
-    DEFAULT="N"
-else
-    DEFAULT="Y"
-fi
-
-read -e -p "${BRIGHT}Patch conf/hadoop-env.sh?${NORMAL} [${DEFAULT}] "
-echo
-test -z ${REPLY} && REPLY=${DEFAULT}
-if test ${REPLY} == "Y" -o ${REPLY} == "y"; then
-    execute "patch -p1 <../${hadoop}_hadoop-env.sh.patch"
-fi
-
-if test ${distribution} = "2.0.0-mr1-cdh4.1.2" -o ${distribution} = "2.0.0-mr1-cdh4.2.1"; then
-    # Apply HadoopPipes.cc patch.
-    cat <<__EOF__
-
-    This version of Hadoop needs to be patched to build on GCC 4.7 and newer compilers.
-
-__EOF__
-
-    read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-    echo
-
-    patch --dry-run --silent --force -p1 \
-	<../HadoopPipes.cc.patch 1>/dev/null 2>&1
-
-    if test ${?} == "1"; then
-	cat <<__EOF__
-
-    ${RED}It looks like src/c++/pipes/impl/HadoopPipes.cc has been modified.
-    You'll need to copy that to something else and restore the file to its
-    original contents before we'll be able to apply this patch.${NORMAL}
-
-__EOF__
-	DEFAULT="N"
-    else
-	DEFAULT="Y"
-    fi
-
-    read -e -p "${BRIGHT}Patch src/c++/pipes/impl/HadoopPipes.cc?${NORMAL} [${DEFAULT}] "
-    echo
-    test -z ${REPLY} && REPLY=${DEFAULT}
-    if test ${REPLY} == "Y" -o ${REPLY} == "y"; then
-	execute "patch -p1 <../HadoopPipes.cc.patch"
-    fi
-fi
-
-# Build Hadoop and Mesos executor package that Mesos slaves can download
-# and execute.
-# TODO(vinod): Create a new ant target in build.xml that builds the executor.
-# NOTE: We specifically set the version when calling ant, to ensure we know
-# the resulting directory name.
-cat <<__EOF__
-
-Okay, let's try building Hadoop.
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-if test ${distribution} = "2.0.0-mr1-cdh4.1.2" -o ${distribution} = "2.0.0-mr1-cdh4.2.1"; then
-  cat <<__EOF__
-
-  We need to chmod +x the install-sh scripts to compile the
-  C++ components needed by the 'bin-package' target in CDH4.
-  We also need to specifically set the 'reactor.repo' property.
-
-__EOF__
-  execute "find . -name "*install-sh*" | xargs chmod +x" \
-    "ant -Dreactor.repo=file://$HOME/.m2/repository \
--Dversion=${distribution} -Dcompile.c++=true compile bin-package"
-else
-  execute "ant -Dversion=${distribution} compile bin-package"
-fi
-
-cat <<__EOF__
-
-To build the Mesos executor package, we first copy the
-necessary Mesos libraries.
-
-__EOF__
-
-# Copy the Mesos native library.
-execute "cd build/${hadoop}" \
-  "mkdir -p lib/native/${PLATFORM}" \
-  "cp ${LIBRARY} lib/native/${PLATFORM}"
-
-if test ${distribution} != "0.20.205.0"; then
-  cat <<__EOF__
-
-  We will remove Cloudera patches from the Mesos executor package
-  to save space (~62MB).
-
-__EOF__
-  execute "rm -rf cloudera"
-fi
-
-cat <<__EOF__
-
-  Finally, we will build the Mesos executor package as follows:
-
-__EOF__
-
-# We rename the directory to '${hadoop}-mesos' so that the Mesos
-# executor (which matches 'hadoop-*') can stay agnostic to the Hadoop version.
-execute "cd .." \
-  "mv ${hadoop} ${hadoop}-mesos" \
-  "tar czf ${hadoop}-mesos.tar.gz ${hadoop}-mesos/"
-
-# Start JobTracker.
-cat <<__EOF__
-
-${GREEN}Build success!${NORMAL}
-
-The Hadoop-on-Mesos distribution is now built in '${hadoop}-mesos'.
-
-Now let's run something!
-
-We'll try and start the JobTracker from the Mesos distribution path via:
-  $ cd ${hadoop}-mesos
-  $ ./bin/hadoop jobtracker
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-
-# Fake the resources for this local slave, because the default resources
-# (esp. memory on MacOSX) offered by the slave might not be enough to
-# launch TaskTrackers.
-# TODO(vinod): Pipe these commands through 'execute()' so that they
-# can be appended to the summary.
-cd ${hadoop}-mesos
-export MESOS_RESOURCES="cpus:16;mem:16384;disk:307200;ports:[31000-32000]"
-./bin/hadoop jobtracker 1>/dev/null 2>&1 &
-
-jobtracker_pid=${!}
-
-cat <<__EOF__
-
-JobTracker started with PID ${BRIGHT}${jobtracker_pid}${NORMAL}.
-
-__EOF__
-
-echo -n "Waiting 5 seconds for it to start."
-
-for i in 1 2 3 4 5; do sleep 1 && echo -n " ."; done
-
-# Now let's run an example.
-cat <<__EOF__
-
-Alright, now let's run the "wordcount" example via:
-
-  $ ./bin/hadoop jar hadoop-examples-${distribution}.jar wordcount \
-  ${MESOS_BUILD_DIR}/src/mesos out
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-rm -rf out # TODO(benh): Ask about removing this first.
-
-./bin/hadoop jar hadoop-examples-${distribution}.jar wordcount \
-    ${MESOS_BUILD_DIR}/src/mesos out
-
-
-if test ${?} == "0"; then
-    cat <<__EOF__
-
-${GREEN}Success!${NORMAL} We'll kill the JobTracker and exit.
-
-Summary:
-${summary}
-
-Remember, you'll need to make some changes to
-${hadoop}/conf/mapred-site.xml to run Hadoop on a
-real Mesos cluster.
-
-We hope you found this helpful!
-
-__EOF__
-    kill ${jobtracker_pid}
-else
-    cat <<__EOF__
-
-${RED}Oh no, it failed! Try running the JobTracker and wordcount
-example manually ... it might be an issue with your environment that
-this tutorial didn't cover (if you find this to be the case, please
-create a JIRA for us and/or send us a code review).${NORMAL}
-
-__EOF__
-    kill ${jobtracker_pid}
-    exit 1
-fi

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch b/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch
deleted file mode 100644
index 88c0754..0000000
--- a/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.6-sun
- 
- # Extra Java CLASSPATH elements.  Optional.
--# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-0.20.2-cdh3u3.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-0.20.2-cdh3u3.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
-
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch b/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch
deleted file mode 100644
index 9284788..0000000
--- a/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch
+++ /dev/null
@@ -1,38 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index 0b3c715..6b7770b 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -75,3 +75,5 @@ slf4j-log4j12.version=1.4.3
- wagon-http.version=1.0-beta-2
- xmlenc.version=0.52
- xerces.version=1.4.4
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index e41c132..593aecd 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -57,6 +57,7 @@
-       <fileset dir="." includes="capacity-scheduler/build.xml"/>
-       <fileset dir="." includes="mrunit/build.xml"/>
-       <fileset dir="." includes="gridmix/build.xml"/>
-+      <fileset dir="." includes="mesos/build.xml"/>
-     </subant>
-      <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
-      <fail if="testsfailed">Tests failed!</fail>
-diff --git a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-index 545c3c7..f6950be 100644
---- a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-+++ b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-@@ -257,6 +257,11 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
-     taskScheduler.refresh();
-   }
-
-+  @Override
-+  public synchronized void checkJobSubmission(JobInProgress job) throws IOException {
-+    taskScheduler.checkJobSubmission(job);
-+  }
-+
-   // Mesos Scheduler methods.
-   @Override
-   public synchronized void registered(
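
The checkJobSubmission override added by this patch (and its cdh3u5 twin
below) follows the wrapper pattern MesosScheduler uses for every
TaskScheduler callback: hold the real scheduler configured via
mapred.mesos.taskScheduler and forward Hadoop's calls to it. The sketch
below illustrates that delegation pattern only; the interface and class
names are simplified stand-ins, not the actual Hadoop API.

// Illustrative delegation sketch; not the real Hadoop TaskScheduler API.
interface TaskSchedulerLike {
  void refresh() throws Exception;
  void checkJobSubmission(String job) throws Exception;
}

class DelegatingScheduler implements TaskSchedulerLike {
  private final TaskSchedulerLike inner; // e.g. a JobQueueTaskScheduler stand-in

  DelegatingScheduler(TaskSchedulerLike inner) { this.inner = inner; }

  @Override
  public void refresh() throws Exception { inner.refresh(); }

  // The patched-in method: simply forward to the wrapped scheduler.
  @Override
  public void checkJobSubmission(String job) throws Exception {
    inner.checkJobSubmission(job);
  }
}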

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch b/hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch
deleted file mode 100644
index 38eca6c..0000000
--- a/hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.6-sun
- 
- # Extra Java CLASSPATH elements.  Optional.
--# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-0.20.2-cdh3u5.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-0.20.2-cdh3u5.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
-
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.2-cdh3u5_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.2-cdh3u5_mesos.patch b/hadoop/hadoop-0.20.2-cdh3u5_mesos.patch
deleted file mode 100644
index 9284788..0000000
--- a/hadoop/hadoop-0.20.2-cdh3u5_mesos.patch
+++ /dev/null
@@ -1,38 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index 0b3c715..6b7770b 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -75,3 +75,5 @@ slf4j-log4j12.version=1.4.3
- wagon-http.version=1.0-beta-2
- xmlenc.version=0.52
- xerces.version=1.4.4
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index e41c132..593aecd 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -57,6 +57,7 @@
-       <fileset dir="." includes="capacity-scheduler/build.xml"/>
-       <fileset dir="." includes="mrunit/build.xml"/>
-       <fileset dir="." includes="gridmix/build.xml"/>
-+      <fileset dir="." includes="mesos/build.xml"/>
-     </subant>
-      <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
-      <fail if="testsfailed">Tests failed!</fail>
-diff --git a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-index 545c3c7..f6950be 100644
---- a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-+++ b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-@@ -257,6 +257,11 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
-     taskScheduler.refresh();
-   }
-
-+  @Override
-+  public synchronized void checkJobSubmission(JobInProgress job) throws IOException {
-+    taskScheduler.checkJobSubmission(job);
-+  }
-+
-   // Mesos Scheduler methods.
-   @Override
-   public synchronized void registered(

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch b/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch
deleted file mode 100644
index 066f715..0000000
--- a/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.5-sun
- 
- # Extra Java CLASSPATH elements.  Optional.
--# export HADOOP_CLASSPATH=
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-0.20.205.0.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-0.20.205.0.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
- 
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.205.0_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.205.0_mesos.patch b/hadoop/hadoop-0.20.205.0_mesos.patch
deleted file mode 100644
index 35fe3ba..0000000
--- a/hadoop/hadoop-0.20.205.0_mesos.patch
+++ /dev/null
@@ -1,22 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index b47b4c3..713f0c1 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -86,3 +86,5 @@ slf4j-log4j12.version=1.4.3
- wagon-http.version=1.0-beta-2
- xmlenc.version=0.52
- xerces.version=1.4.4
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index 3c19e25..ecb7198 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -55,6 +55,7 @@
-       <fileset dir="." includes="fairscheduler/build.xml"/>
-       <fileset dir="." includes="capacity-scheduler/build.xml"/>
-       <fileset dir="." includes="gridmix/build.xml"/>
-+      <fileset dir="." includes="mesos/build.xml"/>
-     </subant>
-      <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
-      <fail if="testsfailed">Tests failed!</fail>

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch b/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch
deleted file mode 100644
index 9dfe4a3..0000000
--- a/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.6-sun
- 
- # Extra Java CLASSPATH elements.  Optional.
--# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-2.0.0-mr1-cdh4.1.2.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-2.0.0-mr1-cdh4.1.2.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
-
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch b/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch
deleted file mode 100644
index 2fc254f..0000000
--- a/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch
+++ /dev/null
@@ -1,22 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index b47b4c3..713f0c1 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -86,3 +86,5 @@ slf4j-log4j12.version=1.4.3
- wagon-http.version=1.0-beta-2
- xmlenc.version=0.52
- xerces.version=1.4.4
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index 3c19e25..ecb7198 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -54,6 +54,7 @@
-       <fileset dir="." includes="fairscheduler/build.xml"/>
-       <fileset dir="." includes="capacity-scheduler/build.xml"/>
-       <!-- <fileset dir="." includes="gridmix/build.xml"/> -->
-+      <fileset dir="." includes="mesos/build.xml"/>
-     </subant>
-      <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
-      <fail if="testsfailed">Tests failed!</fail>

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch b/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch
deleted file mode 100644
index d57c1e7..0000000
--- a/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.6-sun
- 
- # Extra Java CLASSPATH elements.  Optional.
--# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-2.0.0-mr1-cdh4.2.1.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-2.0.0-mr1-cdh4.2.1.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
-
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch b/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch
deleted file mode 100644
index 8a39444..0000000
--- a/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch
+++ /dev/null
@@ -1,22 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index b47b4c3..713f0c1 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -86,3 +86,5 @@ slf4j-log4j12.version=1.4.3
- xerces.version=1.4.4
-
- zookeeper.version=3.4.2
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index 3c19e25..ecb7198 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -54,6 +54,7 @@
-       <fileset dir="." includes="fairscheduler/build.xml"/>
-       <fileset dir="." includes="capacity-scheduler/build.xml"/>
-       <!-- <fileset dir="." includes="gridmix/build.xml"/> -->
-+      <fileset dir="." includes="mesos/build.xml"/>
-     </subant>
-      <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
-      <fail if="testsfailed">Tests failed!</fail>

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-7698-1.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-7698-1.patch b/hadoop/hadoop-7698-1.patch
deleted file mode 100644
index 0ab424b..0000000
--- a/hadoop/hadoop-7698-1.patch
+++ /dev/null
@@ -1,27 +0,0 @@
-diff --git a/build.xml b/build.xml
-index a5ba0f5..cc8a606 100644
---- a/build.xml        (revision 1177084)
-+++ b/build.xml        (working copy)
-@@ -170,6 +170,13 @@
- 
-   <property name="jsvc.build.dir" value="${build.dir}/jsvc.${os.arch}" />
-   <property name="jsvc.install.dir" value="${dist.dir}/libexec" /> 
-+  <exec executable="sh" outputproperty="os-name">
-+    <arg value="-c" />
-+    <arg value="uname -s | tr '[:upper:]' '[:lower:]'" />
-+  </exec>
-+  <condition property="os-arch" value="universal">
-+    <equals arg1="darwin" arg2="${os-name}" />
-+  </condition>
-   <condition property="os-arch" value="x86_64">
-     <and>
-       <os arch="amd64" />
-@@ -183,7 +190,7 @@
-       <os arch="i686" />
-     </or>
-   </condition>
--  <property name="jsvc.location" value="http://archive.apache.org/dist/commons/daemon/binaries/1.0.2/linux/commons-daemon-1.0.2-bin-linux-${os-arch}.tar.gz" />
-+  <property name="jsvc.location" value="http://archive.apache.org/dist/commons/daemon/binaries/1.0.2/${os-name}/commons-daemon-1.0.2-bin-${os-name}-${os-arch}.tar.gz" />
-   <property name="jsvc.dest.name" value="jsvc.${os.arch}.tar.gz" />
- 
-   <!-- task-controller properties set here -->

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-gridmix.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-gridmix.patch b/hadoop/hadoop-gridmix.patch
deleted file mode 100644
index 92b55e0..0000000
--- a/hadoop/hadoop-gridmix.patch
+++ /dev/null
@@ -1,17 +0,0 @@
-diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
-index a5ba0f5..cc8a606 100644
---- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
-+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
-@@ -609,10 +609,10 @@
-     }
-   }
-
--  private <T> String getEnumValues(Enum<? extends T>[] e) {
-+  private String getEnumValues(Enum<?>[] e) {
-     StringBuilder sb = new StringBuilder();
-     String sep = "";
--    for (Enum<? extends T> v : e) {
-+    for (Enum<?> v : e) {
-       sb.append(sep);
-       sb.append(v.name());
-       sep = "|";

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mapred-site.xml.patch
----------------------------------------------------------------------
diff --git a/hadoop/mapred-site.xml.patch b/hadoop/mapred-site.xml.patch
deleted file mode 100644
index d5a99f6..0000000
--- a/hadoop/mapred-site.xml.patch
+++ /dev/null
@@ -1,54 +0,0 @@
-diff --git a/conf/mapred-site.xml b/conf/mapred-site.xml
-index 970c8fe..f9f272d 100644
---- a/conf/mapred-site.xml
-+++ b/conf/mapred-site.xml
-@@ -4,5 +4,48 @@
- <!-- Put site-specific property overrides in this file. -->
- 
- <configuration>
--
-+  <property>
-+    <name>mapred.job.tracker</name>
-+    <value>localhost:54311</value>
-+  </property>
-+  <property>
-+    <name>mapred.jobtracker.taskScheduler</name>
-+    <value>org.apache.hadoop.mapred.MesosScheduler</value>
-+  </property>
-+  <property>
-+    <name>mapred.mesos.taskScheduler</name>
-+    <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
-+  </property>
-+  <property>
-+    <name>mapred.mesos.master</name>
-+    <value>local</value>
-+  </property>
-+#
-+# Make sure to uncomment the 'mapred.mesos.executor' property,
-+# when running the Hadoop JobTracker on a real Mesos cluster.
-+# NOTE: You need to MANUALLY upload the Mesos executor bundle
-+# to the location that is set as the value of this property.
-+#  <property>
-+#    <name>mapred.mesos.executor</name>
-+#    <value>hdfs://hdfs.name.node:port/hadoop.zip</value>
-+#  </property>
-+#
-+# The properties below indicate the amount of resources
-+# that are allocated to a Hadoop slot (i.e., map/reduce task) by Mesos.
-+  <property>
-+    <name>mapred.mesos.slot.cpus</name>
-+    <value>1</value>
-+  </property>
-+  <property>
-+    <name>mapred.mesos.slot.disk</name>
-+    <!-- The value is in MB. -->
-+    <value>1024</value>
-+  </property>
-+  <property>
-+    <name>mapred.mesos.slot.mem</name>
-+    <!-- Note that this is the total memory required for
-+         JVM overhead (10% of this value) and the heap (-Xmx) of the task.
-+         The value is in MB. -->
-+    <value>1024</value>
-+  </property>
- </configuration>

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos-executor
----------------------------------------------------------------------
diff --git a/hadoop/mesos-executor b/hadoop/mesos-executor
deleted file mode 100755
index 7902e80..0000000
--- a/hadoop/mesos-executor
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/sh
-
-exec `dirname ${0}`/hadoop org.apache.hadoop.mapred.MesosExecutor

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/build.xml
----------------------------------------------------------------------
diff --git a/hadoop/mesos/build.xml b/hadoop/mesos/build.xml
deleted file mode 100644
index 9edf9e2..0000000
--- a/hadoop/mesos/build.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0"?>
-
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-<!--
-Before you can run these subtargets directly, you need
-to call at top-level: ant deploy-contrib compile-core-test
--->
-<project name="mesos" default="jar">
-
-  <import file="../build-contrib.xml"/>
-
-</project>

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/ivy.xml
----------------------------------------------------------------------
diff --git a/hadoop/mesos/ivy.xml b/hadoop/mesos/ivy.xml
deleted file mode 100644
index fc4fb16..0000000
--- a/hadoop/mesos/ivy.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" ?>
-<ivy-module version="1.0">
-  <info organisation="org.apache.hadoop" module="${ant.project.name}">
-    <license name="Apache 2.0"/>
-    <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
-    <description>
-        Apache Hadoop contrib
-    </description>
-  </info>
-  <configurations defaultconfmapping="default">
-    <!--these match the Maven configurations-->
-    <conf name="default" extends="master,runtime"/>
-    <conf name="master" description="contains the artifact but no dependencies"/>
-    <conf name="runtime" description="runtime but not the artifact" />
-
-    <conf name="common" visibility="private"
-      description="artifacts needed to compile/test the application"/>
-  </configurations>
-
-  <publications>
-    <!--get the artifact from our module name-->
-    <artifact conf="master"/>
-  </publications>
-  <dependencies>
-    <dependency org="commons-logging"
-      name="commons-logging"
-      rev="${commons-logging.version}"
-      conf="common->default"/>
-    <dependency org="log4j"
-      name="log4j"
-      rev="${log4j.version}"
-      conf="common->master"/>
-   <dependency org="junit"
-      name="junit"
-      rev="${junit.version}"
-      conf="common->default"/>
-   <dependency org="com.google.protobuf"
-     name="protobuf-java"
-     rev="${protobuf-java.version}"
-     conf="common->default"/>
-  </dependencies>
-</ivy-module>

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/ivy/libraries.properties
----------------------------------------------------------------------
diff --git a/hadoop/mesos/ivy/libraries.properties b/hadoop/mesos/ivy/libraries.properties
deleted file mode 100644
index d740528..0000000
--- a/hadoop/mesos/ivy/libraries.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-#This properties file lists the versions of the various artifacts used by streaming.
-#It drives ivy and the generation of a maven POM
-
-#Please list the dependencies name with version if they are different from the ones
-#listed in the global libraries.properties file (in alphabetical order)

http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java
deleted file mode 100644
index ccf0090..0000000
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.mesos.Executor;
-import org.apache.mesos.ExecutorDriver;
-import org.apache.mesos.MesosExecutorDriver;
-import org.apache.mesos.Protos.Environment.Variable;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.SlaveInfo;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.TaskStatus;
-
-public class MesosExecutor implements Executor {
-  public static final Log LOG = LogFactory.getLog(MesosExecutor.class);
-
-  private JobConf conf;
-  private TaskTracker taskTracker;
-
-  @Override
-  public void registered(ExecutorDriver driver, ExecutorInfo executorInfo,
-      FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
-    LOG.info("Executor registered with the slave");
-
-    conf = new JobConf();
-
-    // Get TaskTracker's config options from environment variables set by the
-    // JobTracker.
-    if (executorInfo.getCommand().hasEnvironment()) {
-      for (Variable variable : executorInfo.getCommand().getEnvironment()
-          .getVariablesList()) {
-        LOG.info("Setting config option : " + variable.getName() + " to "
-            + variable.getValue());
-        conf.set(variable.getName(), variable.getValue());
-      }
-    }
-
-    // Get hostname from Mesos to make sure we match what it reports
-    // to the JobTracker.
-    conf.set("slave.host.name", slaveInfo.getHostname());
-
-    // Set the mapred.local directory inside the executor sandbox, so that
-    // different TaskTrackers on the same host do not step on each other.
-    conf.set("mapred.local.dir", System.getProperty("user.dir") + "/mapred");
-  }
-
-  @Override
-  public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
-    LOG.info("Launching task : " + task.getTaskId().getValue());
-
-    // NOTE: We need to manually set the context class loader here because,
-    // the TaskTracker is unable to find LoginModule class otherwise.
-    Thread.currentThread().setContextClassLoader(
-        TaskTracker.class.getClassLoader());
-
-    try {
-      taskTracker = new TaskTracker(conf);
-    } catch (IOException e) {
-      LOG.fatal("Failed to start TaskTracker", e);
-      System.exit(1);
-    } catch (InterruptedException e) {
-      LOG.fatal("Failed to start TaskTracker", e);
-      System.exit(1);
-    }
-
-    // Spin up a TaskTracker in a new thread.
-    new Thread("TaskTracker Run Thread") {
-      @Override
-      public void run() {
-        taskTracker.run();
-
-        // Send a TASK_FINISHED status update.
-        // We do this here because we want to send it in a separate thread
-        // than was used to call killTask().
-        driver.sendStatusUpdate(TaskStatus.newBuilder()
-            .setTaskId(task.getTaskId())
-            .setState(TaskState.TASK_FINISHED)
-            .build());
-
-        // Give some time for the update to reach the slave.
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          LOG.error("Failed to sleep TaskTracker thread", e);
-        }
-
-        // Stop the executor.
-        driver.stop();
-      }
-    }.start();
-
-    driver.sendStatusUpdate(TaskStatus.newBuilder()
-        .setTaskId(task.getTaskId())
-        .setState(TaskState.TASK_RUNNING).build());
-  }
-
-  @Override
-  public void killTask(ExecutorDriver driver, TaskID taskId) {
-    LOG.info("Killing task : " + taskId.getValue());
-    try {
-      taskTracker.shutdown();
-    } catch (IOException e) {
-      LOG.error("Failed to shutdown TaskTracker", e);
-    } catch (InterruptedException e) {
-      LOG.error("Failed to shutdown TaskTracker", e);
-    }
-  }
-
-  @Override
-  public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
-    LOG.info("Executor reregistered with the slave");
-  }
-
-  @Override
-  public void disconnected(ExecutorDriver driver) {
-    LOG.info("Executor disconnected from the slave");
-  }
-
-  @Override
-  public void frameworkMessage(ExecutorDriver d, byte[] msg) {
-    LOG.info("Executor received framework message of length: " + msg.length
-        + " bytes");
-  }
-
-  @Override
-  public void error(ExecutorDriver d, String message) {
-    LOG.error("MesosExecutor.error: " + message);
-  }
-
-  @Override
-  public void shutdown(ExecutorDriver d) {
-    LOG.info("Executor asked to shutdown");
-  }
-
-  public static void main(String[] args) {
-    MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor());
-    System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
-  }
-}


[4/5] git commit: Updated README.md.

Posted by be...@apache.org.
Updated README.md.

Review: https://reviews.apache.org/r/13231


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3a7b9260
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3a7b9260
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3a7b9260

Branch: refs/heads/master
Commit: 3a7b9260bf49fbba82ef73163505ff20b18326b6
Parents: dba8c38
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Aug 1 23:40:40 2013 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 2 14:28:28 2013 -0700

----------------------------------------------------------------------
 hadoop/README.md | 182 +++++++++++++++++++++++++++++++++++---------------
 1 file changed, 128 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3a7b9260/hadoop/README.md
----------------------------------------------------------------------
diff --git a/hadoop/README.md b/hadoop/README.md
index d5a99f6..36d7eb4 100644
--- a/hadoop/README.md
+++ b/hadoop/README.md
@@ -1,54 +1,128 @@
-diff --git a/conf/mapred-site.xml b/conf/mapred-site.xml
-index 970c8fe..f9f272d 100644
---- a/conf/mapred-site.xml
-+++ b/conf/mapred-site.xml
-@@ -4,5 +4,48 @@
- <!-- Put site-specific property overrides in this file. -->
- 
- <configuration>
--
-+  <property>
-+    <name>mapred.job.tracker</name>
-+    <value>localhost:54311</value>
-+  </property>
-+  <property>
-+    <name>mapred.jobtracker.taskScheduler</name>
-+    <value>org.apache.hadoop.mapred.MesosScheduler</value>
-+  </property>
-+  <property>
-+    <name>mapred.mesos.taskScheduler</name>
-+    <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
-+  </property>
-+  <property>
-+    <name>mapred.mesos.master</name>
-+    <value>local</value>
-+  </property>
-+#
-+# Make sure to uncomment the 'mapred.mesos.executor' property,
-+# when running the Hadoop JobTracker on a real Mesos cluster.
-+# NOTE: You need to MANUALLY upload the Mesos executor bundle
-+# to the location that is set as the value of this property.
-+#  <property>
-+#    <name>mapred.mesos.executor</name>
-+#    <value>hdfs://hdfs.name.node:port/hadoop.zip</value>
-+#  </property>
-+#
-+# The properties below indicate the amount of resources
-+# that are allocated to a Hadoop slot (i.e., map/reduce task) by Mesos.
-+  <property>
-+    <name>mapred.mesos.slot.cpus</name>
-+    <value>1</value>
-+  </property>
-+  <property>
-+    <name>mapred.mesos.slot.disk</name>
-+    <!-- The value is in MB. -->
-+    <value>1024</value>
-+  </property>
-+  <property>
-+    <name>mapred.mesos.slot.mem</name>
-+    <!-- Note that this is the total memory required for
-+         JVM overhead (10% of this value) and the heap (-Xmx) of the task.
-+         The value is in MB. -->
-+    <value>1024</value>
-+  </property>
- </configuration>
+Hadoop on Mesos
+---------------
+
+#### Overview ####
+
+To run _Hadoop on Mesos_ you need to add the `hadoop-mesos-0.0.1.jar`
+library to your Hadoop distribution (any distribution that supports
+`hadoop-core-1.2.0` should work) and set some new configuration
+properties. Read on for details.
+
+#### Build ####
+
+You can build `hadoop-mesos-0.0.1.jar` using Maven:
+
+```
+$ mvn package
+```
+
+If successful, the JAR will be at `target/hadoop-mesos-0.0.1.jar`.
+
+> NOTE: If you want to build against a different version of Mesos than
+> the default, you'll need to update `mesos-version` in `pom.xml`.
+
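+For illustration, a fragment like the following is roughly what that
+property looks like in `pom.xml`, assuming it is declared as an
+ordinary Maven property (the version number here is a hypothetical
+placeholder, not the project default):
+
+```
+<properties>
+  <mesos-version>0.13.0</mesos-version>
+</properties>
+```
+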
+We plan to provide pre-built JARs at http://repository.apache.org
+in the near future!
+
+#### Package ####
+
+You'll need to download an existing Hadoop distribution. For this
+guide, we'll use [CDH4.2.1][CDH4.2.1]. First grab the tar archive and
+extract it.
+
+```
+$ wget http://archive.cloudera.com/cdh4/cdh/4/mr1-2.0.0-mr1-cdh4.2.1.tar.gz
+...
+$ tar zxf mr1-2.0.0-mr1-cdh4.2.1.tar.gz
+```
+
+> **Take note:** the extracted directory is `hadoop-2.0.0-mr1-cdh4.2.1`.
+
+Now copy `hadoop-mesos-0.0.1.jar` into the `lib` folder.
+
+```
+$ cp /path/to/hadoop-mesos-0.0.1.jar hadoop-2.0.0-mr1-cdh4.2.1/lib/
+```
+
+_That's it!_ You now have a _Hadoop on Mesos_ distribution!
+
+[CDH4.2.1]: http://www.cloudera.com/content/support/en/documentation/cdh4-documentation/cdh4-documentation-v4-2-1.html
+
+#### Upload ####
+
+You'll want to upload your _Hadoop on Mesos_ distribution somewhere
+that Mesos can access in order to launch each `TaskTracker`. For
+example, if you're already running HDFS:
+
+```
+$ tar czf hadoop-2.0.0-mr1-cdh4.2.1.tar.gz hadoop-2.0.0-mr1-cdh4.2.1
+$ hadoop fs -put hadoop-2.0.0-mr1-cdh4.2.1.tar.gz /hadoop-2.0.0-mr1-cdh4.2.1.tar.gz
+```
+
+> **Consider** any permissions issues with your uploaded location
+> (e.g., on HDFS you'll probably want to make the file world
+> readable).
+
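+One way to do that on HDFS, for example, using the upload path from
+above:
+
+```
+$ hadoop fs -chmod 644 /hadoop-2.0.0-mr1-cdh4.2.1.tar.gz
+```
+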
+Now you'll need to configure your `JobTracker` to launch each
+`TaskTracker` on Mesos!
+
+#### Configure ####
+
+Along with the normal configuration properties you might want to set
+to launch a `JobTracker`, you'll need to set some Mesos-specific ones
+too.
+
+Here are the mandatory configuration properties for
+`conf/mapred-site.xml` (initialized to values representative of
+running in [pseudo-distributed
+operation](http://hadoop.apache.org/docs/stable/single_node_setup.html#PseudoDistributed)):
+
+```
+<property>
+  <name>mapred.job.tracker</name>
+  <value>localhost:9001</value>
+</property>
+<property>
+  <name>mapred.jobtracker.taskScheduler</name>
+  <value>org.apache.hadoop.mapred.MesosScheduler</value>
+</property>
+<property>
+  <name>mapred.mesos.taskScheduler</name>
+  <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
+</property>
+<property>
+  <name>mapred.mesos.master</name>
+  <value>localhost:5050</value>
+</property>
+<property>
+  <name>mapred.mesos.executor</name>
+  <value>hdfs://localhost:9000/hadoop-2.0.0-mr1-cdh4.2.1.tar.gz</value>
+</property>
+```
+
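+You can also override the resources Mesos allocates to each Hadoop
+slot (i.e., each map/reduce task). A sketch using the values from the
+removed `mapred-site.xml` patch above (verify these property names
+against your version of the scheduler):
+
+```
+<property>
+  <name>mapred.mesos.slot.cpus</name>
+  <value>1</value>
+</property>
+<property>
+  <name>mapred.mesos.slot.disk</name>
+  <!-- The value is in MB. -->
+  <value>1024</value>
+</property>
+<property>
+  <name>mapred.mesos.slot.mem</name>
+  <!-- Total memory for JVM overhead (10% of this value) plus the
+       task heap (-Xmx). The value is in MB. -->
+  <value>1024</value>
+</property>
+```
+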
+#### Start ####
+
+Now you can start the `JobTracker`, but you'll need to include the
+path to the Mesos native library.
+
+On Linux:
+
+```
+$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.so hadoop jobtracker
+```
+
+And on OS X:
+
+```
+$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.dylib hadoop jobtracker
+```
+
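+Equivalently, you can export the variable once (for example from your
+shell profile or `conf/hadoop-env.sh`) rather than prefixing every
+command:
+
+```
+$ export MESOS_NATIVE_LIBRARY=/path/to/libmesos.so
+$ hadoop jobtracker
+```
+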
+> **NOTE:** You do not need to worry about distributing your Hadoop
+> configuration! All of the configuration properties read by the
+> `JobTracker`, along with any necessary `TaskTracker`-specific
+> _overrides_, will get serialized and passed to each `TaskTracker`
+> on startup.
+
+_Please email user@mesos.apache.org with questions!_
+
+----------


[3/5] git commit: Get Hadoop code to compile!

Posted by be...@apache.org.
Get Hadoop code to compile!

Review: https://reviews.apache.org/r/13226


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dba8c38d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dba8c38d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dba8c38d

Branch: refs/heads/master
Commit: dba8c38df8694ed2a93af2445b28238b05e69315
Parents: 25cb6f9
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Aug 1 22:02:06 2013 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 2 14:25:32 2013 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/hadoop/mapred/MesosScheduler.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dba8c38d/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
index 7a1469d..0332dea 100644
--- a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -148,7 +148,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
                     + mesosTracker.host);
 
                 driver.killTask(mesosTracker.taskId);
-		tracker.timer.cancel();
+		mesosTracker.timer.cancel();
                 mesosTrackers.remove(tracker);
               }
             }
@@ -756,7 +756,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
         for (HttpHost tracker : trackers) {
           if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
             LOG.info("Removing terminated TaskTracker: " + tracker);
-	    tracker.timer.cancel();
+	    mesosTrackers.get(tracker).timer.cancel();
             mesosTrackers.remove(tracker);
           }
         }