You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/01/28 03:19:44 UTC

svn commit: r1439225 - in /incubator/mesos/trunk/hadoop: ./ mesos/src/java/org/apache/hadoop/mapred/

Author: benh
Date: Mon Jan 28 02:19:44 2013
New Revision: 1439225

URL: http://svn.apache.org/viewvc?rev=1439225&view=rev
Log:
Refactored Hadoop port. The new port doesn't require patching Hadoop
(i.e., it's a self-contained Hadoop contrib) and lets you use existing
schedulers (e.g., the fair scheduler or capacity scheduler)!  The
tradeoff, however, is that it doesn't take as much advantage of the
fine-grained nature of a Mesos task (i.e., there is no longer a 1-1
mapping between a Mesos task and a map/reduce task).

Added:
    incubator/mesos/trunk/hadoop/NOTES
    incubator/mesos/trunk/hadoop/hadoop-gridmix.patch
    incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java
Removed:
    incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3.patch
    incubator/mesos/trunk/hadoop/hadoop-0.20.205.0.patch
    incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/FrameworkExecutor.java
    incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java
    incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/HadoopFrameworkMessage.java
    incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosTaskTrackerInstrumentation.java
Modified:
    incubator/mesos/trunk/hadoop/Makefile.am
    incubator/mesos/trunk/hadoop/TUTORIAL.sh
    incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch
    incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch
    incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch
    incubator/mesos/trunk/hadoop/mapred-site.xml.patch
    incubator/mesos/trunk/hadoop/mesos-executor
    incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java

Modified: incubator/mesos/trunk/hadoop/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/Makefile.am?rev=1439225&r1=1439224&r2=1439225&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/Makefile.am (original)
+++ incubator/mesos/trunk/hadoop/Makefile.am Mon Jan 28 02:19:44 2013
@@ -14,17 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License
 
-EXTRA_DIST = TUTORIAL.sh hadoop-0.20.2-cdh3u3.patch			\
+EXTRA_DIST = TUTORIAL.sh hadoop-gridmix.patch				\
   hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch				\
-  hadoop-0.20.2-cdh3u3_mesos.patch hadoop-0.20.205.0.patch		\
+  hadoop-0.20.2-cdh3u3_mesos.patch hadoop-7698-1.patch			\
   hadoop-0.20.205.0_hadoop-env.sh.patch hadoop-0.20.205.0_mesos.patch	\
   mapred-site.xml.patch mesos-executor mesos/build.xml			\
   mesos/ivy/libraries.properties mesos/ivy.xml				\
-  mesos/src/java/org/apache/hadoop/mapred/FrameworkExecutor.java	\
-  mesos/src/java/org/apache/hadoop/mapred/FrameworkScheduler.java	\
-  mesos/src/java/org/apache/hadoop/mapred/HadoopFrameworkMessage.java	\
-  mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java		\
-  mesos/src/java/org/apache/hadoop/mapred/MesosTaskTrackerInstrumentation.java
+  mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java		\
+  mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
 
 # Defines some targets to run the Hadoop tutorial using a specified
 # distribution. At some point we might want to do this automagically
@@ -34,7 +31,8 @@ EXTRA_DIST = TUTORIAL.sh hadoop-0.20.2-c
 hadoop-0.20.205.0:
 	if test "$(top_srcdir)" != "$(top_builddir)"; then \
           cp -p $(srcdir)/TUTORIAL.sh .; \
-          cp -p $(srcdir)/hadoop-0.20.205.0.patch .; \
+          cp -p $(srcdir)/hadoop-gridmix.patch .; \
+          cp -p $(srcdir)/hadoop-7698-1.patch .; \
           cp -p $(srcdir)/hadoop-0.20.205.0_hadoop-env.sh.patch .; \
           cp -p $(srcdir)/hadoop-0.20.205.0_mesos.patch .; \
           cp -p $(srcdir)/mapred-site.xml.patch .; \
@@ -47,7 +45,7 @@ hadoop-0.20.205.0:
 hadoop-0.20.2-cdh3u3:
 	if test "$(top_srcdir)" != "$(top_builddir)"; then \
           cp -p $(srcdir)/TUTORIAL.sh .; \
-          cp -p $(srcdir)/hadoop-0.20.2-cdh3u3.patch .; \
+          cp -p $(srcdir)/hadoop-gridmix.patch .; \
           cp -p $(srcdir)/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch .; \
           cp -p $(srcdir)/hadoop-0.20.2-cdh3u3_mesos.patch .; \
           cp -p $(srcdir)/mapred-site.xml.patch .; \
@@ -61,10 +59,10 @@ hadoop-0.20.2-cdh3u3:
 clean-local:
 	if test "$(top_srcdir)" != "$(top_builddir)"; then \
           -rm -f TUTORIAL.sh; \
-          -rm -f hadoop-0.20.2-cdh3u3.patch; \
+          -rm -f hadoop-gridmix.patch; \
+          -rm -f hadoop-7698-1.patch; \
           -rm -f hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch; \
           -rm -f hadoop-0.20.2-cdh3u3_mesos.patch; \
-          -rm -f hadoop-0.20.205.0.patch; \
           -rm -f hadoop-0.20.205.0_hadoop-env.sh.patch; \
           -rm -f hadoop-0.20.205.0_mesos.patch; \
           -rm -f mapred-site.xml.patch; \

Added: incubator/mesos/trunk/hadoop/NOTES
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/NOTES?rev=1439225&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/NOTES (added)
+++ incubator/mesos/trunk/hadoop/NOTES Mon Jan 28 02:19:44 2013
@@ -0,0 +1,6 @@
+We've patched GridMix (a contribution in Hadoop) because it was broken under Java 7.
+We can stop applying this custom patch when (if?) GridMix is patched in the Hadoop versions
+we support.
+
+We've also patched build.xml of hadoop-0.20.205.0 because its 'jsvc' target is broken.
+The patch is obtained from https://issues.apache.org/jira/browse/HADOOP-7698.

Modified: incubator/mesos/trunk/hadoop/TUTORIAL.sh
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/TUTORIAL.sh?rev=1439225&r1=1439224&r2=1439225&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/TUTORIAL.sh (original)
+++ incubator/mesos/trunk/hadoop/TUTORIAL.sh Mon Jan 28 02:19:44 2013
@@ -3,7 +3,7 @@
 # Determine the Hadoop distribution to use.
 if test -z "${1}"; then
     distribution="0.20.205.0"
-    url="http://apache.cs.utah.edu/hadoop/common/hadoop-0.20.205.0"
+    url="http://archive.apache.org/dist/hadoop/core/hadoop-0.20.205.0"
 elif test "${1}" = "0.20.2-cdh3u3"; then
     distribution="0.20.2-cdh3u3"
     url="http://archive.cloudera.com/cdh/3"
@@ -41,7 +41,8 @@ test -f ../support/colors.sh && . ../sup
 
 # Make sure we have all the necessary files/directories we need.
 resources="TUTORIAL.sh \
-  ${hadoop}.patch \
+  hadoop-gridmix.patch \
+  hadoop-7698-1.patch \
   ${hadoop}_hadoop-env.sh.patch \
   ${hadoop}_mesos.patch \
   mapred-site.xml.patch \
@@ -151,21 +152,21 @@ echo
 cd ${hadoop} || fail "cd ${hadoop}"
 
 
-# Apply the Hadoop patch.
+# Apply the GridMix patch.
 cat <<__EOF__
 
-To run Hadoop on Mesos we need to apply a rather minor patch. The
-patch makes a small number of modifications in Hadoop. (Note that the
-changes to Hadoop have been committed in revisions r1033804 and
-r987589 so at some point we won't need to apply any patch at all.)
+To run Hadoop on Mesos under Java 7 we need to apply a rather minor patch
+to GridMix, a contribution in Hadoop. See 'NOTES' file for more info.
 We'll apply the patch with:
 
-  $ patch -p1 <../${hadoop}.patch
+  $ patch -p1 <../hadoop-gridmix.patch
 
 __EOF__
 
 # Check and see if the patch has already been applied.
-grep extraData src/mapred/org/apache/hadoop/mapred/Task.java >/dev/null
+grep 'private String getEnumValues' \
+  src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java \
+  >/dev/null
 
 if test ${?} == "0"; then
     cat <<__EOF__
@@ -177,9 +178,38 @@ __EOF__
 else
     read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
     echo
-    patch -p1 <../${hadoop}.patch || fail "patch -p1 <../${hadoop}.patch"
+    patch -p1 <../hadoop-gridmix.patch || \
+      fail "patch -p1 <../hadoop-gridmix.patch"
 fi
 
+# Apply the 'jsvc' patch for hadoop-0.20.205.0.
+if test ${distribution} = "0.20.205.0"; then
+  cat <<__EOF__
+
+To build Mesos executor bundle, we need to apply a patch for
+'jsvc' target that is broken in build.xml. We apply the patch with:
+
+  $ patch -p1 <../hadoop-7698-1.patch
+
+__EOF__
+
+  # Check and see if the patch has already been applied.
+  grep 'os-name' build.xml >/dev/null
+
+  if test ${?} == "0"; then
+      cat <<__EOF__
+
+  ${RED}It looks like you've already applied the patch, so we'll skip
+  applying it now.${NORMAL}
+
+__EOF__
+  else
+      read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
+      echo
+      patch -p1 <../hadoop-7698-1.patch || \
+        fail "patch -p1 <../hadoop-7698-1.patch"
+  fi
+fi
 
 # Copy over the Mesos contrib component (and mesos-executor) and apply
 # the patch to build the contrib.
@@ -368,9 +398,8 @@ cat <<__EOF__
 ${GREEN}Build success!${NORMAL} Now let's run something!
 
 First we need to configure Hadoop appropriately by modifying
-conf/mapred-site.xml (as is always required when running Hadoop). In
-order to run Hadoop on Mesos we need to set at least these three
-properties:
+conf/mapred-site.xml (as is always required when running Hadoop).
+In order to run Hadoop on Mesos we need to set at least these four properties:
 
   mapred.job.tracker
 
@@ -378,6 +407,8 @@ properties:
 
   mapred.mesos.master
 
+  mapred.mesos.executor
+
 The 'mapred.job.tracker' property should be set to the host:port where
 you want to launch the JobTracker (e.g., localhost:54321).
 
@@ -390,9 +421,16 @@ order to bring up a Mesos "cluster" with
 a remote master simply use the URL used to connect the slave to the
 master (e.g., localhost:5050).
 
+The 'mapred.mesos.executor' property must be set to the location
+of Mesos executor bundle so that Mesos slaves can download
+and run the executor.
+NOTE: You need to MANUALLY upload the Mesos executor bundle to
+the above location.
+
+
 We've got a prepared patch for conf/mapred-site.xml that makes the
-changes necessary to get everything running. We can apply that patch
-like so:
+changes necessary to get everything running with a local Mesos cluster.
+We can apply that patch like so:
 
   $ patch -p1 <../mapred-site.xml.patch
 
@@ -467,6 +505,52 @@ if test ${REPLY} == "Y" -o ${REPLY} == "
         fail "patch -p1 <../${hadoop}_hadoop-env.sh.patch"
 fi
 
+# Build Mesos executor package that Mesos slaves can download and execute.
+# TODO(vinod): Create a new ant target in build.xml that does this for us.
+# NOTE: We specifically set the version when calling ant, to ensure we know
+# the resulting directory name.
+cat <<__EOF__
+
+Okay, let's try building Mesos executor package:
+
+  $ ant -Dversion=${distribution} bin-package
+  $ cd build/${hadoop}
+  $ cp ${LIBRARY} lib/native/${PLATFORM}
+  $ rm -rf cloudera # Only for cdh3
+  $ cd ..
+  $ mv ${hadoop} hadoop
+  $ tar -cjf hadoop.tar.gz hadoop
+  $ cd ..
+
+__EOF__
+
+read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
+echo
+
+ant -Dversion=${distribution} bin-package || \
+  fail "ant -Dversion=${distribution} bin-package"
+
+cd build/${hadoop} || fail "cd build/${hadoop}"
+
+# Copy the Mesos native library.
+mkdir -p lib/native/${PLATFORM} || fail "mkdir -p lib/native/${PLATFORM}"
+cp ${LIBRARY} lib/native/${PLATFORM} || \
+  fail "cp ${LIBRARY} lib/native/${PLATFORM}"
+
+# Delete cloudera patches (only present in cdh3 versions of Hadoop)
+# to save space (62MB).
+rm -rf cloudera || fail "rm -rf cloudera"
+
+cd .. || fail "cd .."
+
+# We re-name the directory to 'hadoop' so that the Mesos executor
+# can be agnostic to the Hadoop version.
+mv ${hadoop} hadoop || fail "mv ${hadoop} hadoop"
+
+# Create the bundle.
+tar -cjf hadoop.tar.gz hadoop || fail "tar -cjf hadoop.tar.gz hadoop"
+
+cd .. || fail "cd.."
 
 # Start JobTracker.
 cat <<__EOF__
@@ -480,6 +564,10 @@ __EOF__
 read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
 echo
 
+# Fake the resources for this local slave, because the default resources
+# (esp. memory on MacOSX) offered by the slave might not be enough to
+# launch TaskTrackers.
+export MESOS_RESOURCES="cpus:16;mem:16384;disk:307200;ports:[31000-32000]"
 ./bin/hadoop jobtracker 1>/dev/null 2>&1 &
 
 jobtracker_pid=${!}
@@ -546,9 +634,18 @@ cat <<__EOF__
   $ ant
   $ patch -p1 <../mapred-site.xml.patch
   $ patch -p1 <../${hadoop}_hadoop-env.sh.patch
-
-Remember you'll need to change ${hadoop}/conf/mapred-site.xml to
-connect to a Mesos cluster (the patch just uses 'local').
+  $ ant -Dversion=${distribution} bin-package
+  $ cd build/${hadoop}
+  $ cp ${LIBRARY} lib/native/${PLATFORM}
+  $ rm -rf cloudera
+  $ cd ..
+  $ mv ${hadoop} hadoop
+  $ tar -cjf hadoop.tar.gz hadoop
+  $ cd ..
+
+Remember you'll need to make some changes to
+${hadoop}/conf/mapred-site.xml to run Hadoop on a
+real Mesos cluster:
 
 We hope you found this was helpful!
 

Modified: incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch?rev=1439225&r1=1439224&r2=1439225&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch (original)
+++ incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch Mon Jan 28 02:19:44 2013
@@ -7,7 +7,7 @@ index ada5bef..76aaf48 100644
  
  # Extra Java CLASSPATH elements.  Optional.
 -# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
-+export HADOOP_CLASSPATH=${HADOOP_HOME}/build/contrib/mesos/classes
++export HADOOP_CLASSPATH=${HADOOP_HOME}/build/contrib/mesos/classes:$HADOOP_CLASSPATH
 
  # The maximum amount of heap to use, in MB. Default is 1000.
  # export HADOOP_HEAPSIZE=2000

Modified: incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch?rev=1439225&r1=1439224&r2=1439225&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch (original)
+++ incubator/mesos/trunk/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch Mon Jan 28 02:19:44 2013
@@ -20,3 +20,19 @@ index e41c132..593aecd 100644
      </subant>
       <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
       <fail if="testsfailed">Tests failed!</fail>
+diff --git a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+index 545c3c7..f6950be 100644
+--- a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
++++ b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+@@ -257,6 +257,11 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
+     taskScheduler.refresh();
+   }
+
++  @Override
++  public synchronized void checkJobSubmission(JobInProgress job) throws IOException {
++    taskScheduler.checkJobSubmission(job);
++  }
++
+   // Mesos Scheduler methods.
+   @Override
+   public synchronized void registered(

Modified: incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch?rev=1439225&r1=1439224&r2=1439225&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch (original)
+++ incubator/mesos/trunk/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch Mon Jan 28 02:19:44 2013
@@ -7,7 +7,7 @@ index ada5bef..76aaf48 100644
  
  # Extra Java CLASSPATH elements.  Optional.
 -# export HADOOP_CLASSPATH=
-+export HADOOP_CLASSPATH=${HADOOP_HOME}/build/contrib/mesos/classes
++export HADOOP_CLASSPATH=${HADOOP_HOME}/build/contrib/mesos/classes:$HADOOP_CLASSPATH
  
  # The maximum amount of heap to use, in MB. Default is 1000.
  # export HADOOP_HEAPSIZE=2000

Added: incubator/mesos/trunk/hadoop/hadoop-gridmix.patch
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/hadoop-gridmix.patch?rev=1439225&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/hadoop-gridmix.patch (added)
+++ incubator/mesos/trunk/hadoop/hadoop-gridmix.patch Mon Jan 28 02:19:44 2013
@@ -0,0 +1,17 @@
+diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
+index a5ba0f5..cc8a606 100644
+--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
++++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
+@@ -609,3 +609,2 @@
+     }
+   }
+
+-  private <T> String getEnumValues(Enum<? extends T>[] e) {
++  private String getEnumValues(Enum<?>[] e) {
+     StringBuilder sb = new StringBuilder();
+     String sep = "";
+-    for (Enum<? extends T> v : e) {
++    for (Enum<?> v : e) {
+       sb.append(sep);
+       sb.append(v.name());
+       sep = "|";

Modified: incubator/mesos/trunk/hadoop/mapred-site.xml.patch
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mapred-site.xml.patch?rev=1439225&r1=1439224&r2=1439225&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/mapred-site.xml.patch (original)
+++ incubator/mesos/trunk/hadoop/mapred-site.xml.patch Mon Jan 28 02:19:44 2013
@@ -2,7 +2,7 @@ diff --git a/conf/mapred-site.xml b/conf
 index 970c8fe..f9f272d 100644
 --- a/conf/mapred-site.xml
 +++ b/conf/mapred-site.xml
-@@ -4,5 +4,16 @@
+@@ -4,5 +4,48 @@
  <!-- Put site-specific property overrides in this file. -->
  
  <configuration>
@@ -16,7 +16,39 @@ index 970c8fe..f9f272d 100644
 +    <value>org.apache.hadoop.mapred.MesosScheduler</value>
 +  </property>
 +  <property>
++    <name>mapred.mesos.taskScheduler</name>
++    <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
++  </property>
++  <property>
 +    <name>mapred.mesos.master</name>
 +    <value>local</value>
 +  </property>
++#
++# Make sure to uncomment the 'mapred.mesos.executor' property,
++# when running the Hadoop JobTracker on a real Mesos cluster.
++# NOTE: You need to MANUALLY upload the Mesos executor bundle
++# to the location that is set as the value of this property.
++#  <property>
++#    <name>mapred.mesos.executor</name>
++#    <value>hdfs://hdfs.name.node:port/hadoop.zip</value>
++#  </property>
++#
++# The properties below indicate the amount of resources
++# that are allocated to a Hadoop slot (i.e., map/reduce task) by Mesos.
++  <property>
++    <name>mapred.mesos.slot.cpus</name>
++    <value>0.2</value>
++  </property>
++  <property>
++    <name>mapred.mesos.slot.disk</name>
++    <!-- The value is in MB. -->
++    <value>1024</value>
++  </property>
++  <property>
++    <name>mapred.mesos.slot.mem</name>
++    <!-- Note that this is the total memory required for
++         JVM overhead (256 MB) and the heap (-Xmx) of the task.
++         The value is in MB. -->
++    <value>512</value>
++  </property>
  </configuration>

Modified: incubator/mesos/trunk/hadoop/mesos-executor
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos-executor?rev=1439225&r1=1439224&r2=1439225&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos-executor (original)
+++ incubator/mesos/trunk/hadoop/mesos-executor Mon Jan 28 02:19:44 2013
@@ -1,3 +1,3 @@
 #!/bin/sh
 
-exec `dirname ${0}`/hadoop org.apache.hadoop.mapred.FrameworkExecutor
+exec `dirname ${0}`/hadoop org.apache.hadoop.mapred.MesosExecutor

Added: incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java?rev=1439225&view=auto
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java (added)
+++ incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java Mon Jan 28 02:19:44 2013
@@ -0,0 +1,145 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mesos.Executor;
+import org.apache.mesos.ExecutorDriver;
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.Environment.Variable;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.SlaveInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+
+public class MesosExecutor implements Executor {
+  public static final Log LOG = LogFactory.getLog(MesosExecutor.class);
+
+  private JobConf conf;
+  private TaskTracker taskTracker;
+
+  @Override
+  public void registered(ExecutorDriver driver, ExecutorInfo executorInfo,
+      FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
+    LOG.info("Executor registered with the slave");
+
+    conf = new JobConf();
+
+    // Get TaskTracker's config options from environment variables set by the
+    // JobTracker.
+    if (executorInfo.getCommand().hasEnvironment()) {
+      for (Variable variable : executorInfo.getCommand().getEnvironment()
+          .getVariablesList()) {
+        LOG.info("Setting config option : " + variable.getName() + " to "
+            + variable.getValue());
+        conf.set(variable.getName(), variable.getValue());
+      }
+    }
+
+    // Get hostname from Mesos to make sure we match what it reports
+    // to the JobTracker.
+    conf.set("slave.host.name", slaveInfo.getHostname());
+
+    // Set the mapred.local directory inside the executor sandbox, so that
+    // different TaskTrackers on the same host do not step on each other.
+    conf.set("mapred.local.dir", System.getProperty("user.dir") + "/mapred");
+  }
+
+  @Override
+  public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
+    LOG.info("Launching task : " + task.getTaskId().getValue());
+
+    // NOTE: We need to manually set the context class loader here because,
+    // the TaskTracker is unable to find LoginModule class otherwise.
+    Thread.currentThread().setContextClassLoader(
+        TaskTracker.class.getClassLoader());
+
+    try {
+      taskTracker = new TaskTracker(conf);
+    } catch (IOException e) {
+      LOG.fatal("Failed to start TaskTracker", e);
+      System.exit(1);
+    } catch (InterruptedException e) {
+      LOG.fatal("Failed to start TaskTracker", e);
+      System.exit(1);
+    }
+
+    // Spin up a TaskTracker in a new thread.
+    new Thread("TaskTracker Run Thread") {
+      @Override
+      public void run() {
+        taskTracker.run();
+
+        // Send a TASK_FINISHED status update.
+        // We do this here because we want to send it in a separate thread
+        // than was used to call killTask().
+        driver.sendStatusUpdate(TaskStatus.newBuilder()
+            .setTaskId(task.getTaskId())
+            .setState(TaskState.TASK_FINISHED)
+            .build());
+
+        // Give some time for the update to reach the slave.
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.error("Failed to sleep TaskTracker thread", e);
+        }
+
+        // Stop the executor.
+        driver.stop();
+      }
+    }.start();
+
+    driver.sendStatusUpdate(TaskStatus.newBuilder()
+        .setTaskId(task.getTaskId())
+        .setState(TaskState.TASK_RUNNING).build());
+  }
+
+  @Override
+  public void killTask(ExecutorDriver driver, TaskID taskId) {
+    LOG.info("Killing task : " + taskId.getValue());
+    try {
+      taskTracker.shutdown();
+    } catch (IOException e) {
+      LOG.error("Failed to shutdown TaskTracker", e);
+    } catch (InterruptedException e) {
+      LOG.error("Failed to shutdown TaskTracker", e);
+    }
+  }
+
+  @Override
+  public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
+    LOG.info("Executor reregistered with the slave");
+  }
+
+  @Override
+  public void disconnected(ExecutorDriver driver) {
+    LOG.info("Executor disconnected from the slave");
+  }
+
+  @Override
+  public void frameworkMessage(ExecutorDriver d, byte[] msg) {
+    LOG.info("Executor received framework message of length: " + msg.length
+        + " bytes");
+  }
+
+  @Override
+  public void error(ExecutorDriver d, String message) {
+    LOG.error("MesosExecutor.error: " + message);
+  }
+
+  @Override
+  public void shutdown(ExecutorDriver d) {
+    LOG.info("Executor asked to shutdown");
+  }
+
+  public static void main(String[] args) {
+    MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor());
+    System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
+  }
+}

Modified: incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java?rev=1439225&r1=1439224&r2=1439225&view=diff
==============================================================================
--- incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java (original)
+++ incubator/mesos/trunk/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java Mon Jan 28 02:19:44 2013
@@ -1,86 +1,639 @@
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.mesos.SchedulerDriver;
-
+import org.apache.commons.httpclient.HttpHost;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import static org.apache.hadoop.util.StringUtils.join;
 
-public class MesosScheduler extends TaskScheduler {
-  public static final Log LOG =
-    LogFactory.getLog(MesosScheduler.class);
+public class MesosScheduler extends TaskScheduler implements Scheduler {
+  public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
 
-  private boolean running = false;
-  private FrameworkScheduler frameworkScheduler;
   private SchedulerDriver driver;
-  JobTracker jobTracker;
+  private TaskScheduler taskScheduler;
+  private JobTracker jobTracker;
+  private Configuration conf;
+
+  // This is the memory overhead for a jvm process. This needs to be added
+  // to a jvm process's resource requirement, in addition to its heap size.
+  private static final int JVM_MEM_OVERHEAD = 256; // 256 MB.
+
+  // TODO(vinod): Consider parsing the slot memory from the configuration jvm
+  // heap options (e.g: mapred.child.java.opts).
+
+  // NOTE: It appears that there's no real resource requirements for a
+  // map / reduce slot. We therefore define a default slot as:
+  // 0.2 cores.
+  // 512 MB memory.
+  // 1 GB of disk space.
+  private static final double SLOT_CPUS_DEFAULT = 0.2; // 0.2 cores.
+  private static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
+  private static final int SLOT_JVM_HEAP_DEFAULT = 256; // MB.
+
+  private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
+  private static final int TASKTRACKER_JVM_HEAP = 1024; // 1 GB.
+  private static final int TASKTRACKER_MEM =
+      TASKTRACKER_JVM_HEAP + JVM_MEM_OVERHEAD;
+
+  // The default behavior in Hadoop is to use 4 slots per TaskTracker:
+  private static final int MAP_SLOTS_DEFAULT = 2;
+  private static final int REDUCE_SLOTS_DEFAULT = 2;
+
+  // Count of the launched trackers for TaskID generation.
+  private long launchedTrackers = 0;
+
+  // Maintains a mapping from {tracker host:port -> MesosTracker}.
+  // Used for tracking the slots of each TaskTracker and the corresponding
+  // Mesos TaskID.
+  private Map<HttpHost, MesosTracker> mesosTrackers =
+      new HashMap<HttpHost, MesosTracker>();
+
+  private JobInProgressListener jobListener = new JobInProgressListener() {
+    @Override
+    public void jobAdded(JobInProgress job) throws IOException {
+      LOG.info("Added job " + job.getJobID());
+    }
+
+    @Override
+    public void jobRemoved(JobInProgress job) {
+      LOG.info("Removed job " + job.getJobID());
+    }
 
-  private EagerTaskInitializationListener eagerInitListener;
+    @Override
+    public void jobUpdated(JobChangeEvent event) {
+      synchronized (MesosScheduler.this) {
+        JobInProgress job = event.getJobInProgress();
+
+        // If the job is complete, kill all the corresponding idle TaskTrackers.
+        if (!job.isComplete())
+          return;
+
+        LOG.info("Completed job : " + job.getJobID());
+
+        List<TaskInProgress> completed = new ArrayList<TaskInProgress>();
+
+        // Map tasks.
+        completed.addAll(job.reportTasksInProgress(true, true));
+
+        // Reduce tasks.
+        completed.addAll(job.reportTasksInProgress(false, true));
+
+        for (TaskInProgress task : completed) {
+          for (TaskStatus status : task.getTaskStatuses()) {
+            LOG.info("Removing completed task : " + status.getTaskID()
+                + " of tracker " + status.getTaskTracker());
+
+            // Make a copy to iterate over keys and delete values.
+            Set<HttpHost> trackers = new HashSet<HttpHost>(
+                mesosTrackers.keySet());
+
+            // Remove the task from the map.
+            for (HttpHost tracker : trackers) {
+              MesosTracker mesosTracker = mesosTrackers.get(tracker);
+
+              if (!mesosTracker.active) {
+                LOG.warn("Ignoring TaskTracker: " + tracker
+                    + " because it might not have sent a hearbeat");
+                continue;
+              }
+
+              mesosTracker.hadoopTasks.remove(status.getTaskID());
+
+              // If this TaskTracker doesn't have any running tasks, kill it.
+              if (mesosTracker.hadoopTasks.isEmpty()) {
+                LOG.info("Killing Mesos task: " + mesosTracker.taskId
+                    + " on host " + mesosTracker.host);
+
+                driver.killTask(mesosTracker.taskId);
+                mesosTrackers.remove(tracker);
+              }
+            }
+          }
+        }
+      }
+    }
+  };
 
-  public MesosScheduler() { 
+  public MesosScheduler() {
   }
 
   @Override
-  public void start() throws IOException {
+  public synchronized void start() throws IOException {
+    conf = getConf();
+    String taskTrackerClass = conf.get("mapred.mesos.taskScheduler",
+        "org.apache.hadoop.mapred.JobQueueTaskScheduler");
+
     try {
-      LOG.info("Starting MesosScheduler");
-      jobTracker = (JobTracker) super.taskTrackerManager;
+      taskScheduler =
+          (TaskScheduler) Class.forName(taskTrackerClass).newInstance();
+      taskScheduler.setConf(conf);
+      taskScheduler.setTaskTrackerManager(taskTrackerManager);
+    } catch (ClassNotFoundException e) {
+      LOG.fatal("Failed to initialize the TaskScheduler", e);
+      System.exit(1);
+    } catch (InstantiationException e) {
+      LOG.fatal("Failed to initialize the TaskScheduler", e);
+      System.exit(1);
+    } catch (IllegalAccessException e) {
+      LOG.fatal("Failed to initialize the TaskScheduler", e);
+      System.exit(1);
+    }
 
-      Configuration conf = getConf();
-      String master = conf.get("mapred.mesos.master", "local");
+    // Add the job listener to get job related updates.
+    taskTrackerManager.addJobInProgressListener(jobListener);
+
+    LOG.info("Starting MesosScheduler");
+    jobTracker = (JobTracker) super.taskTrackerManager;
 
-      this.eagerInitListener = new EagerTaskInitializationListener(conf);
-      eagerInitListener.setTaskTrackerManager(taskTrackerManager);
-      eagerInitListener.start();
-      taskTrackerManager.addJobInProgressListener(eagerInitListener);
-
-      frameworkScheduler = new FrameworkScheduler(this);
-      driver = new MesosSchedulerDriver(
-          frameworkScheduler, frameworkScheduler.getFrameworkInfo(), master);
+    String master = conf.get("mapred.mesos.master", "local");
 
+    try {
+      FrameworkInfo frameworkInfo = FrameworkInfo
+          .newBuilder()
+          .setUser("")
+          .setName("Hadoop: (RPC port: " + jobTracker.port + ","
+                   + " WebUI port: " + jobTracker.infoPort + ")").build();
+
+      driver = new MesosSchedulerDriver(this, frameworkInfo, master);
       driver.start();
     } catch (Exception e) {
-      // If the MesosScheduler can't be loaded, the JT won't be useful at all,
-      // so crash it now so that the user notices.
+      // If the MesosScheduler can't be loaded, the JobTracker won't be useful
+      // at all, so crash it now so that the user notices.
       LOG.fatal("Failed to start MesosScheduler", e);
-      // TODO: Use System.exit(1) instead of RuntimeException?
-      throw new RuntimeException("Failed to start MesosScheduler", e);
+      System.exit(1);
     }
+
+    taskScheduler.start();
   }
 
   @Override
-  public void terminate() throws IOException {
+  public synchronized void terminate() throws IOException {
     try {
-      if (running) {
-        LOG.info("Stopping MesosScheduler");
-        driver.stop();
-        frameworkScheduler.cleanUp();
+      LOG.info("Stopping MesosScheduler");
+      driver.stop();
+    } catch (Exception e) {
+      LOG.error("Failed to stop Mesos scheduler", e);
+    }
+
+    taskScheduler.terminate();
+  }
+
+  // TaskScheduler methods.
+  @Override
+  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
+      throws IOException {
+    HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
+        taskTracker.getStatus().getHttpPort());
+
+    if (!mesosTrackers.containsKey(tracker)) {
+      // TODO(bmahler): Consider allowing non-Mesos TaskTrackers.
+      LOG.info("Unknown/exited TaskTracker: " + tracker + ". ");
+      return null;
+    }
+
+    // Let the underlying task scheduler do the actual task scheduling.
+    List<Task> tasks = taskScheduler.assignTasks(taskTracker);
+
+    // Keep track of which TaskTracker contains which tasks.
+    for (Task task : tasks) {
+      LOG.info("Assigning task : " + task.getTaskID()
+          + " to tracker " + tracker);
+      mesosTrackers.get(tracker).hadoopTasks.add(task.getTaskID());
+    }
+
+    return tasks;
+  }
+
+  @Override
+  public synchronized Collection<JobInProgress> getJobs(String queueName) {
+    return taskScheduler.getJobs(queueName);
+  }
+
+  @Override
+  public synchronized void refresh() throws IOException {
+    taskScheduler.refresh();
+  }
+
+  // Mesos Scheduler methods.
+  @Override
+  public synchronized void registered(SchedulerDriver schedulerDriver,
+      FrameworkID frameworkID, MasterInfo masterInfo) {
+    LOG.info("Registered as " + frameworkID.getValue()
+        + " with master " + masterInfo);
+  }
+
+  @Override
+  public synchronized void reregistered(SchedulerDriver schedulerDriver,
+      MasterInfo masterInfo) {
+    LOG.info("Re-registered with master " + masterInfo);
+  }
+
+  @Override
+  public synchronized void resourceOffers(SchedulerDriver schedulerDriver,
+      List<Offer> offers) {
+    // Compute the number of pending maps and reduces.
+    int pendingMaps = 0;
+    int pendingReduces = 0;
+    for (JobStatus status : jobTracker.jobsToComplete()) {
+      JobInProgress progress = jobTracker.getJob(status.getJobID());
+      pendingMaps += progress.pendingMaps();
+      pendingReduces += progress.pendingReduces();
+    }
+
+    // Mark active (heartbeated) TaskTrackers and compute idle slots.
+    int idleMapSlots = 0;
+    int idleReduceSlots = 0;
+    for (TaskTrackerStatus status : jobTracker.taskTrackers()) {
+      HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
+      if (mesosTrackers.containsKey(host)) {
+        mesosTrackers.get(host).active = true;
+        idleMapSlots += status.getAvailableMapSlots();
+        idleReduceSlots += status.getAvailableReduceSlots();
       }
-      if (eagerInitListener != null) {
-        taskTrackerManager.removeJobInProgressListener(eagerInitListener);
+    }
+
+    // Consider the TaskTrackers that have yet to become active as being idle,
+    // otherwise we will launch excessive TaskTrackers.
+    int inactiveMapSlots = 0;
+    int inactiveReduceSlots = 0;
+    for (MesosTracker tracker : mesosTrackers.values()) {
+      if (!tracker.active) {
+        inactiveMapSlots += tracker.mapSlots;
+        inactiveReduceSlots += tracker.reduceSlots;
+      }
+    }
+
+    // Compute how many slots we need to allocate.
+    int neededMapSlots = Math.max(0, pendingMaps - idleMapSlots);
+    int neededReduceSlots = Math.max(0, pendingReduces - idleReduceSlots);
+
+    LOG.info(join("\n", Arrays.asList(
+        "JobTracker Status",
+        "      Pending Map Tasks: " + pendingMaps,
+        "   Pending Reduce Tasks: " + pendingReduces,
+        "         Idle Map Slots: " + idleMapSlots,
+        "      Idle Reduce Slots: " + idleReduceSlots,
+        "     Inactive Map Slots: " + inactiveMapSlots
+                                    + " (launched but no heartbeat yet)",
+        "  Inactive Reduce Slots: " + inactiveReduceSlots
+                                    + " (launched but no heartbeat yet)",
+        "       Needed Map Slots: " + neededMapSlots,
+        "    Needed Reduce Slots: " + neededReduceSlots)));
+
+    // Launch TaskTrackers to satisfy the slot requirements.
+    // TODO(bmahler): Consider slotting intelligently.
+    // Ex: If more map slots are needed, but no reduce slots are needed,
+    // launch a map-only TaskTracker to better satisfy the slot needs.
+    for (Offer offer : offers) {
+      if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+        driver.declineOffer(offer.getId());
+        continue;
+      }
+
+      double cpus = -1.0;
+      double mem = -1.0;
+      double disk = -1.0;
+      Set<Integer> ports = new HashSet<Integer>(2);
+
+      // Pull out the cpus, memory, disk, and 2 ports from the offer.
+      for (Resource resource : offer.getResourcesList()) {
+        if (resource.getName().equals("cpus")
+            && resource.getType() == Value.Type.SCALAR) {
+          cpus = resource.getScalar().getValue();
+        } else if (resource.getName().equals("mem")
+            && resource.getType() == Value.Type.SCALAR) {
+          mem = resource.getScalar().getValue();
+        } else if (resource.getName().equals("disk")
+            && resource.getType() == Value.Type.SCALAR) {
+          disk = resource.getScalar().getValue();
+        } else if (resource.getName().equals("ports")
+            && resource.getType() == Value.Type.RANGES) {
+          for (Value.Range range : resource.getRanges().getRangeList()) {
+            if (ports.size() < 2)
+              ports.add((int) range.getBegin());
+            if (ports.size() < 2)
+              ports.add((int) range.getEnd());
+          }
+        }
+      }
+
+      int mapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum",
+          MAP_SLOTS_DEFAULT);
+      int reduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
+          REDUCE_SLOTS_DEFAULT);
+
+      double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
+          (float) SLOT_CPUS_DEFAULT);
+      double slotDisk = conf.getInt("mapred.mesos.slot.disk",
+          SLOT_DISK_DEFAULT);
+      double slotMem = conf.getInt("mapred.mesos.slot.mem",
+          SLOT_JVM_HEAP_DEFAULT + JVM_MEM_OVERHEAD);
+      double slotJVMHeap = slotMem - JVM_MEM_OVERHEAD;
+
+      // Total resource requirements for the container (TaskTracker + map/red
+      // tasks).
+      double containerCpus = (mapSlots + reduceSlots) * slotCpus
+          + TASKTRACKER_CPUS;
+      double containerMem = (mapSlots + reduceSlots) * slotMem
+          + TASKTRACKER_MEM;
+      double containerDisk = (mapSlots + reduceSlots) * slotDisk;
+
+      if (containerCpus > cpus || containerMem > mem || containerDisk > disk
+          || ports.size() < 2) {
+        LOG.info(join("\n", Arrays.asList(
+            "Declining offer with insufficient resources for a TaskTracker: ",
+            "  cpus: offered " + cpus + " needed " + containerCpus,
+            "  mem : offered " + mem + " needed " + containerMem,
+            "  disk: offered " + disk + " needed " + containerDisk,
+            "  ports: " + (ports.size() < 2
+                          ? " less than 2 offered"
+                          : " at least 2 (sufficient)"),
+            offer.getResourcesList().toString())));
+
+        driver.declineOffer(offer.getId());
+        continue;
+      }
+
+      Integer[] portArray = ports.toArray(new Integer[2]);
+      HttpHost httpAddress = new HttpHost(offer.getHostname(), portArray[0]);
+      HttpHost reportAddress = new HttpHost(offer.getHostname(), portArray[1]);
+      HttpHost jobTrackerAddress = new HttpHost(jobTracker.getHostname(),
+          jobTracker.getTrackerPort());
+
+      TaskID taskId = TaskID.newBuilder()
+          .setValue("Task_Tracker_" + launchedTrackers++).build();
+
+      LOG.info("Launching task " + taskId.getValue() + " on "
+          + httpAddress.toString());
+
+      // Add this tracker to Mesos tasks.
+      mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
+          mapSlots, reduceSlots));
+
+      // Create the environment depending on whether the executor is going to be
+      // run locally.
+      // TODO(vinod): Do not pass the mapred config options as environment
+      // variables.
+      Protos.Environment.Builder envBuilder = Protos.Environment
+          .newBuilder()
+          .addVariables(
+              Protos.Environment.Variable
+                  .newBuilder()
+                  .setName("mapred.job.tracker")
+                  .setValue(jobTrackerAddress.getHostName() + ':'
+                      + jobTrackerAddress.getPort()))
+          .addVariables(
+              Protos.Environment.Variable
+                  .newBuilder()
+                  .setName("mapred.task.tracker.http.address")
+                  .setValue(
+                      httpAddress.getHostName() + ':' + httpAddress.getPort()))
+          .addVariables(
+              Protos.Environment.Variable
+                  .newBuilder()
+                  .setName("mapred.task.tracker.report.address")
+                  .setValue(reportAddress.getHostName() + ':'
+                      + reportAddress.getPort()))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+                  .setName("mapred.map.child.java.opts")
+                  .setValue("-Xmx" + slotJVMHeap + "m"))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+                  .setName("mapred.reduce.child.java.opts")
+                  .setValue("-Xmx" + slotJVMHeap + "m"))
+          .addVariables(
+              Protos.Environment.Variable.newBuilder()
+                  .setName("HADOOP_HEAPSIZE")
+                  .setValue("" + TASKTRACKER_JVM_HEAP));
+
+      // Set java specific environment, appropriately.
+      Map<String, String> env = System.getenv();
+      if (env.containsKey("JAVA_HOME")) {
+        envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+            .setName("JAVA_HOME")
+            .setValue(env.get("JAVA_HOME")));
+      }
+
+      if (env.containsKey("JAVA_LIBRARY_PATH")) {
+        envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+            .setName("JAVA_LIBRARY_PATH")
+            .setValue(env.get("JAVA_LIBRARY_PATH")));
+      }
+
+      // Command info differs when performing a local run.
+      CommandInfo commandInfo = null;
+      String master = conf.get("mapred.mesos.master", "local");
+
+      if (master.equals("local")) {
+        try {
+          commandInfo = CommandInfo.newBuilder()
+              .setEnvironment(envBuilder)
+              .setValue(new File("bin/mesos-executor").getCanonicalPath())
+              .build();
+        } catch (IOException e) {
+          LOG.fatal("Failed to find Mesos executor ", e);
+          System.exit(1);
+        }
+      } else {
+        String uri = conf.get("mapred.mesos.executor");
+        commandInfo = CommandInfo.newBuilder()
+            .setEnvironment(envBuilder)
+            .setValue("cd hadoop && ./bin/mesos-executor")
+            .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
+      }
+
+      TaskInfo info = TaskInfo
+          .newBuilder()
+          .setName(taskId.getValue())
+          .setTaskId(taskId)
+          .setSlaveId(offer.getSlaveId())
+          .addResources(
+              Resource
+                  .newBuilder()
+                  .setName("cpus")
+                  .setType(Value.Type.SCALAR)
+                  .setScalar(Value.Scalar.newBuilder().setValue(
+                      (mapSlots + reduceSlots) * slotCpus)))
+          .addResources(
+              Resource
+                  .newBuilder()
+                  .setName("mem")
+                  .setType(Value.Type.SCALAR)
+                  .setScalar(Value.Scalar.newBuilder().setValue(
+                      (mapSlots + reduceSlots) * slotMem)))
+          .addResources(
+              Resource
+                  .newBuilder()
+                  .setName("disk")
+                  .setType(Value.Type.SCALAR)
+                  .setScalar(Value.Scalar.newBuilder().setValue(
+                      (mapSlots + reduceSlots) * slotDisk)))
+          .addResources(
+              Resource
+                  .newBuilder()
+                  .setName("ports")
+                  .setType(Value.Type.RANGES)
+                  .setRanges(
+                      Value.Ranges
+                          .newBuilder()
+                          .addRange(Value.Range.newBuilder()
+                                        .setBegin(httpAddress.getPort())
+                                        .setEnd(httpAddress.getPort()))
+                          .addRange(Value.Range.newBuilder()
+                                        .setBegin(reportAddress.getPort())
+                                        .setEnd(reportAddress.getPort()))))
+          .setExecutor(
+              ExecutorInfo
+                  .newBuilder()
+                  .setExecutorId(ExecutorID.newBuilder().setValue(
+                      "executor_" + taskId.getValue()))
+                  .addResources(
+                      Resource
+                          .newBuilder()
+                          .setName("cpus")
+                          .setType(Value.Type.SCALAR)
+                          .setScalar(Value.Scalar.newBuilder().setValue(
+                              (TASKTRACKER_CPUS))))
+                  .addResources(
+                      Resource
+                          .newBuilder()
+                          .setName("mem")
+                          .setType(Value.Type.SCALAR)
+                          .setScalar(Value.Scalar.newBuilder().setValue(
+                              (TASKTRACKER_MEM)))).setCommand(commandInfo))
+          .build();
+
+      driver.launchTasks(offer.getId(), Arrays.asList(info));
+
+      neededMapSlots -= mapSlots;
+      neededReduceSlots -= reduceSlots;
+    }
+
+    if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+      LOG.info("Satisfied map and reduce slots needed.");
+    } else {
+      LOG.info("Unable to fully satisfy needed map/reduce slots: "
+          + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
+          + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
+          + "remaining");
+    }
+  }
+
+  @Override
+  public synchronized void offerRescinded(SchedulerDriver schedulerDriver,
+      OfferID offerID) {
+    LOG.warn("Rescinded offer: " + offerID.getValue());
+  }
+
+  @Override
+  public synchronized void statusUpdate(SchedulerDriver schedulerDriver,
+      Protos.TaskStatus taskStatus) {
+    LOG.info("Status update of " + taskStatus.getTaskId().getValue()
+        + " to " + taskStatus.getState().name()
+        + " with message " + taskStatus.getMessage());
+
+    // Remove the TaskTracker if the corresponding Mesos task has reached a
+    // terminal state.
+    TaskState state = taskStatus.getState();
+
+    if (state == TaskState.TASK_FINISHED || state == TaskState.TASK_FAILED
+        || state == TaskState.TASK_KILLED || state == TaskState.TASK_LOST) {
+
+      // Make a copy to iterate over keys and delete values.
+      Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
+
+      // Remove the task from the map. NOTE: protobuf messages must be
+      // compared with equals(); '==' is reference identity and never matches.
+      for (HttpHost tracker : trackers) {
+        if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
+          LOG.info("Removing terminated TaskTracker: " + tracker);
+          mesosTrackers.remove(tracker);
       }
-    } catch (Exception e) {
-      e.printStackTrace();
     }
   }
 
   @Override
-  public List<Task> assignTasks(TaskTracker taskTracker) throws IOException {
-    return frameworkScheduler.assignTasks(taskTracker);
+  public synchronized void frameworkMessage(SchedulerDriver schedulerDriver,
+      ExecutorID executorID, SlaveID slaveID, byte[] bytes) {
+    LOG.info("Framework Message of " + bytes.length + " bytes"
+        + " from executor " + executorID.getValue()
+        + " on slave " + slaveID.getValue());
   }
 
   @Override
-  public Collection<JobInProgress> getJobs(String queueName) {
-    // TODO Actually return some jobs
-    ArrayList<JobInProgress> list = new ArrayList<JobInProgress>();
-    return list;
+  public synchronized void disconnected(SchedulerDriver schedulerDriver) {
+    LOG.warn("Disconnected from Mesos master.");
   }
 
+  @Override
+  public synchronized void slaveLost(SchedulerDriver schedulerDriver,
+      SlaveID slaveID) {
+    LOG.warn("Slave lost: " + slaveID.getValue());
+  }
+
+  @Override
+  public synchronized void executorLost(SchedulerDriver schedulerDriver,
+      ExecutorID executorID, SlaveID slaveID, int status) {
+    LOG.warn("Executor " + executorID.getValue()
+        + " lost with status " + status + " on slave " + slaveID);
+  }
+
+  @Override
+  public synchronized void error(SchedulerDriver schedulerDriver, String s) {
+    LOG.error("Error from scheduler driver: " + s);
+  }
+
+  /**
+   * Used to track our launched TaskTrackers.
+   */
+  private class MesosTracker {
+    public HttpHost host;
+    public TaskID taskId;
+    public int mapSlots;
+    public int reduceSlots;
+    public boolean active = false; // Set once tracked by the JobTracker.
+
+    // Tracks Hadoop tasks running on the tracker.
+    public Set<TaskAttemptID> hadoopTasks = new HashSet<TaskAttemptID>();
+
+    public MesosTracker(HttpHost host, TaskID taskId, int mapSlots,
+        int reduceSlots) {
+      this.host = host;
+      this.taskId = taskId;
+      this.mapSlots = mapSlots;
+      this.reduceSlots = reduceSlots;
+    }
+  }
 }