You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/08/03 00:21:18 UTC
[1/5] Refactored Hadoop on Mesos from contrib to JAR.
Updated Branches:
refs/heads/master f6d95e89e -> c621ae052
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
deleted file mode 100644
index 027389d..0000000
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ /dev/null
@@ -1,848 +0,0 @@
-package org.apache.hadoop.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Iterator;
-
-import org.apache.commons.httpclient.HttpHost;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.Value;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import static org.apache.hadoop.util.StringUtils.join;
-
-/**
- * Hadoop {@link TaskScheduler} that runs its TaskTrackers as Mesos tasks.
- *
- * <p>As a Mesos {@link Scheduler} it accepts resource offers and launches
- * TaskTrackers sized to cover the pending map/reduce work (plus an optional
- * configured minimum of slots). As a Hadoop TaskScheduler it delegates the
- * actual task assignment to another TaskScheduler, chosen through
- * "mapred.mesos.taskScheduler", and records which jobs ran on which tracker so
- * that trackers left without jobs can be killed once those jobs complete.
- */
-public class MesosScheduler extends TaskScheduler implements Scheduler {
-  public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
-
-  private SchedulerDriver driver;
-  private TaskScheduler taskScheduler;
-  private JobTracker jobTracker;
-  private Configuration conf;
-
-  // This is the memory overhead for a jvm process. This needs to be added
-  // to a jvm process's resource requirement, in addition to its heap size.
-  private static final double JVM_MEM_OVERHEAD_PERCENT_DEFAULT = 0.1; // 10%.
-
-  // TODO(vinod): Consider parsing the slot memory from the configuration jvm
-  // heap options (e.g: mapred.child.java.opts).
-
-  // NOTE: It appears that there's no real resource requirements for a
-  // map / reduce slot. We therefore define a default slot as:
-  //   0.2 cores.
-  //   256 MB memory (the slot JVM heap is this minus the overhead above).
-  //   1 GB of disk space.
-  private static final double SLOT_CPUS_DEFAULT = 0.2; // 0.2 cores.
-  private static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
-  private static final int SLOT_JVM_HEAP_DEFAULT = 256; // 256MB.
-
-  private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
-  private static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB.
-
-  // The default behavior in Hadoop is to use 4 slots per TaskTracker:
-  private static final int MAP_SLOTS_DEFAULT = 2;
-  private static final int REDUCE_SLOTS_DEFAULT = 2;
-
-  // The amount of time to wait for task trackers to launch before
-  // giving up.
-  private static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes
-
-  // Count of the launched trackers for TaskID generation.
-  private long launchedTrackers = 0;
-
-  // Maintains a mapping from {tracker host:port -> MesosTracker}.
-  // Used for tracking the slots of each TaskTracker and the corresponding
-  // Mesos TaskID. Every access in this class holds the MesosScheduler monitor.
-  private Map<HttpHost, MesosTracker> mesosTrackers =
-      new HashMap<HttpHost, MesosTracker>();
-
-  private JobInProgressListener jobListener = new JobInProgressListener() {
-    @Override
-    public void jobAdded(JobInProgress job) throws IOException {
-      LOG.info("Added job " + job.getJobID());
-    }
-
-    @Override
-    public void jobRemoved(JobInProgress job) {
-      LOG.info("Removed job " + job.getJobID());
-    }
-
-    /**
-     * On job completion, drops the job from every active tracker and kills
-     * the trackers that are left without any job.
-     */
-    @Override
-    public void jobUpdated(JobChangeEvent event) {
-      synchronized (MesosScheduler.this) {
-        JobInProgress job = event.getJobInProgress();
-
-        // If the job is complete, kill all the corresponding idle TaskTrackers.
-        if (!job.isComplete()) {
-          return;
-        }
-
-        LOG.info("Completed job : " + job.getJobID());
-
-        List<TaskInProgress> completed = new ArrayList<TaskInProgress>();
-
-        // Map tasks.
-        completed.addAll(job.reportTasksInProgress(true, true));
-
-        // Reduce tasks.
-        completed.addAll(job.reportTasksInProgress(false, true));
-
-        for (TaskInProgress task : completed) {
-          // Check that this task actually belongs to this job. JobIDs are
-          // compared by value (equals), not by reference.
-          if (!task.getJob().getJobID().equals(job.getJobID())) {
-            continue;
-          }
-
-          for (TaskStatus status : task.getTaskStatuses()) {
-            // Make a copy to iterate over keys and delete values.
-            Set<HttpHost> trackers = new HashSet<HttpHost>(
-                mesosTrackers.keySet());
-
-            // Drop the job from each active tracker; kill trackers left idle.
-            for (HttpHost tracker : trackers) {
-              MesosTracker mesosTracker = mesosTrackers.get(tracker);
-
-              if (!mesosTracker.active) {
-                LOG.warn("Ignoring TaskTracker: " + tracker
-                    + " because it might not have sent a hearbeat");
-                continue;
-              }
-
-              LOG.info("Removing completed task : " + status.getTaskID()
-                  + " of tracker " + status.getTaskTracker());
-
-              mesosTracker.hadoopJobs.remove(job.getJobID());
-
-              // If the TaskTracker doesn't have any running tasks, kill it.
-              if (mesosTracker.hadoopJobs.isEmpty()) {
-                LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
-                    + mesosTracker.host);
-
-                driver.killTask(mesosTracker.taskId);
-                // The timer belongs to the MesosTracker; `tracker` is only
-                // its HttpHost key.
-                mesosTracker.timer.cancel();
-                mesosTrackers.remove(tracker);
-              }
-            }
-          }
-        }
-      }
-    }
-  };
-
-  public MesosScheduler() {}
-
-  // TaskScheduler methods.
-
-  /**
-   * Instantiates and starts the delegate TaskScheduler, registers the job
-   * listener and starts the Mesos scheduler driver. Failing to create the
-   * delegate or to start the driver calls System.exit(1).
-   */
-  @Override
-  public synchronized void start() throws IOException {
-    conf = getConf();
-    String taskSchedulerClass = conf.get("mapred.mesos.taskScheduler",
-        "org.apache.hadoop.mapred.JobQueueTaskScheduler");
-
-    // Separate catch clauses keep this compatible with Java 6 (no multi-catch).
-    try {
-      taskScheduler =
-          (TaskScheduler) Class.forName(taskSchedulerClass).newInstance();
-      taskScheduler.setConf(conf);
-      taskScheduler.setTaskTrackerManager(taskTrackerManager);
-    } catch (ClassNotFoundException e) {
-      LOG.fatal("Failed to initialize the TaskScheduler", e);
-      System.exit(1);
-    } catch (InstantiationException e) {
-      LOG.fatal("Failed to initialize the TaskScheduler", e);
-      System.exit(1);
-    } catch (IllegalAccessException e) {
-      LOG.fatal("Failed to initialize the TaskScheduler", e);
-      System.exit(1);
-    }
-
-    // Add the job listener to get job related updates.
-    taskTrackerManager.addJobInProgressListener(jobListener);
-
-    LOG.info("Starting MesosScheduler");
-    jobTracker = (JobTracker) super.taskTrackerManager;
-
-    String master = conf.get("mapred.mesos.master", "local");
-
-    try {
-      FrameworkInfo frameworkInfo = FrameworkInfo
-          .newBuilder()
-          .setUser("")
-          .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false))
-          .setName("Hadoop: (RPC port: " + jobTracker.port + ","
-              + " WebUI port: " + jobTracker.infoPort + ")").build();
-
-      driver = new MesosSchedulerDriver(this, frameworkInfo, master);
-      driver.start();
-    } catch (Exception e) {
-      // If the MesosScheduler can't be loaded, the JobTracker won't be useful
-      // at all, so crash it now so that the user notices.
-      LOG.fatal("Failed to start MesosScheduler", e);
-      System.exit(1);
-    }
-
-    taskScheduler.start();
-  }
-
-  /**
-   * Stops the Mesos driver (logging, not propagating, any failure), cancels
-   * the launch timers of all known trackers and terminates the delegate
-   * TaskScheduler.
-   */
-  @Override
-  public synchronized void terminate() throws IOException {
-    try {
-      LOG.info("Stopping MesosScheduler");
-      driver.stop();
-    } catch (Exception e) {
-      LOG.error("Failed to stop Mesos scheduler", e);
-    }
-
-    // Pending launch timeouts would otherwise keep their (non-daemon) timer
-    // threads alive and later try to kill tasks through the stopped driver.
-    // Timer.cancel() is a no-op on an already cancelled timer.
-    for (MesosTracker tracker : mesosTrackers.values()) {
-      tracker.timer.cancel();
-    }
-
-    taskScheduler.terminate();
-  }
-
-  /**
-   * Delegates task assignment to the underlying TaskScheduler, but only for
-   * TaskTrackers this scheduler launched. It also records the jobs of the
-   * assigned tasks on the tracker.
-   *
-   * @return the assigned tasks, or null for unknown trackers (the delegate
-   *     itself may also return null)
-   */
-  @Override
-  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
-      throws IOException {
-    HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
-        taskTracker.getStatus().getHttpPort());
-
-    if (!mesosTrackers.containsKey(tracker)) {
-      // TODO(bmahler): Consider allowing non-Mesos TaskTrackers.
-      LOG.info("Unknown/exited TaskTracker: " + tracker + ". ");
-      return null;
-    }
-
-    // Let the underlying task scheduler do the actual task scheduling.
-    List<Task> tasks = taskScheduler.assignTasks(taskTracker);
-
-    // The Hadoop Fair Scheduler is known to return null.
-    if (tasks != null) {
-      // Keep track of which TaskTracker contains which tasks.
-      for (Task task : tasks) {
-        mesosTrackers.get(tracker).hadoopJobs.add(task.getJobID());
-      }
-    }
-
-    return tasks;
-  }
-
-  @Override
-  public synchronized Collection<JobInProgress> getJobs(String queueName) {
-    return taskScheduler.getJobs(queueName);
-  }
-
-  @Override
-  public synchronized void refresh() throws IOException {
-    taskScheduler.refresh();
-  }
-
-  // Mesos Scheduler methods.
-  // These are synchronized, where possible. Some of these methods need to access the
-  // JobTracker, which can lead to a deadlock:
-  // See: https://issues.apache.org/jira/browse/MESOS-429
-  // The workaround employed here is to unsynchronize those methods needing access to
-  // the JobTracker state and use explicit synchronization instead as appropriate.
-  // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to
-  // split up the Scheduler and TaskScheduler implementations in order to break the
-  // locking cycle. This would require a synchronized class to store the shared
-  // state across our Scheduler and TaskScheduler implementations, and provide
-  // atomic operations as needed.
-  @Override
-  public synchronized void registered(SchedulerDriver schedulerDriver,
-      FrameworkID frameworkID, MasterInfo masterInfo) {
-    LOG.info("Registered as " + frameworkID.getValue()
-        + " with master " + masterInfo);
-  }
-
-  @Override
-  public synchronized void reregistered(SchedulerDriver schedulerDriver,
-      MasterInfo masterInfo) {
-    LOG.info("Re-registered with master " + masterInfo);
-  }
-
-  /**
-   * Kills the tracker's Mesos task, stops its launch timer and forgets it.
-   *
-   * <p>This is also invoked from the tracker's own launch-timeout task.
-   * Cancelling a Timer from inside one of its tasks is permitted; it lets the
-   * timer thread terminate instead of idling forever.
-   */
-  public synchronized void killTracker(MesosTracker tracker) {
-    driver.killTask(tracker.taskId);
-    tracker.timer.cancel();
-    mesosTrackers.remove(tracker.host);
-  }
-
-  // For some reason, pendingMaps() and pendingReduces() doesn't return the
-  // values we expect. We observed negative values, which may be related to
-  // https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the
-  // algorithm that is used to calculate the pending tasks within the Hadoop
-  // JobTracker sources (see 'printTaskSummary' in
-  // src/org/apache/hadoop/mapred/jobdetails_jsp.java).
-  // Null entries count towards the total but are never classified, so they
-  // end up counted as pending.
-  private int getPendingTasks(TaskInProgress[] tasks) {
-    int totalTasks = tasks.length;
-    int runningTasks = 0;
-    int finishedTasks = 0;
-    int killedTasks = 0;
-    for (TaskInProgress task : tasks) {
-      if (task == null) {
-        continue;
-      }
-      if (task.isComplete()) {
-        finishedTasks += 1;
-      } else if (task.isRunning()) {
-        runningTasks += 1;
-      } else if (task.wasKilled()) {
-        killedTasks += 1;
-      }
-    }
-    return totalTasks - runningTasks - killedTasks - finishedTasks;
-  }
-
-  // This method uses explicit synchronization in order to avoid deadlocks when
-  // accessing the JobTracker.
-  /**
-   * Launches TaskTrackers on the offered resources until the needed map and
-   * reduce slots are covered, and declines every offer that is not used.
-   */
-  @Override
-  public void resourceOffers(SchedulerDriver schedulerDriver,
-      List<Offer> offers) {
-    // Before synchronizing, we pull all needed information from the JobTracker.
-    final HttpHost jobTrackerAddress =
-        new HttpHost(jobTracker.getHostname(), jobTracker.getTrackerPort());
-
-    final Collection<TaskTrackerStatus> taskTrackers = jobTracker.taskTrackers();
-
-    final List<JobInProgress> jobsInProgress = new ArrayList<JobInProgress>();
-    for (JobStatus status : jobTracker.jobsToComplete()) {
-      // Guard against getJob() returning null (presumably when the job was
-      // retired between the two JobTracker calls); a null entry would
-      // otherwise throw when counting pending tasks below.
-      JobInProgress job = jobTracker.getJob(status.getJobID());
-      if (job != null) {
-        jobsInProgress.add(job);
-      }
-    }
-
-    synchronized (this) {
-      // Compute the number of pending maps and reduces.
-      int pendingMaps = 0;
-      int pendingReduces = 0;
-      for (JobInProgress progress : jobsInProgress) {
-        // JobStatus.pendingMaps/Reduces may return the wrong value on
-        // occasion. This seems to be safer.
-        pendingMaps += getPendingTasks(progress.getTasks(TaskType.MAP));
-        pendingReduces += getPendingTasks(progress.getTasks(TaskType.REDUCE));
-      }
-
-      // Mark active (heartbeated) TaskTrackers and compute idle slots.
-      int idleMapSlots = 0;
-      int idleReduceSlots = 0;
-      int unhealthyTrackers = 0;
-
-      for (TaskTrackerStatus status : taskTrackers) {
-        if (!status.getHealthStatus().isNodeHealthy()) {
-          // Skip this node if it's unhealthy.
-          ++unhealthyTrackers;
-          continue;
-        }
-
-        HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
-        MesosTracker known = mesosTrackers.get(host);
-        if (known != null) {
-          known.active = true;
-          known.timer.cancel();
-          idleMapSlots += status.getAvailableMapSlots();
-          idleReduceSlots += status.getAvailableReduceSlots();
-        }
-      }
-
-      // Consider the TaskTrackers that have yet to become active as being idle,
-      // otherwise we will launch excessive TaskTrackers.
-      int inactiveMapSlots = 0;
-      int inactiveReduceSlots = 0;
-      for (MesosTracker tracker : mesosTrackers.values()) {
-        if (!tracker.active) {
-          inactiveMapSlots += tracker.mapSlots;
-          inactiveReduceSlots += tracker.reduceSlots;
-        }
-      }
-
-      // To ensure Hadoop jobs begin promptly, we can specify a minimum number
-      // of 'hot slots' to be available for use. This addresses the
-      // TaskTracker spin up delay that exists with Hadoop on Mesos. This can
-      // be a nuisance with lower latency applications, such as ad-hoc Hive
-      // queries.
-      int minimumMapSlots = conf.getInt("mapred.mesos.total.map.slots.minimum", 0);
-      int minimumReduceSlots =
-          conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0);
-
-      // Compute how many slots we need to allocate.
-      int neededMapSlots = Math.max(
-          minimumMapSlots - (idleMapSlots + inactiveMapSlots),
-          pendingMaps - (idleMapSlots + inactiveMapSlots));
-      int neededReduceSlots = Math.max(
-          minimumReduceSlots - (idleReduceSlots + inactiveReduceSlots),
-          pendingReduces - (idleReduceSlots + inactiveReduceSlots));
-
-      LOG.info(join("\n", Arrays.asList(
-          "JobTracker Status",
-          "      Pending Map Tasks: " + pendingMaps,
-          "   Pending Reduce Tasks: " + pendingReduces,
-          "         Idle Map Slots: " + idleMapSlots,
-          "      Idle Reduce Slots: " + idleReduceSlots,
-          "     Inactive Map Slots: " + inactiveMapSlots
-              + " (launched but no hearbeat yet)",
-          "  Inactive Reduce Slots: " + inactiveReduceSlots
-              + " (launched but no hearbeat yet)",
-          "       Needed Map Slots: " + neededMapSlots,
-          "    Needed Reduce Slots: " + neededReduceSlots,
-          "     Unhealthy Trackers: " + unhealthyTrackers)));
-
-      // TaskTracker sizing. None of this depends on the individual offer, so
-      // it is read once per call instead of once per offer.
-      int mapSlotsMax = conf.getInt("mapred.tasktracker.map.tasks.maximum",
-          MAP_SLOTS_DEFAULT);
-      int reduceSlotsMax =
-          conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
-              REDUCE_SLOTS_DEFAULT);
-
-      double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
-          (float) SLOT_CPUS_DEFAULT);
-      double slotDisk = conf.getInt("mapred.mesos.slot.disk",
-          SLOT_DISK_DEFAULT);
-
-      int slotMem = conf.getInt("mapred.mesos.slot.mem",
-          SLOT_JVM_HEAP_DEFAULT);
-      long slotJVMHeap = Math.round((double) slotMem -
-          (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * slotMem));
-
-      int tasktrackerMem = conf.getInt("mapred.mesos.tasktracker.mem",
-          TASKTRACKER_MEM_DEFAULT);
-      long tasktrackerJVMHeap = Math.round((double) tasktrackerMem -
-          (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * tasktrackerMem));
-
-      // Minimum resource requirements for the container (TaskTracker + map/red
-      // tasks).
-      double containerCpus = TASKTRACKER_CPUS;
-      double containerMem = tasktrackerMem;
-      double containerDisk = 0;
-
-      String master = conf.get("mapred.mesos.master", "local");
-      Map<String, String> env = System.getenv();
-
-      // Launch TaskTrackers to satisfy the slot requirements.
-      // TODO(bmahler): Consider slotting intelligently.
-      // Ex: If more map slots are needed, but no reduce slots are needed,
-      // launch a map-only TaskTracker to better satisfy the slot needs.
-      for (Offer offer : offers) {
-        if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
-          driver.declineOffer(offer.getId());
-          continue;
-        }
-
-        // Ensure these values aren't < 0.
-        neededMapSlots = Math.max(0, neededMapSlots);
-        neededReduceSlots = Math.max(0, neededReduceSlots);
-
-        // -1 marks a resource missing from the offer. With positive per-slot
-        // sizes it leaves room for no slot, so such offers are declined below.
-        double cpus = -1.0;
-        double mem = -1.0;
-        double disk = -1.0;
-        Set<Integer> ports = new HashSet<Integer>();
-
-        // Pull out the cpus, memory, disk, and 2 ports from the offer.
-        for (Resource resource : offer.getResourcesList()) {
-          if (resource.getName().equals("cpus")
-              && resource.getType() == Value.Type.SCALAR) {
-            cpus = resource.getScalar().getValue();
-          } else if (resource.getName().equals("mem")
-              && resource.getType() == Value.Type.SCALAR) {
-            mem = resource.getScalar().getValue();
-          } else if (resource.getName().equals("disk")
-              && resource.getType() == Value.Type.SCALAR) {
-            disk = resource.getScalar().getValue();
-          } else if (resource.getName().equals("ports")
-              && resource.getType() == Value.Type.RANGES) {
-            for (Value.Range range : resource.getRanges().getRangeList()) {
-              int begin = (int) range.getBegin();
-              int end = (int) range.getEnd();
-              if (end < begin) {
-                LOG.warn("Ignoring invalid port range: begin=" + begin + " end=" + end);
-                continue;
-              }
-              for (int port = begin; port <= end && ports.size() < 2; ++port) {
-                ports.add(port);
-              }
-            }
-          }
-        }
-
-        // Determine how many slots we can allocate.
-        int slots = mapSlotsMax + reduceSlotsMax;
-        slots = (int) Math.min(slots, (cpus - containerCpus) / slotCpus);
-        slots = (int) Math.min(slots, (mem - containerMem) / slotMem);
-        slots = (int) Math.min(slots, (disk - containerDisk) / slotDisk);
-
-        // Is this offer too small for even the minimum slots?
-        if (slots < 1 || ports.size() < 2) {
-          LOG.info(join("\n", Arrays.asList(
-              "Declining offer with insufficient resources for a TaskTracker: ",
-              "  cpus: offered " + cpus + " needed " + containerCpus,
-              "  mem : offered " + mem + " needed " + containerMem,
-              "  disk: offered " + disk + " needed " + containerDisk,
-              "  ports: " + (ports.size() < 2
-                  ? " less than 2 offered"
-                  : " at least 2 (sufficient)"))));
-
-          driver.declineOffer(offer.getId());
-          continue;
-        }
-
-        // How many map and reduce slots to launch on this TaskTracker.
-        long mapSlots;
-        long reduceSlots;
-
-        // Is the number of slots we need sufficiently small? If so, we can
-        // allocate exactly the number we need.
-        if (slots >= neededMapSlots + neededReduceSlots && neededMapSlots <
-            mapSlotsMax && neededReduceSlots < reduceSlotsMax) {
-          mapSlots = neededMapSlots;
-          reduceSlots = neededReduceSlots;
-        } else {
-          // Allocate slots fairly for this resource offer. Reduces receive
-          // whatever the maps leave, so only the map share is computed. The
-          // denominator is positive: both needs are >= 0 and not both 0 here.
-          double mapFactor =
-              (double) neededMapSlots / (neededMapSlots + neededReduceSlots);
-          // To avoid map/reduce slot starvation, don't allow more than 50%
-          // spread between map/reduce slots when we need both mappers and
-          // reducers.
-          if (neededMapSlots > 0 && neededReduceSlots > 0) {
-            mapFactor = Math.max(0.25, Math.min(0.75, mapFactor));
-          }
-          mapSlots = Math.min(Math.min((long) (mapFactor * slots), mapSlotsMax), neededMapSlots);
-          // The remaining slots are allocated for reduces.
-          slots -= mapSlots;
-          reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
-        }
-
-        Iterator<Integer> portIter = ports.iterator();
-        HttpHost httpAddress = new HttpHost(offer.getHostname(), portIter.next());
-        HttpHost reportAddress = new HttpHost(offer.getHostname(), portIter.next());
-
-        // Check that this tracker is not already launched. This problem was
-        // observed on a few occasions, but not reliably. The main symptom was
-        // that entries in `mesosTrackers` were being lost, and task trackers
-        // would be 'lost' mysteriously (probably because the ports were in
-        // use). This problem has since gone away with a rewrite of the port
-        // selection code, but the check + logging is left here.
-        // TODO(brenden): Diagnose this to determine root cause.
-        if (mesosTrackers.containsKey(httpAddress)) {
-          LOG.info(join("\n", Arrays.asList(
-              "Declining offer because host/port combination is in use: ",
-              "  cpus: offered " + cpus + " needed " + containerCpus,
-              "  mem : offered " + mem + " needed " + containerMem,
-              "  disk: offered " + disk + " needed " + containerDisk,
-              "  ports: " + ports)));
-
-          driver.declineOffer(offer.getId());
-          continue;
-        }
-
-        TaskID taskId = TaskID.newBuilder()
-            .setValue("Task_Tracker_" + launchedTrackers++).build();
-
-        LOG.info("Launching task " + taskId.getValue() + " on "
-            + httpAddress.toString() + " with mapSlots=" + mapSlots + " reduceSlots=" + reduceSlots);
-
-        // Add this tracker to Mesos tasks.
-        mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
-            mapSlots, reduceSlots, this));
-
-        // Create the environment depending on whether the executor is going to be
-        // run locally.
-        // TODO(vinod): Do not pass the mapred config options as environment
-        // variables.
-        Protos.Environment.Builder envBuilder = Protos.Environment
-            .newBuilder()
-            .addVariables(
-                Protos.Environment.Variable
-                    .newBuilder()
-                    .setName("mapred.job.tracker")
-                    .setValue(jobTrackerAddress.getHostName() + ':'
-                        + jobTrackerAddress.getPort()))
-            .addVariables(
-                Protos.Environment.Variable
-                    .newBuilder()
-                    .setName("mapred.task.tracker.http.address")
-                    .setValue(
-                        httpAddress.getHostName() + ':' + httpAddress.getPort()))
-            .addVariables(
-                Protos.Environment.Variable
-                    .newBuilder()
-                    .setName("mapred.task.tracker.report.address")
-                    .setValue(reportAddress.getHostName() + ':'
-                        + reportAddress.getPort()))
-            .addVariables(
-                Protos.Environment.Variable.newBuilder()
-                    .setName("mapred.map.child.java.opts")
-                    .setValue("-Xmx" + slotJVMHeap + "m"))
-            .addVariables(
-                Protos.Environment.Variable.newBuilder()
-                    .setName("mapred.reduce.child.java.opts")
-                    .setValue("-Xmx" + slotJVMHeap + "m"))
-            .addVariables(
-                Protos.Environment.Variable.newBuilder()
-                    .setName("HADOOP_HEAPSIZE")
-                    .setValue("" + tasktrackerJVMHeap))
-            .addVariables(
-                Protos.Environment.Variable.newBuilder()
-                    .setName("mapred.tasktracker.map.tasks.maximum")
-                    .setValue("" + mapSlots))
-            .addVariables(
-                Protos.Environment.Variable.newBuilder()
-                    .setName("mapred.tasktracker.reduce.tasks.maximum")
-                    .setValue("" + reduceSlots));
-
-        // Set java specific environment, appropriately.
-        if (env.containsKey("JAVA_HOME")) {
-          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
-              .setName("JAVA_HOME")
-              .setValue(env.get("JAVA_HOME")));
-        }
-
-        if (env.containsKey("JAVA_LIBRARY_PATH")) {
-          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
-              .setName("JAVA_LIBRARY_PATH")
-              .setValue(env.get("JAVA_LIBRARY_PATH")));
-        }
-
-        // Command info differs when performing a local run.
-        CommandInfo commandInfo = null;
-
-        if (master.equals("local")) {
-          try {
-            commandInfo = CommandInfo.newBuilder()
-                .setEnvironment(envBuilder)
-                .setValue(new File("bin/mesos-executor").getCanonicalPath())
-                .build();
-          } catch (IOException e) {
-            LOG.fatal("Failed to find Mesos executor ", e);
-            System.exit(1);
-          }
-        } else {
-          String uri = conf.get("mapred.mesos.executor");
-          commandInfo = CommandInfo.newBuilder()
-              .setEnvironment(envBuilder)
-              .setValue("cd hadoop-* && ./bin/mesos-executor")
-              .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
-        }
-
-        TaskInfo info = TaskInfo
-            .newBuilder()
-            .setName(taskId.getValue())
-            .setTaskId(taskId)
-            .setSlaveId(offer.getSlaveId())
-            .addResources(
-                Resource
-                    .newBuilder()
-                    .setName("cpus")
-                    .setType(Value.Type.SCALAR)
-                    .setScalar(Value.Scalar.newBuilder().setValue(
-                        (mapSlots + reduceSlots) * slotCpus)))
-            .addResources(
-                Resource
-                    .newBuilder()
-                    .setName("mem")
-                    .setType(Value.Type.SCALAR)
-                    .setScalar(Value.Scalar.newBuilder().setValue(
-                        (mapSlots + reduceSlots) * slotMem)))
-            .addResources(
-                Resource
-                    .newBuilder()
-                    .setName("disk")
-                    .setType(Value.Type.SCALAR)
-                    .setScalar(Value.Scalar.newBuilder().setValue(
-                        (mapSlots + reduceSlots) * slotDisk)))
-            .addResources(
-                Resource
-                    .newBuilder()
-                    .setName("ports")
-                    .setType(Value.Type.RANGES)
-                    .setRanges(
-                        Value.Ranges
-                            .newBuilder()
-                            .addRange(Value.Range.newBuilder()
-                                .setBegin(httpAddress.getPort())
-                                .setEnd(httpAddress.getPort()))
-                            .addRange(Value.Range.newBuilder()
-                                .setBegin(reportAddress.getPort())
-                                .setEnd(reportAddress.getPort()))))
-            .setExecutor(
-                ExecutorInfo
-                    .newBuilder()
-                    .setExecutorId(ExecutorID.newBuilder().setValue(
-                        "executor_" + taskId.getValue()))
-                    .setName("Hadoop TaskTracker")
-                    .setSource(taskId.getValue())
-                    .addResources(
-                        Resource
-                            .newBuilder()
-                            .setName("cpus")
-                            .setType(Value.Type.SCALAR)
-                            .setScalar(Value.Scalar.newBuilder().setValue(
-                                (TASKTRACKER_CPUS))))
-                    .addResources(
-                        Resource
-                            .newBuilder()
-                            .setName("mem")
-                            .setType(Value.Type.SCALAR)
-                            .setScalar(Value.Scalar.newBuilder().setValue(
-                                (tasktrackerMem)))).setCommand(commandInfo))
-            .build();
-
-        driver.launchTasks(offer.getId(), Arrays.asList(info));
-
-        neededMapSlots -= mapSlots;
-        neededReduceSlots -= reduceSlots;
-      }
-
-      if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
-        LOG.info("Satisfied map and reduce slots needed.");
-      } else {
-        LOG.info("Unable to fully satisfy needed map/reduce slots: "
-            + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
-            + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
-            + "remaining");
-      }
-    }
-  }
-
-  @Override
-  public synchronized void offerRescinded(SchedulerDriver schedulerDriver,
-      OfferID offerID) {
-    LOG.warn("Rescinded offer: " + offerID.getValue());
-  }
-
-  /**
-   * Forgets the TaskTracker whose Mesos task reached a terminal state
-   * (finished, failed, killed or lost) and cancels its launch timer.
-   */
-  @Override
-  public synchronized void statusUpdate(SchedulerDriver schedulerDriver,
-      Protos.TaskStatus taskStatus) {
-    LOG.info("Status update of " + taskStatus.getTaskId().getValue()
-        + " to " + taskStatus.getState().name()
-        + " with message " + taskStatus.getMessage());
-
-    // Remove the TaskTracker if the corresponding Mesos task has reached a
-    // terminal state.
-    switch (taskStatus.getState()) {
-      case TASK_FINISHED:
-      case TASK_FAILED:
-      case TASK_KILLED:
-      case TASK_LOST:
-        // Make a copy to iterate over keys and delete values.
-        Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
-
-        // Remove the task from the map.
-        for (HttpHost tracker : trackers) {
-          MesosTracker mesosTracker = mesosTrackers.get(tracker);
-          if (mesosTracker.taskId.equals(taskStatus.getTaskId())) {
-            LOG.info("Removing terminated TaskTracker: " + tracker);
-            // The timer belongs to the MesosTracker, not to its HttpHost key.
-            mesosTracker.timer.cancel();
-            mesosTrackers.remove(tracker);
-          }
-        }
-        break;
-      case TASK_STAGING:
-      case TASK_STARTING:
-      case TASK_RUNNING:
-        break;
-      default:
-        LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
-        break;
-    }
-  }
-
-  @Override
-  public synchronized void frameworkMessage(SchedulerDriver schedulerDriver,
-      ExecutorID executorID, SlaveID slaveID, byte[] bytes) {
-    LOG.info("Framework Message of " + bytes.length + " bytes"
-        + " from executor " + executorID.getValue()
-        + " on slave " + slaveID.getValue());
-  }
-
-  @Override
-  public synchronized void disconnected(SchedulerDriver schedulerDriver) {
-    LOG.warn("Disconnected from Mesos master.");
-  }
-
-  @Override
-  public synchronized void slaveLost(SchedulerDriver schedulerDriver,
-      SlaveID slaveID) {
-    LOG.warn("Slave lost: " + slaveID.getValue());
-  }
-
-  @Override
-  public synchronized void executorLost(SchedulerDriver schedulerDriver,
-      ExecutorID executorID, SlaveID slaveID, int status) {
-    // Log the slave ID's value (as the other callbacks do) rather than the
-    // protobuf message's multi-line toString().
-    LOG.warn("Executor " + executorID.getValue()
-        + " lost with status " + status + " on slave " + slaveID.getValue());
-  }
-
-  @Override
-  public synchronized void error(SchedulerDriver schedulerDriver, String s) {
-    LOG.error("Error from scheduler driver: " + s);
-  }
-
-  /**
-   * Tracks one TaskTracker that this scheduler launched as a Mesos task.
-   *
-   * <p>On construction it schedules a launch timeout. If the tracker has not
-   * been marked active within LAUNCH_TIMEOUT_MS, it is killed through the
-   * scheduler. A tracker is marked active once it shows up as a healthy
-   * TaskTracker in the JobTracker.
-   */
-  private class MesosTracker {
-    public volatile HttpHost host;
-    public TaskID taskId;
-    public long mapSlots;
-    public long reduceSlots;
-    public volatile boolean active = false; // Set once tracked by the JobTracker.
-    public Timer timer;
-    public volatile MesosScheduler scheduler;
-
-    // Tracks Hadoop job tasks running on the tracker.
-    public Set<JobID> hadoopJobs = new HashSet<JobID>();
-
-    public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
-        long reduceSlots, MesosScheduler scheduler) {
-      this.host = host;
-      this.taskId = taskId;
-      this.mapSlots = mapSlots;
-      this.reduceSlots = reduceSlots;
-      this.scheduler = scheduler;
-
-      this.timer = new Timer();
-      timer.schedule(new TimerTask() {
-        @Override
-        public void run() {
-          synchronized (MesosTracker.this.scheduler) {
-            // If the tracker activated while we were awaiting to acquire the
-            // lock, return.
-            if (MesosTracker.this.active) return;
-
-            LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " +
-                LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it");
-            MesosTracker.this.scheduler.killTracker(MesosTracker.this);
-          }
-        }
-      }, LAUNCH_TIMEOUT_MS);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
new file mode 100644
index 0000000..baf3e5b
--- /dev/null
+++ b/hadoop/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <!-- Builds the Hadoop on Mesos scheduler/executor as a single shaded JAR. -->
+  <groupId>org.apache.mesos</groupId>
+  <artifactId>hadoop-mesos</artifactId>
+  <version>0.0.1</version>
+
+  <properties>
+    <encoding>UTF-8</encoding>
+    <!-- Standard property read by the compiler and resources plugins; the
+         custom "encoding" property above is otherwise never applied and the
+         build falls back to the platform encoding. -->
+    <project.build.sourceEncoding>${encoding}</project.build.sourceEncoding>
+
+    <!-- language versions -->
+    <java.abi>1.6</java.abi>
+
+    <!-- runtime deps versions -->
+    <commons-logging.version>1.1.1</commons-logging.version>
+    <hadoop-core.version>1.2.0</hadoop-core.version>
+    <mesos.version>0.14.0</mesos.version>
+    <protobuf.version>2.4.1</protobuf.version>
+
+    <!-- test deps versions -->
+    <junit.version>4.11</junit.version>
+    <mockito.version>1.9.5</mockito.version>
+  </properties>
+
+  <!-- TODO(benh): Remove repository once we've published a JDK6 JAR. -->
+  <repositories>
+    <repository>
+      <id>mesosphere-public-repo</id>
+      <name>Mesosphere Public Repo</name>
+      <!-- HTTPS: Maven 3.8.1+ blocks plain-http repositories by default. -->
+      <url>https://s3.amazonaws.com/mesosphere-maven-public/</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>${commons-logging.version}</version>
+    </dependency>
+    <!-- Provided by the Hadoop installation at runtime; not shaded. -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+      <version>${hadoop-core.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.mesos</groupId>
+      <!-- TODO(benh): Use 'mesos' after we've published a JDK6 JAR. -->
+      <artifactId>mesos_jdk6</artifactId>
+      <version>${mesos.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+
+    <!-- Test scope -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <source>${java.abi}</source>
+          <target>${java.abi}</target>
+          <showDeprecation>true</showDeprecation>
+          <showWarnings>true</showWarnings>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.0</version>
+        <configuration>
+          <filters>
+            <!-- Drop signature files of signed dependencies: after merging
+                 they no longer match the shaded JAR's contents, and the JVM
+                 would reject it with an invalid signature error. -->
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
new file mode 100644
index 0000000..ccf0090
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
@@ -0,0 +1,145 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.mesos.Executor;
+import org.apache.mesos.ExecutorDriver;
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.Environment.Variable;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.SlaveInfo;
+import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+
+public class MesosExecutor implements Executor {
+ public static final Log LOG = LogFactory.getLog(MesosExecutor.class);
+
+ private JobConf conf;
+ private TaskTracker taskTracker;
+
+ @Override
+ public void registered(ExecutorDriver driver, ExecutorInfo executorInfo,
+ FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
+ LOG.info("Executor registered with the slave");
+
+ conf = new JobConf();
+
+ // Get TaskTracker's config options from environment variables set by the
+ // JobTracker.
+ if (executorInfo.getCommand().hasEnvironment()) {
+ for (Variable variable : executorInfo.getCommand().getEnvironment()
+ .getVariablesList()) {
+ LOG.info("Setting config option : " + variable.getName() + " to "
+ + variable.getValue());
+ conf.set(variable.getName(), variable.getValue());
+ }
+ }
+
+ // Get hostname from Mesos to make sure we match what it reports
+ // to the JobTracker.
+ conf.set("slave.host.name", slaveInfo.getHostname());
+
+ // Set the mapred.local directory inside the executor sandbox, so that
+ // different TaskTrackers on the same host do not step on each other.
+ conf.set("mapred.local.dir", System.getProperty("user.dir") + "/mapred");
+ }
+
+ @Override
+ public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
+ LOG.info("Launching task : " + task.getTaskId().getValue());
+
+ // NOTE: We need to manually set the context class loader here because,
+ // the TaskTracker is unable to find LoginModule class otherwise.
+ Thread.currentThread().setContextClassLoader(
+ TaskTracker.class.getClassLoader());
+
+ try {
+ taskTracker = new TaskTracker(conf);
+ } catch (IOException e) {
+ LOG.fatal("Failed to start TaskTracker", e);
+ System.exit(1);
+ } catch (InterruptedException e) {
+ LOG.fatal("Failed to start TaskTracker", e);
+ System.exit(1);
+ }
+
+ // Spin up a TaskTracker in a new thread.
+ new Thread("TaskTracker Run Thread") {
+ @Override
+ public void run() {
+ taskTracker.run();
+
+ // Send a TASK_FINISHED status update.
+ // We do this here because we want to send it in a separate thread
+ // than was used to call killTask().
+ driver.sendStatusUpdate(TaskStatus.newBuilder()
+ .setTaskId(task.getTaskId())
+ .setState(TaskState.TASK_FINISHED)
+ .build());
+
+ // Give some time for the update to reach the slave.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.error("Failed to sleep TaskTracker thread", e);
+ }
+
+ // Stop the executor.
+ driver.stop();
+ }
+ }.start();
+
+ driver.sendStatusUpdate(TaskStatus.newBuilder()
+ .setTaskId(task.getTaskId())
+ .setState(TaskState.TASK_RUNNING).build());
+ }
+
+ @Override
+ public void killTask(ExecutorDriver driver, TaskID taskId) {
+ LOG.info("Killing task : " + taskId.getValue());
+ try {
+ taskTracker.shutdown();
+ } catch (IOException e) {
+ LOG.error("Failed to shutdown TaskTracker", e);
+ } catch (InterruptedException e) {
+ LOG.error("Failed to shutdown TaskTracker", e);
+ }
+ }
+
+ @Override
+ public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
+ LOG.info("Executor reregistered with the slave");
+ }
+
+ @Override
+ public void disconnected(ExecutorDriver driver) {
+ LOG.info("Executor disconnected from the slave");
+ }
+
+ @Override
+ public void frameworkMessage(ExecutorDriver d, byte[] msg) {
+ LOG.info("Executor received framework message of length: " + msg.length
+ + " bytes");
+ }
+
+ @Override
+ public void error(ExecutorDriver d, String message) {
+ LOG.error("MesosExecutor.error: " + message);
+ }
+
+ @Override
+ public void shutdown(ExecutorDriver d) {
+ LOG.info("Executor asked to shutdown");
+ }
+
+ public static void main(String[] args) {
+ MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor());
+ System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
new file mode 100644
index 0000000..7a1469d
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -0,0 +1,845 @@
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Iterator;
+
+import org.apache.commons.httpclient.HttpHost;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.FrameworkID;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.Protos.MasterInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.OfferID;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.SlaveID;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import static org.apache.hadoop.util.StringUtils.join;
+
+/**
+ * Hadoop TaskScheduler that launches TaskTrackers as Mesos tasks on demand.
+ *
+ * <p>Actual task assignment is delegated to another TaskScheduler (configured
+ * via 'mapred.mesos.taskScheduler'); this class decides how many TaskTrackers
+ * to launch from resource offers and kills idle ones when jobs complete.
+ */
+public class MesosScheduler extends TaskScheduler implements Scheduler {
+  public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
+
+  private SchedulerDriver driver;
+  private TaskScheduler taskScheduler;
+  private JobTracker jobTracker;
+  private Configuration conf;
+
+  // This is the memory overhead for a jvm process. This needs to be added
+  // to a jvm process's resource requirement, in addition to its heap size.
+  private static final double JVM_MEM_OVERHEAD_PERCENT_DEFAULT = 0.1; // 10%.
+
+  // TODO(vinod): Consider parsing the slot memory from the configuration jvm
+  // heap options (e.g: mapred.child.java.opts).
+
+  // NOTE: It appears that there's no real resource requirements for a
+  // map / reduce slot. We therefore define a default slot as:
+  //   0.2 cores.
+  //   256 MB memory (the slot JVM heap is this minus the JVM overhead).
+  //   1 GB of disk space.
+  private static final double SLOT_CPUS_DEFAULT = 0.2; // 0.2 cores.
+  private static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
+  private static final int SLOT_JVM_HEAP_DEFAULT = 256; // 256MB.
+
+  private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
+  private static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB.
+
+  // The default behavior in Hadoop is to use 4 slots per TaskTracker
+  // (2 map + 2 reduce):
+  private static final int MAP_SLOTS_DEFAULT = 2;
+  private static final int REDUCE_SLOTS_DEFAULT = 2;
+
+  // The amount of time to wait for task trackers to launch before
+  // giving up.
+  private static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes
+
+  // Count of the launched trackers for TaskID generation.
+  private long launchedTrackers = 0;
+
+  // Maintains a mapping from {tracker host:port -> MesosTracker}.
+  // Used for tracking the slots of each TaskTracker and the corresponding
+  // Mesos TaskID. Every access in this class holds the MesosScheduler lock.
+  private Map<HttpHost, MesosTracker> mesosTrackers =
+      new HashMap<HttpHost, MesosTracker>();
+
+  private JobInProgressListener jobListener = new JobInProgressListener() {
+    @Override
+    public void jobAdded(JobInProgress job) throws IOException {
+      LOG.info("Added job " + job.getJobID());
+    }
+
+    @Override
+    public void jobRemoved(JobInProgress job) {
+      LOG.info("Removed job " + job.getJobID());
+    }
+
+    /**
+     * When a job completes, drops it from every tracker's job set and kills
+     * the active trackers that no longer hold any job.
+     */
+    @Override
+    public void jobUpdated(JobChangeEvent event) {
+      synchronized (MesosScheduler.this) {
+        JobInProgress job = event.getJobInProgress();
+
+        // If the job is complete, kill all the corresponding idle TaskTrackers.
+        if (!job.isComplete()) {
+          return;
+        }
+
+        LOG.info("Completed job : " + job.getJobID());
+
+        // Walk the trackers once. This does not depend on the job's completed
+        // tasks, so a job that ends without any completed task still releases
+        // its trackers.
+        Iterator<Map.Entry<HttpHost, MesosTracker>> iter =
+            mesosTrackers.entrySet().iterator();
+        while (iter.hasNext()) {
+          MesosTracker mesosTracker = iter.next().getValue();
+
+          // Drop the job even from trackers not yet marked active:
+          // assignTasks() may hand them tasks before resourceOffers() marks
+          // them active, and a stale job entry would otherwise keep them
+          // from ever being killed.
+          mesosTracker.hadoopJobs.remove(job.getJobID());
+
+          if (!mesosTracker.active) {
+            LOG.warn("Ignoring TaskTracker: " + mesosTracker.host
+                + " because it might not have sent a hearbeat");
+            continue;
+          }
+
+          // If the TaskTracker doesn't have any running tasks, kill it.
+          if (mesosTracker.hadoopJobs.isEmpty()) {
+            LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
+                + mesosTracker.host);
+
+            driver.killTask(mesosTracker.taskId);
+            mesosTracker.timer.cancel();
+            iter.remove();
+          }
+        }
+      }
+    }
+  };
+
+  public MesosScheduler() {}
+
+  // TaskScheduler methods.
+
+  /**
+   * Instantiates the delegate TaskScheduler, registers the job listener and
+   * starts the Mesos scheduler driver. Exits the process on failure, since
+   * the JobTracker is useless without this scheduler.
+   */
+  @Override
+  public synchronized void start() throws IOException {
+    conf = getConf();
+    String taskSchedulerClass = conf.get("mapred.mesos.taskScheduler",
+        "org.apache.hadoop.mapred.JobQueueTaskScheduler");
+
+    try {
+      taskScheduler =
+          (TaskScheduler) Class.forName(taskSchedulerClass).newInstance();
+      taskScheduler.setConf(conf);
+      taskScheduler.setTaskTrackerManager(taskTrackerManager);
+    } catch (ClassNotFoundException e) {
+      LOG.fatal("Failed to initialize the TaskScheduler", e);
+      System.exit(1);
+    } catch (InstantiationException e) {
+      LOG.fatal("Failed to initialize the TaskScheduler", e);
+      System.exit(1);
+    } catch (IllegalAccessException e) {
+      LOG.fatal("Failed to initialize the TaskScheduler", e);
+      System.exit(1);
+    }
+
+    // Add the job listener to get job related updates.
+    taskTrackerManager.addJobInProgressListener(jobListener);
+
+    LOG.info("Starting MesosScheduler");
+    jobTracker = (JobTracker) super.taskTrackerManager;
+
+    String master = conf.get("mapred.mesos.master", "local");
+
+    try {
+      FrameworkInfo frameworkInfo = FrameworkInfo
+          .newBuilder()
+          .setUser("")
+          .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false))
+          .setName("Hadoop: (RPC port: " + jobTracker.port + ","
+              + " WebUI port: " + jobTracker.infoPort + ")").build();
+
+      driver = new MesosSchedulerDriver(this, frameworkInfo, master);
+      driver.start();
+    } catch (Exception e) {
+      // If the MesosScheduler can't be loaded, the JobTracker won't be useful
+      // at all, so crash it now so that the user notices.
+      LOG.fatal("Failed to start MesosScheduler", e);
+      System.exit(1);
+    }
+
+    taskScheduler.start();
+  }
+
+  @Override
+  public synchronized void terminate() throws IOException {
+    try {
+      LOG.info("Stopping MesosScheduler");
+      driver.stop();
+    } catch (Exception e) {
+      LOG.error("Failed to stop Mesos scheduler", e);
+    }
+
+    // The delegate is only set once start() has run.
+    if (taskScheduler != null) {
+      taskScheduler.terminate();
+    }
+  }
+
+  /**
+   * Delegates task assignment for Mesos-launched TaskTrackers and records
+   * which jobs each tracker is running. Returns null for unknown trackers.
+   */
+  @Override
+  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
+      throws IOException {
+    HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
+        taskTracker.getStatus().getHttpPort());
+
+    MesosTracker mesosTracker = mesosTrackers.get(tracker);
+    if (mesosTracker == null) {
+      // TODO(bmahler): Consider allowing non-Mesos TaskTrackers.
+      LOG.info("Unknown/exited TaskTracker: " + tracker + ". ");
+      return null;
+    }
+
+    // Let the underlying task scheduler do the actual task scheduling.
+    List<Task> tasks = taskScheduler.assignTasks(taskTracker);
+
+    // The Hadoop Fair Scheduler is known to return null.
+    if (tasks != null) {
+      // Keep track of which TaskTracker contains which tasks.
+      for (Task task : tasks) {
+        mesosTracker.hadoopJobs.add(task.getJobID());
+      }
+    }
+
+    return tasks;
+  }
+
+  @Override
+  public synchronized Collection<JobInProgress> getJobs(String queueName) {
+    return taskScheduler.getJobs(queueName);
+  }
+
+  @Override
+  public synchronized void refresh() throws IOException {
+    taskScheduler.refresh();
+  }
+
+  // Mesos Scheduler methods.
+  // These are synchronized, where possible. Some of these methods need to access the
+  // JobTracker, which can lead to a deadlock:
+  // See: https://issues.apache.org/jira/browse/MESOS-429
+  // The workaround employed here is to unsynchronize those methods needing access to
+  // the JobTracker state and use explicit synchronization instead as appropriate.
+  // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to
+  // split up the Scheduler and TaskScheduler implementations in order to break the
+  // locking cycle. This would require a synchronized class to store the shared
+  // state across our Scheduler and TaskScheduler implementations, and provide
+  // atomic operations as needed.
+  @Override
+  public synchronized void registered(SchedulerDriver schedulerDriver,
+      FrameworkID frameworkID, MasterInfo masterInfo) {
+    LOG.info("Registered as " + frameworkID.getValue()
+        + " with master " + masterInfo);
+  }
+
+  @Override
+  public synchronized void reregistered(SchedulerDriver schedulerDriver,
+      MasterInfo masterInfo) {
+    LOG.info("Re-registered with master " + masterInfo);
+  }
+
+  /**
+   * Kills the tracker's Mesos task and forgets the tracker. Invoked from the
+   * tracker's launch-timeout TimerTask.
+   */
+  public synchronized void killTracker(MesosTracker tracker) {
+    driver.killTask(tracker.taskId);
+    // Release the timer's thread now rather than whenever the Timer is
+    // garbage collected; Timer.cancel() may be called from one of its own
+    // tasks, which is the case when this is reached on launch timeout.
+    tracker.timer.cancel();
+    mesosTrackers.remove(tracker.host);
+  }
+
+  // For some reason, pendingMaps() and pendingReduces() don't return the
+  // values we expect. We observed negative values, which may be related to
+  // https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the
+  // algorithm that is used to calculate the pending tasks within the Hadoop
+  // JobTracker sources (see 'printTaskSummary' in
+  // src/org/apache/hadoop/mapred/jobdetails_jsp.java).
+  private int getPendingTasks(TaskInProgress[] tasks) {
+    int totalTasks = tasks.length;
+    int runningTasks = 0;
+    int finishedTasks = 0;
+    int killedTasks = 0;
+    for (int i = 0; i < totalTasks; ++i) {
+      TaskInProgress task = tasks[i];
+      if (task == null) {
+        continue;
+      }
+      if (task.isComplete()) {
+        finishedTasks += 1;
+      } else if (task.isRunning()) {
+        runningTasks += 1;
+      } else if (task.wasKilled()) {
+        killedTasks += 1;
+      }
+    }
+    int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks;
+    return pendingTasks;
+  }
+
+  // This method uses explicit synchronization in order to avoid deadlocks when
+  // accessing the JobTracker.
+  @Override
+  public void resourceOffers(SchedulerDriver schedulerDriver,
+      List<Offer> offers) {
+    // Before synchronizing, we pull all needed information from the JobTracker.
+    final HttpHost jobTrackerAddress =
+        new HttpHost(jobTracker.getHostname(), jobTracker.getTrackerPort());
+
+    final Collection<TaskTrackerStatus> taskTrackers = jobTracker.taskTrackers();
+
+    final List<JobInProgress> jobsInProgress = new ArrayList<JobInProgress>();
+    for (JobStatus status : jobTracker.jobsToComplete()) {
+      JobInProgress job = jobTracker.getJob(status.getJobID());
+      // NOTE(review): getJob() presumably returns null if the job left the
+      // JobTracker between these two calls; skip it instead of failing with a
+      // NullPointerException when counting pending tasks below.
+      if (job != null) {
+        jobsInProgress.add(job);
+      }
+    }
+
+    synchronized (this) {
+      // Compute the number of pending maps and reduces.
+      int pendingMaps = 0;
+      int pendingReduces = 0;
+      for (JobInProgress progress : jobsInProgress) {
+        // JobStatus.pendingMaps/Reduces may return the wrong value on
+        // occasion. This seems to be safer.
+        pendingMaps += getPendingTasks(progress.getTasks(TaskType.MAP));
+        pendingReduces += getPendingTasks(progress.getTasks(TaskType.REDUCE));
+      }
+
+      // Mark active (heartbeated) TaskTrackers and compute idle slots.
+      int idleMapSlots = 0;
+      int idleReduceSlots = 0;
+      int unhealthyTrackers = 0;
+
+      for (TaskTrackerStatus status : taskTrackers) {
+        if (!status.getHealthStatus().isNodeHealthy()) {
+          // Skip this node if it's unhealthy.
+          ++unhealthyTrackers;
+          continue;
+        }
+
+        HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
+        MesosTracker mesosTracker = mesosTrackers.get(host);
+        if (mesosTracker != null) {
+          mesosTracker.active = true;
+          mesosTracker.timer.cancel();
+          idleMapSlots += status.getAvailableMapSlots();
+          idleReduceSlots += status.getAvailableReduceSlots();
+        }
+      }
+
+      // Consider the TaskTrackers that have yet to become active as being idle,
+      // otherwise we will launch excessive TaskTrackers.
+      int inactiveMapSlots = 0;
+      int inactiveReduceSlots = 0;
+      for (MesosTracker tracker : mesosTrackers.values()) {
+        if (!tracker.active) {
+          inactiveMapSlots += tracker.mapSlots;
+          inactiveReduceSlots += tracker.reduceSlots;
+        }
+      }
+
+      // To ensure Hadoop jobs begin promptly, we can specify a minimum number
+      // of 'hot slots' to be available for use. This addresses the
+      // TaskTracker spin up delay that exists with Hadoop on Mesos. This can
+      // be a nuisance with lower latency applications, such as ad-hoc Hive
+      // queries.
+      int minimumMapSlots = conf.getInt("mapred.mesos.total.map.slots.minimum", 0);
+      int minimumReduceSlots =
+          conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0);
+
+      // Compute how many slots we need to allocate.
+      int neededMapSlots = Math.max(
+          minimumMapSlots - (idleMapSlots + inactiveMapSlots),
+          pendingMaps - (idleMapSlots + inactiveMapSlots));
+      int neededReduceSlots = Math.max(
+          minimumReduceSlots - (idleReduceSlots + inactiveReduceSlots),
+          pendingReduces - (idleReduceSlots + inactiveReduceSlots));
+
+      LOG.info(join("\n", Arrays.asList(
+          "JobTracker Status",
+          "      Pending Map Tasks: " + pendingMaps,
+          "   Pending Reduce Tasks: " + pendingReduces,
+          "         Idle Map Slots: " + idleMapSlots,
+          "      Idle Reduce Slots: " + idleReduceSlots,
+          "     Inactive Map Slots: " + inactiveMapSlots
+              + " (launched but no hearbeat yet)",
+          "  Inactive Reduce Slots: " + inactiveReduceSlots
+              + " (launched but no hearbeat yet)",
+          "       Needed Map Slots: " + neededMapSlots,
+          "    Needed Reduce Slots: " + neededReduceSlots,
+          "     Unhealthy Trackers: " + unhealthyTrackers)));
+
+      // The executor command differs when performing a local run; this does
+      // not change between offers.
+      final String master = conf.get("mapred.mesos.master", "local");
+
+      // Launch TaskTrackers to satisfy the slot requirements.
+      // TODO(bmahler): Consider slotting intelligently.
+      // Ex: If more map slots are needed, but no reduce slots are needed,
+      // launch a map-only TaskTracker to better satisfy the slot needs.
+      for (Offer offer : offers) {
+        if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+          driver.declineOffer(offer.getId());
+          continue;
+        }
+
+        // Ensure these values aren't < 0.
+        neededMapSlots = Math.max(0, neededMapSlots);
+        neededReduceSlots = Math.max(0, neededReduceSlots);
+
+        double cpus = -1.0;
+        double mem = -1.0;
+        double disk = -1.0;
+        Set<Integer> ports = new HashSet<Integer>();
+
+        // Pull out the cpus, memory, disk, and 2 ports from the offer.
+        for (Resource resource : offer.getResourcesList()) {
+          if (resource.getName().equals("cpus")
+              && resource.getType() == Value.Type.SCALAR) {
+            cpus = resource.getScalar().getValue();
+          } else if (resource.getName().equals("mem")
+              && resource.getType() == Value.Type.SCALAR) {
+            mem = resource.getScalar().getValue();
+          } else if (resource.getName().equals("disk")
+              && resource.getType() == Value.Type.SCALAR) {
+            disk = resource.getScalar().getValue();
+          } else if (resource.getName().equals("ports")
+              && resource.getType() == Value.Type.RANGES) {
+            for (Value.Range range : resource.getRanges().getRangeList()) {
+              int begin = (int) range.getBegin();
+              int end = (int) range.getEnd();
+              if (end < begin) {
+                LOG.warn("Ignoring invalid port range: begin=" + begin + " end=" + end);
+                continue;
+              }
+              for (int port = begin; port <= end && ports.size() < 2; ++port) {
+                ports.add(port);
+              }
+            }
+          }
+        }
+
+        int mapSlotsMax = conf.getInt("mapred.tasktracker.map.tasks.maximum",
+            MAP_SLOTS_DEFAULT);
+        int reduceSlotsMax =
+            conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
+                REDUCE_SLOTS_DEFAULT);
+
+        // What's the minimum number of map and reduce slots we should try to
+        // launch?
+        long mapSlots = 0;
+        long reduceSlots = 0;
+
+        double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
+            (float) SLOT_CPUS_DEFAULT);
+        double slotDisk = conf.getInt("mapred.mesos.slot.disk",
+            SLOT_DISK_DEFAULT);
+
+        int slotMem = conf.getInt("mapred.mesos.slot.mem",
+            SLOT_JVM_HEAP_DEFAULT);
+        long slotJVMHeap = Math.round((double) slotMem -
+            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * slotMem));
+
+        int tasktrackerMem = conf.getInt("mapred.mesos.tasktracker.mem",
+            TASKTRACKER_MEM_DEFAULT);
+        long tasktrackerJVMHeap = Math.round((double) tasktrackerMem -
+            (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * tasktrackerMem));
+
+        // Minimum resource requirements for the container (TaskTracker + map/red
+        // tasks).
+        double containerCpus = TASKTRACKER_CPUS;
+        double containerMem = tasktrackerMem;
+        double containerDisk = 0;
+
+        // Determine how many slots we can allocate.
+        int slots = mapSlotsMax + reduceSlotsMax;
+        slots = (int) Math.min(slots, (cpus - containerCpus) / slotCpus);
+        slots = (int) Math.min(slots, (mem - containerMem) / slotMem);
+        slots = (int) Math.min(slots, (disk - containerDisk) / slotDisk);
+
+        // Is this offer too small for even the minimum slots? The "needed"
+        // values below are for the container plus a single slot.
+        if (slots < 1 || ports.size() < 2) {
+          LOG.info(join("\n", Arrays.asList(
+              "Declining offer with insufficient resources for a TaskTracker: ",
+              "  cpus: offered " + cpus + " needed " + (containerCpus + slotCpus),
+              "  mem : offered " + mem + " needed " + (containerMem + slotMem),
+              "  disk: offered " + disk + " needed " + (containerDisk + slotDisk),
+              "  ports: " + (ports.size() < 2
+                  ? " less than 2 offered"
+                  : " at least 2 (sufficient)"))));
+
+          driver.declineOffer(offer.getId());
+          continue;
+        }
+
+        // Is the number of slots we need sufficiently small? If so, we can
+        // allocate exactly the number we need.
+        if (slots >= neededMapSlots + neededReduceSlots
+            && neededMapSlots < mapSlotsMax
+            && neededReduceSlots < reduceSlotsMax) {
+          mapSlots = neededMapSlots;
+          reduceSlots = neededReduceSlots;
+        } else {
+          // Allocate slots fairly for this resource offer.
+          double mapFactor =
+              (double) neededMapSlots / (neededMapSlots + neededReduceSlots);
+          // To avoid map/reduce slot starvation, don't allow more than 50%
+          // spread between map/reduce slots when we need both mappers and
+          // reducers.
+          if (neededMapSlots > 0 && neededReduceSlots > 0) {
+            mapFactor = Math.max(0.25, Math.min(0.75, mapFactor));
+          }
+          mapSlots = Math.min(Math.min((long) (mapFactor * slots), mapSlotsMax),
+              neededMapSlots);
+          // The remaining slots are allocated for reduces.
+          slots -= mapSlots;
+          reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
+        }
+
+        Iterator<Integer> portIter = ports.iterator();
+        HttpHost httpAddress = new HttpHost(offer.getHostname(), portIter.next());
+        HttpHost reportAddress = new HttpHost(offer.getHostname(), portIter.next());
+
+        // Check that this tracker is not already launched. This problem was
+        // observed on a few occasions, but not reliably. The main symptom was
+        // that entries in `mesosTrackers` were being lost, and task trackers
+        // would be 'lost' mysteriously (probably because the ports were in
+        // use). This problem has since gone away with a rewrite of the port
+        // selection code, but the check + logging is left here.
+        // TODO(brenden): Diagnose this to determine root cause.
+        if (mesosTrackers.containsKey(httpAddress)) {
+          LOG.info(join("\n", Arrays.asList(
+              "Declining offer because host/port combination is in use: ",
+              "  cpus: offered " + cpus + " needed " + containerCpus,
+              "  mem : offered " + mem + " needed " + containerMem,
+              "  disk: offered " + disk + " needed " + containerDisk,
+              "  ports: " + ports)));
+
+          driver.declineOffer(offer.getId());
+          continue;
+        }
+
+        // Create the environment depending on whether the executor is going to be
+        // run locally.
+        // TODO(vinod): Do not pass the mapred config options as environment
+        // variables.
+        Protos.Environment.Builder envBuilder = Protos.Environment
+            .newBuilder()
+            .addVariables(
+                Protos.Environment.Variable
+                    .newBuilder()
+                    .setName("mapred.job.tracker")
+                    .setValue(jobTrackerAddress.getHostName() + ':'
+                        + jobTrackerAddress.getPort()))
+            .addVariables(
+                Protos.Environment.Variable
+                    .newBuilder()
+                    .setName("mapred.task.tracker.http.address")
+                    .setValue(
+                        httpAddress.getHostName() + ':' + httpAddress.getPort()))
+            .addVariables(
+                Protos.Environment.Variable
+                    .newBuilder()
+                    .setName("mapred.task.tracker.report.address")
+                    .setValue(reportAddress.getHostName() + ':'
+                        + reportAddress.getPort()))
+            .addVariables(
+                Protos.Environment.Variable.newBuilder()
+                    .setName("mapred.map.child.java.opts")
+                    .setValue("-Xmx" + slotJVMHeap + "m"))
+            .addVariables(
+                Protos.Environment.Variable.newBuilder()
+                    .setName("mapred.reduce.child.java.opts")
+                    .setValue("-Xmx" + slotJVMHeap + "m"))
+            .addVariables(
+                Protos.Environment.Variable.newBuilder()
+                    .setName("HADOOP_HEAPSIZE")
+                    .setValue("" + tasktrackerJVMHeap))
+            .addVariables(
+                Protos.Environment.Variable.newBuilder()
+                    .setName("mapred.tasktracker.map.tasks.maximum")
+                    .setValue("" + mapSlots))
+            .addVariables(
+                Protos.Environment.Variable.newBuilder()
+                    .setName("mapred.tasktracker.reduce.tasks.maximum")
+                    .setValue("" + reduceSlots));
+
+        // Set java specific environment, appropriately.
+        Map<String, String> env = System.getenv();
+        if (env.containsKey("JAVA_HOME")) {
+          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+              .setName("JAVA_HOME")
+              .setValue(env.get("JAVA_HOME")));
+        }
+
+        if (env.containsKey("JAVA_LIBRARY_PATH")) {
+          envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+              .setName("JAVA_LIBRARY_PATH")
+              .setValue(env.get("JAVA_LIBRARY_PATH")));
+        }
+
+        // Build the executor command before registering the tracker so that a
+        // failure here declines the offer without leaving a phantom tracker.
+        CommandInfo commandInfo;
+        if (master.equals("local")) {
+          String hadoopPath;
+          try {
+            // getCanonicalPath() declares IOException, which this callback
+            // cannot propagate.
+            hadoopPath = new File("bin/hadoop").getCanonicalPath();
+          } catch (IOException e) {
+            LOG.error("Failed to resolve the local 'bin/hadoop' path", e);
+            driver.declineOffer(offer.getId());
+            continue;
+          }
+          commandInfo = CommandInfo.newBuilder()
+              .setEnvironment(envBuilder)
+              .setValue(hadoopPath + " org.apache.hadoop.mapred.MesosExecutor")
+              .build();
+        } else {
+          String uri = conf.get("mapred.mesos.executor");
+          if (uri == null) {
+            // Protobuf setters reject null, so fail with a clear message.
+            LOG.error("Declining offer: 'mapred.mesos.executor' is not set");
+            driver.declineOffer(offer.getId());
+            continue;
+          }
+          commandInfo = CommandInfo.newBuilder()
+              .setEnvironment(envBuilder)
+              .setValue("cd hadoop-* && " +
+                  "./bin/hadoop org.apache.hadoop.mapred.MesosExecutor")
+              .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
+        }
+
+        TaskID taskId = TaskID.newBuilder()
+            .setValue("Task_Tracker_" + launchedTrackers++).build();
+
+        LOG.info("Launching task " + taskId.getValue() + " on "
+            + httpAddress.toString() + " with mapSlots=" + mapSlots
+            + " reduceSlots=" + reduceSlots);
+
+        // Add this tracker to Mesos tasks.
+        mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
+            mapSlots, reduceSlots, this));
+
+        TaskInfo info = TaskInfo
+            .newBuilder()
+            .setName(taskId.getValue())
+            .setTaskId(taskId)
+            .setSlaveId(offer.getSlaveId())
+            .addResources(
+                Resource
+                    .newBuilder()
+                    .setName("cpus")
+                    .setType(Value.Type.SCALAR)
+                    .setScalar(Value.Scalar.newBuilder().setValue(
+                        (mapSlots + reduceSlots) * slotCpus)))
+            .addResources(
+                Resource
+                    .newBuilder()
+                    .setName("mem")
+                    .setType(Value.Type.SCALAR)
+                    .setScalar(Value.Scalar.newBuilder().setValue(
+                        (mapSlots + reduceSlots) * slotMem)))
+            .addResources(
+                Resource
+                    .newBuilder()
+                    .setName("disk")
+                    .setType(Value.Type.SCALAR)
+                    .setScalar(Value.Scalar.newBuilder().setValue(
+                        (mapSlots + reduceSlots) * slotDisk)))
+            .addResources(
+                Resource
+                    .newBuilder()
+                    .setName("ports")
+                    .setType(Value.Type.RANGES)
+                    .setRanges(
+                        Value.Ranges
+                            .newBuilder()
+                            .addRange(Value.Range.newBuilder()
+                                .setBegin(httpAddress.getPort())
+                                .setEnd(httpAddress.getPort()))
+                            .addRange(Value.Range.newBuilder()
+                                .setBegin(reportAddress.getPort())
+                                .setEnd(reportAddress.getPort()))))
+            .setExecutor(
+                ExecutorInfo
+                    .newBuilder()
+                    .setExecutorId(ExecutorID.newBuilder().setValue(
+                        "executor_" + taskId.getValue()))
+                    .setName("Hadoop TaskTracker")
+                    .setSource(taskId.getValue())
+                    .addResources(
+                        Resource
+                            .newBuilder()
+                            .setName("cpus")
+                            .setType(Value.Type.SCALAR)
+                            .setScalar(Value.Scalar.newBuilder().setValue(
+                                (TASKTRACKER_CPUS))))
+                    .addResources(
+                        Resource
+                            .newBuilder()
+                            .setName("mem")
+                            .setType(Value.Type.SCALAR)
+                            .setScalar(Value.Scalar.newBuilder().setValue(
+                                (tasktrackerMem)))).setCommand(commandInfo))
+            .build();
+
+        driver.launchTasks(offer.getId(), Arrays.asList(info));
+
+        neededMapSlots -= mapSlots;
+        neededReduceSlots -= reduceSlots;
+      }
+
+      if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
+        LOG.info("Satisfied map and reduce slots needed.");
+      } else {
+        LOG.info("Unable to fully satisfy needed map/reduce slots: "
+            + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
+            + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
+            + "remaining");
+      }
+    }
+  }
+
+  @Override
+  public synchronized void offerRescinded(SchedulerDriver schedulerDriver,
+      OfferID offerID) {
+    LOG.warn("Rescinded offer: " + offerID.getValue());
+  }
+
+  /**
+   * Forgets (and stops the launch timer of) any tracker whose Mesos task has
+   * reached a terminal state.
+   */
+  @Override
+  public synchronized void statusUpdate(SchedulerDriver schedulerDriver,
+      Protos.TaskStatus taskStatus) {
+    LOG.info("Status update of " + taskStatus.getTaskId().getValue()
+        + " to " + taskStatus.getState().name()
+        + " with message " + taskStatus.getMessage());
+
+    // Remove the TaskTracker if the corresponding Mesos task has reached a
+    // terminal state.
+    switch (taskStatus.getState()) {
+      case TASK_FINISHED:
+      case TASK_FAILED:
+      case TASK_KILLED:
+      case TASK_LOST: {
+        Iterator<Map.Entry<HttpHost, MesosTracker>> iter =
+            mesosTrackers.entrySet().iterator();
+        while (iter.hasNext()) {
+          Map.Entry<HttpHost, MesosTracker> entry = iter.next();
+          MesosTracker mesosTracker = entry.getValue();
+          if (mesosTracker.taskId.equals(taskStatus.getTaskId())) {
+            LOG.info("Removing terminated TaskTracker: " + entry.getKey());
+            mesosTracker.timer.cancel();
+            iter.remove();
+          }
+        }
+        break;
+      }
+      case TASK_STAGING:
+      case TASK_STARTING:
+      case TASK_RUNNING:
+        break;
+      default:
+        LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
+        break;
+    }
+  }
+
+  @Override
+  public synchronized void frameworkMessage(SchedulerDriver schedulerDriver,
+      ExecutorID executorID, SlaveID slaveID, byte[] bytes) {
+    LOG.info("Framework Message of " + bytes.length + " bytes"
+        + " from executor " + executorID.getValue()
+        + " on slave " + slaveID.getValue());
+  }
+
+  @Override
+  public synchronized void disconnected(SchedulerDriver schedulerDriver) {
+    LOG.warn("Disconnected from Mesos master.");
+  }
+
+  @Override
+  public synchronized void slaveLost(SchedulerDriver schedulerDriver,
+      SlaveID slaveID) {
+    LOG.warn("Slave lost: " + slaveID.getValue());
+  }
+
+  @Override
+  public synchronized void executorLost(SchedulerDriver schedulerDriver,
+      ExecutorID executorID, SlaveID slaveID, int status) {
+    LOG.warn("Executor " + executorID.getValue()
+        + " lost with status " + status + " on slave " + slaveID);
+  }
+
+  @Override
+  public synchronized void error(SchedulerDriver schedulerDriver, String s) {
+    LOG.error("Error from scheduler driver: " + s);
+  }
+
+  /**
+   * Used to track our launched TaskTrackers. Each tracker arms a timer that
+   * kills it if it is still inactive after LAUNCH_TIMEOUT_MS.
+   */
+  private class MesosTracker {
+    public volatile HttpHost host;
+    public TaskID taskId;
+    public long mapSlots;
+    public long reduceSlots;
+    public volatile boolean active = false; // Set once tracked by the JobTracker.
+    public Timer timer;
+    public volatile MesosScheduler scheduler;
+
+    // Tracks Hadoop job tasks running on the tracker.
+    public Set<JobID> hadoopJobs = new HashSet<JobID>();
+
+    public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
+        long reduceSlots, MesosScheduler scheduler) {
+      this.host = host;
+      this.taskId = taskId;
+      this.mapSlots = mapSlots;
+      this.reduceSlots = reduceSlots;
+      this.scheduler = scheduler;
+
+      this.timer = new Timer();
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          synchronized (MesosTracker.this.scheduler) {
+            // If the tracker activated while we were awaiting to acquire the
+            // lock, return.
+            if (MesosTracker.this.active) return;
+
+            LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " +
+                LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it");
+            MesosTracker.this.scheduler.killTracker(MesosTracker.this);
+          }
+        }
+      }, LAUNCH_TIMEOUT_MS);
+    }
+  }
+}
[5/5] git commit: Removed Hadoop from repository.
Posted by be...@apache.org.
Removed Hadoop from repository.
Review: https://reviews.apache.org/r/13245
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c621ae05
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c621ae05
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c621ae05
Branch: refs/heads/master
Commit: c621ae052f15ce6f18d6e08a7c9064b6a7a74711
Parents: 3a7b926
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Aug 2 14:30:12 2013 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 2 15:02:31 2013 -0700
----------------------------------------------------------------------
Makefile.am | 2 +-
README | 12 -
configure.ac | 1 -
hadoop/README.md | 128 ---
hadoop/pom.xml | 110 ---
.../org/apache/hadoop/mapred/MesosExecutor.java | 145 ----
.../apache/hadoop/mapred/MesosScheduler.java | 845 -------------------
7 files changed, 1 insertion(+), 1242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/Makefile.am
----------------------------------------------------------------------
diff --git a/Makefile.am b/Makefile.am
index fc1c1d0..260486b 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -18,7 +18,7 @@ ACLOCAL_AMFLAGS = -I m4
AUTOMAKE_OPTIONS = foreign
-SUBDIRS = . 3rdparty src ec2 hadoop jenkins
+SUBDIRS = . 3rdparty src ec2 jenkins
EXTRA_DIST =
http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/README
----------------------------------------------------------------------
diff --git a/README b/README
index 7fb6cba..e94531d 100644
--- a/README
+++ b/README
@@ -95,18 +95,6 @@ addition, see [build]/bin for scripts that can run the tests and also
launch the tests in GDB.
-Hadoop
-======
-
-Included in the distribution is a runnable tutorial on using Hadoop on
-Mesos (./hadoop/TUTORIAL.sh). Try it out!
-
-You can also "build" a self-contained distribution of Hadoop with the
-necessary Mesos components by doing 'make hadoop-0.20.205.0' or 'make
-hadoop-0.20.2-cdh3u3' from within [build]/hadoop (this uses the
-tutorial mentioned above).
-
-
Installing
==========
http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 4dc1cc7..7c9adac 100644
--- a/configure.ac
+++ b/configure.ac
@@ -78,7 +78,6 @@ AC_CONFIG_SUBDIRS([3rdparty/libprocess])
AC_CONFIG_FILES([Makefile])
AC_CONFIG_FILES([ec2/Makefile])
-AC_CONFIG_FILES([hadoop/Makefile])
AC_CONFIG_FILES([jenkins/Makefile])
AC_CONFIG_FILES([jenkins/pom.xml])
AC_CONFIG_FILES([src/Makefile])
http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/hadoop/README.md
----------------------------------------------------------------------
diff --git a/hadoop/README.md b/hadoop/README.md
deleted file mode 100644
index 36d7eb4..0000000
--- a/hadoop/README.md
+++ /dev/null
@@ -1,128 +0,0 @@
-Hadoop on Mesos
----------------
-
-#### Overview ####
-
-To run _Hadoop on Mesos_ you need to add the `hadoop-mesos-0.0.1.jar`
-library to your Hadoop distribution (any distribution that supports
-`hadoop-core-1.2.0` should work) and set some new configuration
-properties. Read on for details.
-
-#### Build ####
-
-You can build `hadoop-mesos-0.0.1.jar` using Maven:
-
-```
-$ mvn package
-```
-
-If successful, the JAR will be at `target/hadoop-mesos-0.0.1.jar`.
-
-> NOTE: If you want to build against a different version of Mesos than
-> the default you'll need to update `mesos-version` in `pom.xml`.
-
-We plan to provide already built JARs at http://repository.apache.org
-in the near future!
-
-#### Package ####
-
-You'll need to download an existing Hadoop distribution. For this
-guide, we'll use [CDH4.2.1][CDH4.2.1]. First grab the tar archive and
-extract it.
-
-```
-$ wget http://archive.cloudera.com/cdh4/cdh/4/mr1-2.0.0-mr1-cdh4.2.1.tar.gz
-...
-$ tar zxf mr1-2.0.0-mr1-cdh4.2.1.tar.gz
-```
-
-> **Take note**, the extracted directory is `hadoop-2.0.0-mr1-cdh4.2.1`.
-
-Now copy `hadoop-mesos-0.0.1.jar` into the `lib` folder.
-
-```
-$ cp /path/to/hadoop-mesos-0.0.1.jar hadoop-2.0.0-mr1-cdh4.2.1/lib/
-```
-
-_That's it!_ You now have a _Hadoop on Mesos_ distribution!
-
-[CDH4.2.1]: http://www.cloudera.com/content/support/en/documentation/cdh4-documentation/cdh4-documentation-v4-2-1.html
-
-#### Upload ####
-
-You'll want to upload your _Hadoop on Mesos_ distribution somewhere
-that Mesos can access in order to launch each `TaskTracker`. For
-example, if you're already running HDFS:
-
-```
-$ tar czf hadoop-2.0.0-mr1-cdh4.2.1.tar.gz hadoop-2.0.0-mr1-cdh4.2.1
-$ hadoop fs -put hadoop-2.0.0-mr1-cdh4.2.1.tar.gz /hadoop-2.0.0-mr1-cdh4.2.1.tar.gz
-```
-
-> **Consider** any permissions issues with your uploaded location
-> (i.e., on HDFS you'll probably want to make the file world
-> readable).
-
-Now you'll need to configure your `JobTracker` to launch each
-`TaskTracker` on Mesos!
-
-#### Configure ####
-
-Along with the normal configuration properties you might want to set
-to launch a `JobTracker`, you'll need to set some Mesos specific ones
-too.
-
-Here are the mandatory configuration properties for
-`conf/mapred-site.xml` (initialized to values representative of
-running in [pseudo distributed
-operation](http://hadoop.apache.org/docs/stable/single_node_setup.html#PseudoDistributed):
-
-```
-<property>
- <name>mapred.job.tracker</name>
- <value>localhost:9001</value>
-</property>
-<property>
- <name>mapred.jobtracker.taskScheduler</name>
- <value>org.apache.hadoop.mapred.MesosScheduler</value>
-</property>
-<property>
- <name>mapred.mesos.taskScheduler</name>
- <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
-</property>
-<property>
- <name>mapred.mesos.master</name>
- <value>localhost:5050</value>
-</property>
-<property>
- <name>mapred.mesos.executor</name>
- <value>hdfs://localhost:9000/hadoop-2.0.0-mr1-cdh4.2.1.tar.gz</value>
-</property>
-```
-
-#### Start ####
-
-Now you can start the `JobTracker` but you'll need to include the path
-to the Mesos native library.
-
-On Linux:
-
-```
-$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.so hadoop jobtracker
-```
-
-And on OS X:
-
-```
-$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.dylib hadoop jobtracker
-```
-
-> **NOTE: You do not need to worry about distributing your Hadoop
-> configuration! All of the configuration properties read by the**
-> `JobTracker` **along with any necessary** `TaskTracker` **specific
-> _overrides_ will get serialized and passed to each** `TaskTracker`
-> **on startup.**
-
-_Please email user@mesos.apache.org with questions!_
-
-----------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
deleted file mode 100644
index baf3e5b..0000000
--- a/hadoop/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>org.apache.mesos</groupId>
- <artifactId>hadoop-mesos</artifactId>
- <version>0.0.1</version>
-
- <properties>
- <encoding>UTF-8</encoding>
-
- <!-- language versions -->
- <java.abi>1.6</java.abi>
-
- <!-- runtime deps versions -->
- <commons-logging.version>1.1.1</commons-logging.version>
- <hadoop-core.version>1.2.0</hadoop-core.version>
- <mesos.version>0.14.0</mesos.version>
- <protobuf.version>2.4.1</protobuf.version>
-
- <!-- test deps versions -->
- <junit.version>4.11</junit.version>
- <mockito.version>1.9.5</mockito.version>
- </properties>
-
- <!-- TODO(benh): Remove repository once we've published a JDK6 JAR. -->
- <repositories>
- <repository>
- <id>mesosphere-public-repo</id>
- <name>Mesosphere Public Repo</name>
- <url>http://s3.amazonaws.com/mesosphere-maven-public/</url>
- </repository>
- </repositories>
-
- <dependencies>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>${commons-logging.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>${hadoop-core.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.mesos</groupId>
- <!-- TODO(benh): Use 'mesos' after we've published a JDK6 JAR. -->
- <artifactId>mesos_jdk6</artifactId>
- <version>${mesos.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
-
- <!-- Test scope -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <source>${java.abi}</source>
- <target>${java.abi}</target>
- <showDeprecation>true</showDeprecation>
- <showWarnings>true</showWarnings>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.0</version>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
- </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
deleted file mode 100644
index ccf0090..0000000
--- a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosExecutor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.mesos.Executor;
-import org.apache.mesos.ExecutorDriver;
-import org.apache.mesos.MesosExecutorDriver;
-import org.apache.mesos.Protos.Environment.Variable;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.SlaveInfo;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.TaskStatus;
-
-public class MesosExecutor implements Executor {
- public static final Log LOG = LogFactory.getLog(MesosExecutor.class);
-
- private JobConf conf;
- private TaskTracker taskTracker;
-
- @Override
- public void registered(ExecutorDriver driver, ExecutorInfo executorInfo,
- FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
- LOG.info("Executor registered with the slave");
-
- conf = new JobConf();
-
- // Get TaskTracker's config options from environment variables set by the
- // JobTracker.
- if (executorInfo.getCommand().hasEnvironment()) {
- for (Variable variable : executorInfo.getCommand().getEnvironment()
- .getVariablesList()) {
- LOG.info("Setting config option : " + variable.getName() + " to "
- + variable.getValue());
- conf.set(variable.getName(), variable.getValue());
- }
- }
-
- // Get hostname from Mesos to make sure we match what it reports
- // to the JobTracker.
- conf.set("slave.host.name", slaveInfo.getHostname());
-
- // Set the mapred.local directory inside the executor sandbox, so that
- // different TaskTrackers on the same host do not step on each other.
- conf.set("mapred.local.dir", System.getProperty("user.dir") + "/mapred");
- }
-
- @Override
- public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
- LOG.info("Launching task : " + task.getTaskId().getValue());
-
- // NOTE: We need to manually set the context class loader here because,
- // the TaskTracker is unable to find LoginModule class otherwise.
- Thread.currentThread().setContextClassLoader(
- TaskTracker.class.getClassLoader());
-
- try {
- taskTracker = new TaskTracker(conf);
- } catch (IOException e) {
- LOG.fatal("Failed to start TaskTracker", e);
- System.exit(1);
- } catch (InterruptedException e) {
- LOG.fatal("Failed to start TaskTracker", e);
- System.exit(1);
- }
-
- // Spin up a TaskTracker in a new thread.
- new Thread("TaskTracker Run Thread") {
- @Override
- public void run() {
- taskTracker.run();
-
- // Send a TASK_FINISHED status update.
- // We do this here because we want to send it in a separate thread
- // than was used to call killTask().
- driver.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(task.getTaskId())
- .setState(TaskState.TASK_FINISHED)
- .build());
-
- // Give some time for the update to reach the slave.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.error("Failed to sleep TaskTracker thread", e);
- }
-
- // Stop the executor.
- driver.stop();
- }
- }.start();
-
- driver.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(task.getTaskId())
- .setState(TaskState.TASK_RUNNING).build());
- }
-
- @Override
- public void killTask(ExecutorDriver driver, TaskID taskId) {
- LOG.info("Killing task : " + taskId.getValue());
- try {
- taskTracker.shutdown();
- } catch (IOException e) {
- LOG.error("Failed to shutdown TaskTracker", e);
- } catch (InterruptedException e) {
- LOG.error("Failed to shutdown TaskTracker", e);
- }
- }
-
- @Override
- public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
- LOG.info("Executor reregistered with the slave");
- }
-
- @Override
- public void disconnected(ExecutorDriver driver) {
- LOG.info("Executor disconnected from the slave");
- }
-
- @Override
- public void frameworkMessage(ExecutorDriver d, byte[] msg) {
- LOG.info("Executor received framework message of length: " + msg.length
- + " bytes");
- }
-
- @Override
- public void error(ExecutorDriver d, String message) {
- LOG.error("MesosExecutor.error: " + message);
- }
-
- @Override
- public void shutdown(ExecutorDriver d) {
- LOG.info("Executor asked to shutdown");
- }
-
- public static void main(String[] args) {
- MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor());
- System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c621ae05/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
deleted file mode 100644
index 0332dea..0000000
--- a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ /dev/null
@@ -1,845 +0,0 @@
-package org.apache.hadoop.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Iterator;
-
-import org.apache.commons.httpclient.HttpHost;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.mesos.MesosSchedulerDriver;
-import org.apache.mesos.Protos;
-import org.apache.mesos.Protos.CommandInfo;
-import org.apache.mesos.Protos.ExecutorID;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkID;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.MasterInfo;
-import org.apache.mesos.Protos.Offer;
-import org.apache.mesos.Protos.OfferID;
-import org.apache.mesos.Protos.Resource;
-import org.apache.mesos.Protos.SlaveID;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.Value;
-import org.apache.mesos.Scheduler;
-import org.apache.mesos.SchedulerDriver;
-
-import static org.apache.hadoop.util.StringUtils.join;
-
-public class MesosScheduler extends TaskScheduler implements Scheduler {
- public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
-
- private SchedulerDriver driver;
- private TaskScheduler taskScheduler;
- private JobTracker jobTracker;
- private Configuration conf;
-
- // This is the memory overhead for a jvm process. This needs to be added
- // to a jvm process's resource requirement, in addition to its heap size.
- private static final double JVM_MEM_OVERHEAD_PERCENT_DEFAULT = 0.1; // 10%.
-
- // TODO(vinod): Consider parsing the slot memory from the configuration jvm
- // heap options (e.g: mapred.child.java.opts).
-
- // NOTE: It appears that there's no real resource requirements for a
- // map / reduce slot. We therefore define a default slot as:
- // 0.2 cores.
- // 512 MB memory.
- // 1 GB of disk space.
- private static final double SLOT_CPUS_DEFAULT = 0.2; // 0.2 cores.
- private static final int SLOT_DISK_DEFAULT = 1024; // 1 GB.
- private static final int SLOT_JVM_HEAP_DEFAULT = 256; // 256MB.
-
- private static final double TASKTRACKER_CPUS = 1.0; // 1 core.
- private static final int TASKTRACKER_MEM_DEFAULT = 1024; // 1 GB.
-
- // The default behavior in Hadoop is to use 4 slots per TaskTracker:
- private static final int MAP_SLOTS_DEFAULT = 2;
- private static final int REDUCE_SLOTS_DEFAULT = 2;
-
- // The amount of time to wait for task trackers to launch before
- // giving up.
- private static final long LAUNCH_TIMEOUT_MS = 300000; // 5 minutes
-
- // Count of the launched trackers for TaskID generation.
- private long launchedTrackers = 0;
-
- // Maintains a mapping from {tracker host:port -> MesosTracker}.
- // Used for tracking the slots of each TaskTracker and the corresponding
- // Mesos TaskID.
- private Map<HttpHost, MesosTracker> mesosTrackers =
- new HashMap<HttpHost, MesosTracker>();
-
- private JobInProgressListener jobListener = new JobInProgressListener() {
- @Override
- public void jobAdded(JobInProgress job) throws IOException {
- LOG.info("Added job " + job.getJobID());
- }
-
- @Override
- public void jobRemoved(JobInProgress job) {
- LOG.info("Removed job " + job.getJobID());
- }
-
- @Override
- public void jobUpdated(JobChangeEvent event) {
- synchronized (MesosScheduler.this) {
- JobInProgress job = event.getJobInProgress();
-
- // If the job is complete, kill all the corresponding idle TaskTrackers.
- if (!job.isComplete()) {
- return;
- }
-
- LOG.info("Completed job : " + job.getJobID());
-
- List<TaskInProgress> completed = new ArrayList<TaskInProgress>();
-
- // Map tasks.
- completed.addAll(job.reportTasksInProgress(true, true));
-
- // Reduce tasks.
- completed.addAll(job.reportTasksInProgress(false, true));
-
- for (TaskInProgress task : completed) {
- // Check that this task actually belongs to this job
- if (task.getJob().getJobID() != job.getJobID()) {
- continue;
- }
-
- for (TaskStatus status : task.getTaskStatuses()) {
- // Make a copy to iterate over keys and delete values.
- Set<HttpHost> trackers = new HashSet<HttpHost>(
- mesosTrackers.keySet());
-
- // Remove the task from the map.
- for (HttpHost tracker : trackers) {
- MesosTracker mesosTracker = mesosTrackers.get(tracker);
-
- if (!mesosTracker.active) {
- LOG.warn("Ignoring TaskTracker: " + tracker
- + " because it might not have sent a hearbeat");
- continue;
- }
-
- LOG.info("Removing completed task : " + status.getTaskID()
- + " of tracker " + status.getTaskTracker());
-
- mesosTracker.hadoopJobs.remove(job.getJobID());
-
- // If the TaskTracker doesn't have any running tasks, kill it.
- if (mesosTracker.hadoopJobs.isEmpty()) {
- LOG.info("Killing Mesos task: " + mesosTracker.taskId + " on host "
- + mesosTracker.host);
-
- driver.killTask(mesosTracker.taskId);
- mesosTracker.timer.cancel();
- mesosTrackers.remove(tracker);
- }
- }
- }
- }
- }
- }
- };
-
- public MesosScheduler() {}
-
- // TaskScheduler methods.
- @Override
- public synchronized void start() throws IOException {
- conf = getConf();
- String taskTrackerClass = conf.get("mapred.mesos.taskScheduler",
- "org.apache.hadoop.mapred.JobQueueTaskScheduler");
-
- try {
- taskScheduler =
- (TaskScheduler) Class.forName(taskTrackerClass).newInstance();
- taskScheduler.setConf(conf);
- taskScheduler.setTaskTrackerManager(taskTrackerManager);
- } catch (ClassNotFoundException e) {
- LOG.fatal("Failed to initialize the TaskScheduler", e);
- System.exit(1);
- } catch (InstantiationException e) {
- LOG.fatal("Failed to initialize the TaskScheduler", e);
- System.exit(1);
- } catch (IllegalAccessException e) {
- LOG.fatal("Failed to initialize the TaskScheduler", e);
- System.exit(1);
- }
-
- // Add the job listener to get job related updates.
- taskTrackerManager.addJobInProgressListener(jobListener);
-
- LOG.info("Starting MesosScheduler");
- jobTracker = (JobTracker) super.taskTrackerManager;
-
- String master = conf.get("mapred.mesos.master", "local");
-
- try {
- FrameworkInfo frameworkInfo = FrameworkInfo
- .newBuilder()
- .setUser("")
- .setCheckpoint(conf.getBoolean("mapred.mesos.checkpoint", false))
- .setName("Hadoop: (RPC port: " + jobTracker.port + ","
- + " WebUI port: " + jobTracker.infoPort + ")").build();
-
- driver = new MesosSchedulerDriver(this, frameworkInfo, master);
- driver.start();
- } catch (Exception e) {
- // If the MesosScheduler can't be loaded, the JobTracker won't be useful
- // at all, so crash it now so that the user notices.
- LOG.fatal("Failed to start MesosScheduler", e);
- System.exit(1);
- }
-
- taskScheduler.start();
- }
-
- @Override
- public synchronized void terminate() throws IOException {
- try {
- LOG.info("Stopping MesosScheduler");
- driver.stop();
- } catch (Exception e) {
- LOG.error("Failed to stop Mesos scheduler", e);
- }
-
- taskScheduler.terminate();
- }
-
- @Override
- public synchronized List<Task> assignTasks(TaskTracker taskTracker)
- throws IOException {
- HttpHost tracker = new HttpHost(taskTracker.getStatus().getHost(),
- taskTracker.getStatus().getHttpPort());
-
- if (!mesosTrackers.containsKey(tracker)) {
- // TODO(bmahler): Consider allowing non-Mesos TaskTrackers.
- LOG.info("Unknown/exited TaskTracker: " + tracker + ". ");
- return null;
- }
-
- // Let the underlying task scheduler do the actual task scheduling.
- List<Task> tasks = taskScheduler.assignTasks(taskTracker);
-
- // The Hadoop Fair Scheduler is known to return null.
- if (tasks != null) {
- // Keep track of which TaskTracker contains which tasks.
- for (Task task : tasks) {
- mesosTrackers.get(tracker).hadoopJobs.add(task.getJobID());
- }
- }
-
- return tasks;
- }
-
- @Override
- public synchronized Collection<JobInProgress> getJobs(String queueName) {
- return taskScheduler.getJobs(queueName);
- }
-
- @Override
- public synchronized void refresh() throws IOException {
- taskScheduler.refresh();
- }
-
- // Mesos Scheduler methods.
- // These are synchronized, where possible. Some of these methods need to access the
- // JobTracker, which can lead to a deadlock:
- // See: https://issues.apache.org/jira/browse/MESOS-429
- // The workaround employed here is to unsynchronize those methods needing access to
- // the JobTracker state and use explicit synchronization instead as appropriate.
- // TODO(bmahler): Provide a cleaner solution to this issue. One solution is to
- // split up the Scheduler and TaskScheduler implementations in order to break the
- // locking cycle. This would require a synchronized class to store the shared
- // state across our Scheduler and TaskScheduler implementations, and provide
- // atomic operations as needed.
- @Override
- public synchronized void registered(SchedulerDriver schedulerDriver,
- FrameworkID frameworkID, MasterInfo masterInfo) {
- LOG.info("Registered as " + frameworkID.getValue()
- + " with master " + masterInfo);
- }
-
- @Override
- public synchronized void reregistered(SchedulerDriver schedulerDriver,
- MasterInfo masterInfo) {
- LOG.info("Re-registered with master " + masterInfo);
- }
-
- public synchronized void killTracker(MesosTracker tracker) {
- driver.killTask(tracker.taskId);
- mesosTrackers.remove(tracker.host);
- }
-
- // For some reason, pendingMaps() and pendingReduces() doesn't return the
- // values we expect. We observed negative values, which may be related to
- // https://issues.apache.org/jira/browse/MAPREDUCE-1238. Below is the
- // algorithm that is used to calculate the pending tasks within the Hadoop
- // JobTracker sources (see 'printTaskSummary' in
- // src/org/apache/hadoop/mapred/jobdetails_jsp.java).
- private int getPendingTasks(TaskInProgress[] tasks) {
- int totalTasks = tasks.length;
- int runningTasks = 0;
- int finishedTasks = 0;
- int killedTasks = 0;
- for (int i = 0; i < totalTasks; ++i) {
- TaskInProgress task = tasks[i];
- if (task == null) {
- continue;
- }
- if (task.isComplete()) {
- finishedTasks += 1;
- } else if (task.isRunning()) {
- runningTasks += 1;
- } else if (task.wasKilled()) {
- killedTasks += 1;
- }
- }
- int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks;
- return pendingTasks;
- }
-
- // This method uses explicit synchronization in order to avoid deadlocks when
- // accessing the JobTracker.
- @Override
- public void resourceOffers(SchedulerDriver schedulerDriver,
- List<Offer> offers) {
- // Before synchronizing, we pull all needed information from the JobTracker.
- final HttpHost jobTrackerAddress =
- new HttpHost(jobTracker.getHostname(), jobTracker.getTrackerPort());
-
- final Collection<TaskTrackerStatus> taskTrackers = jobTracker.taskTrackers();
-
- final List<JobInProgress> jobsInProgress = new ArrayList<JobInProgress>();
- for (JobStatus status : jobTracker.jobsToComplete()) {
- jobsInProgress.add(jobTracker.getJob(status.getJobID()));
- }
-
- synchronized (this) {
- // Compute the number of pending maps and reduces.
- int pendingMaps = 0;
- int pendingReduces = 0;
- for (JobInProgress progress : jobsInProgress) {
- // JobStatus.pendingMaps/Reduces may return the wrong value on
- // occasion. This seems to be safer.
- pendingMaps += getPendingTasks(progress.getTasks(TaskType.MAP));
- pendingReduces += getPendingTasks(progress.getTasks(TaskType.REDUCE));
- }
-
- // Mark active (heartbeated) TaskTrackers and compute idle slots.
- int idleMapSlots = 0;
- int idleReduceSlots = 0;
- int unhealthyTrackers = 0;
-
- for (TaskTrackerStatus status : taskTrackers) {
- if (!status.getHealthStatus().isNodeHealthy()) {
- // Skip this node if it's unhealthy.
- ++unhealthyTrackers;
- continue;
- }
-
- HttpHost host = new HttpHost(status.getHost(), status.getHttpPort());
- if (mesosTrackers.containsKey(host)) {
- mesosTrackers.get(host).active = true;
- mesosTrackers.get(host).timer.cancel();
- idleMapSlots += status.getAvailableMapSlots();
- idleReduceSlots += status.getAvailableReduceSlots();
- }
- }
-
- // Consider the TaskTrackers that have yet to become active as being idle,
- // otherwise we will launch excessive TaskTrackers.
- int inactiveMapSlots = 0;
- int inactiveReduceSlots = 0;
- for (MesosTracker tracker : mesosTrackers.values()) {
- if (!tracker.active) {
- inactiveMapSlots += tracker.mapSlots;
- inactiveReduceSlots += tracker.reduceSlots;
- }
- }
-
- // To ensure Hadoop jobs begin promptly, we can specify a minimum number
- // of 'hot slots' to be available for use. This addresses the
- // TaskTracker spin up delay that exists with Hadoop on Mesos. This can
- // be a nuisance with lower latency applications, such as ad-hoc Hive
- // queries.
- int minimumMapSlots = conf.getInt("mapred.mesos.total.map.slots.minimum", 0);
- int minimumReduceSlots =
- conf.getInt("mapred.mesos.total.reduce.slots.minimum", 0);
-
- // Compute how many slots we need to allocate.
- int neededMapSlots = Math.max(
- minimumMapSlots - (idleMapSlots + inactiveMapSlots),
- pendingMaps - (idleMapSlots + inactiveMapSlots));
- int neededReduceSlots = Math.max(
- minimumReduceSlots - (idleReduceSlots + inactiveReduceSlots),
- pendingReduces - (idleReduceSlots + inactiveReduceSlots));
-
- LOG.info(join("\n", Arrays.asList(
- "JobTracker Status",
- " Pending Map Tasks: " + pendingMaps,
- " Pending Reduce Tasks: " + pendingReduces,
- " Idle Map Slots: " + idleMapSlots,
- " Idle Reduce Slots: " + idleReduceSlots,
- " Inactive Map Slots: " + inactiveMapSlots
- + " (launched but no hearbeat yet)",
- " Inactive Reduce Slots: " + inactiveReduceSlots
- + " (launched but no hearbeat yet)",
- " Needed Map Slots: " + neededMapSlots,
- " Needed Reduce Slots: " + neededReduceSlots,
- " Unhealthy Trackers: " + unhealthyTrackers)));
-
- // Launch TaskTrackers to satisfy the slot requirements.
- // TODO(bmahler): Consider slotting intelligently.
- // Ex: If more map slots are needed, but no reduce slots are needed,
- // launch a map-only TaskTracker to better satisfy the slot needs.
- for (Offer offer : offers) {
- if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
- driver.declineOffer(offer.getId());
- continue;
- }
-
- // Ensure these values aren't < 0.
- neededMapSlots = Math.max(0, neededMapSlots);
- neededReduceSlots = Math.max(0, neededReduceSlots);
-
- double cpus = -1.0;
- double mem = -1.0;
- double disk = -1.0;
- Set<Integer> ports = new HashSet<Integer>();
-
- // Pull out the cpus, memory, disk, and 2 ports from the offer.
- for (Resource resource : offer.getResourcesList()) {
- if (resource.getName().equals("cpus")
- && resource.getType() == Value.Type.SCALAR) {
- cpus = resource.getScalar().getValue();
- } else if (resource.getName().equals("mem")
- && resource.getType() == Value.Type.SCALAR) {
- mem = resource.getScalar().getValue();
- } else if (resource.getName().equals("disk")
- && resource.getType() == Value.Type.SCALAR) {
- disk = resource.getScalar().getValue();
- } else if (resource.getName().equals("ports")
- && resource.getType() == Value.Type.RANGES) {
- for (Value.Range range : resource.getRanges().getRangeList()) {
- Integer begin = (int)range.getBegin();
- Integer end = (int)range.getEnd();
- if (end < begin) {
- LOG.warn("Ignoring invalid port range: begin=" + begin + " end=" + end);
- continue;
- }
- while (begin <= end && ports.size() < 2) {
- ports.add(begin);
- begin += 1;
- }
- }
- }
- }
-
- int mapSlotsMax = conf.getInt("mapred.tasktracker.map.tasks.maximum",
- MAP_SLOTS_DEFAULT);
- int reduceSlotsMax =
- conf.getInt("mapred.tasktracker.reduce.tasks.maximum",
- REDUCE_SLOTS_DEFAULT);
-
- // What's the minimum number of map and reduce slots we should try to
- // launch?
- long mapSlots = 0;
- long reduceSlots = 0;
-
- double slotCpus = conf.getFloat("mapred.mesos.slot.cpus",
- (float) SLOT_CPUS_DEFAULT);
- double slotDisk = conf.getInt("mapred.mesos.slot.disk",
- SLOT_DISK_DEFAULT);
-
- int slotMem = conf.getInt("mapred.mesos.slot.mem",
- SLOT_JVM_HEAP_DEFAULT);
- long slotJVMHeap = Math.round((double)slotMem -
- (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * slotMem));
-
- int tasktrackerMem = conf.getInt("mapred.mesos.tasktracker.mem",
- TASKTRACKER_MEM_DEFAULT);
- long tasktrackerJVMHeap = Math.round((double)tasktrackerMem -
- (JVM_MEM_OVERHEAD_PERCENT_DEFAULT * tasktrackerMem));
-
- // Minimum resource requirements for the container (TaskTracker + map/red
- // tasks).
- double containerCpus = TASKTRACKER_CPUS;
- double containerMem = tasktrackerMem;
- double containerDisk = 0;
-
- // Determine how many slots we can allocate.
- int slots = mapSlotsMax + reduceSlotsMax;
- slots = (int)Math.min(slots, (cpus - containerCpus) / slotCpus);
- slots = (int)Math.min(slots, (mem - containerMem) / slotMem);
- slots = (int)Math.min(slots, (disk - containerDisk) / slotDisk);
-
- // Is this offer too small for even the minimum slots?
- if (slots < 1 || ports.size() < 2) {
- LOG.info(join("\n", Arrays.asList(
- "Declining offer with insufficient resources for a TaskTracker: ",
- " cpus: offered " + cpus + " needed " + containerCpus,
- " mem : offered " + mem + " needed " + containerMem,
- " disk: offered " + disk + " needed " + containerDisk,
- " ports: " + (ports.size() < 2
- ? " less than 2 offered"
- : " at least 2 (sufficient)"))));
-
- driver.declineOffer(offer.getId());
- continue;
- }
-
- // Is the number of slots we need sufficiently small? If so, we can
- // allocate exactly the number we need.
- if (slots >= neededMapSlots + neededReduceSlots && neededMapSlots <
- mapSlotsMax && neededReduceSlots < reduceSlotsMax) {
- mapSlots = neededMapSlots;
- reduceSlots = neededReduceSlots;
- } else {
- // Allocate slots fairly for this resource offer.
- double mapFactor = (double)neededMapSlots / (neededMapSlots + neededReduceSlots);
- double reduceFactor = (double)neededReduceSlots / (neededMapSlots + neededReduceSlots);
- // To avoid map/reduce slot starvation, don't allow more than 50%
- // spread between map/reduce slots when we need both mappers and
- // reducers.
- if (neededMapSlots > 0 && neededReduceSlots > 0) {
- if (mapFactor < 0.25) {
- mapFactor = 0.25;
- } else if (mapFactor > 0.75) {
- mapFactor = 0.75;
- }
- if (reduceFactor < 0.25) {
- reduceFactor = 0.25;
- } else if (reduceFactor > 0.75) {
- reduceFactor = 0.75;
- }
- }
- mapSlots = Math.min(Math.min((long)(mapFactor * slots), mapSlotsMax), neededMapSlots);
- // The remaining slots are allocated for reduces.
- slots -= mapSlots;
- reduceSlots = Math.min(Math.min(slots, reduceSlotsMax), neededReduceSlots);
- }
-
- Iterator<Integer> portIter = ports.iterator();
- HttpHost httpAddress = new HttpHost(offer.getHostname(), portIter.next());
- HttpHost reportAddress = new HttpHost(offer.getHostname(), portIter.next());
-
- // Check that this tracker is not already launched. This problem was
- // observed on a few occasions, but not reliably. The main symptom was
- // that entries in `mesosTrackers` were being lost, and task trackers
- // would be 'lost' mysteriously (probably because the ports were in
- // use). This problem has since gone away with a rewrite of the port
- // selection code, but the check + logging is left here.
- // TODO(brenden): Diagnose this to determine root cause.
-
- if (mesosTrackers.containsKey(httpAddress)) {
- LOG.info(join("\n", Arrays.asList(
- "Declining offer because host/port combination is in use: ",
- " cpus: offered " + cpus + " needed " + containerCpus,
- " mem : offered " + mem + " needed " + containerMem,
- " disk: offered " + disk + " needed " + containerDisk,
- " ports: " + ports)));
-
- driver.declineOffer(offer.getId());
- continue;
- }
-
- TaskID taskId = TaskID.newBuilder()
- .setValue("Task_Tracker_" + launchedTrackers++).build();
-
- LOG.info("Launching task " + taskId.getValue() + " on "
- + httpAddress.toString() + " with mapSlots=" + mapSlots + " reduceSlots=" + reduceSlots);
-
- // Add this tracker to Mesos tasks.
- mesosTrackers.put(httpAddress, new MesosTracker(httpAddress, taskId,
- mapSlots, reduceSlots, this));
-
- // Create the environment depending on whether the executor is going to be
- // run locally.
- // TODO(vinod): Do not pass the mapred config options as environment
- // variables.
- Protos.Environment.Builder envBuilder = Protos.Environment
- .newBuilder()
- .addVariables(
- Protos.Environment.Variable
- .newBuilder()
- .setName("mapred.job.tracker")
- .setValue(jobTrackerAddress.getHostName() + ':'
- + jobTrackerAddress.getPort()))
- .addVariables(
- Protos.Environment.Variable
- .newBuilder()
- .setName("mapred.task.tracker.http.address")
- .setValue(
- httpAddress.getHostName() + ':' + httpAddress.getPort()))
- .addVariables(
- Protos.Environment.Variable
- .newBuilder()
- .setName("mapred.task.tracker.report.address")
- .setValue(reportAddress.getHostName() + ':'
- + reportAddress.getPort()))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("mapred.map.child.java.opts")
- .setValue("-Xmx" + slotJVMHeap + "m"))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("mapred.reduce.child.java.opts")
- .setValue("-Xmx" + slotJVMHeap + "m"))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("HADOOP_HEAPSIZE")
- .setValue("" + tasktrackerJVMHeap))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("mapred.tasktracker.map.tasks.maximum")
- .setValue("" + mapSlots))
- .addVariables(
- Protos.Environment.Variable.newBuilder()
- .setName("mapred.tasktracker.reduce.tasks.maximum")
- .setValue("" + reduceSlots));
-
- // Set java specific environment, appropriately.
- Map<String, String> env = System.getenv();
- if (env.containsKey("JAVA_HOME")) {
- envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
- .setName("JAVA_HOME")
- .setValue(env.get("JAVA_HOME")));
- }
-
- if (env.containsKey("JAVA_LIBRARY_PATH")) {
- envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
- .setName("JAVA_LIBRARY_PATH")
- .setValue(env.get("JAVA_LIBRARY_PATH")));
- }
-
- // Command info differs when performing a local run.
- CommandInfo commandInfo = null;
- String master = conf.get("mapred.mesos.master", "local");
-
- if (master.equals("local")) {
- commandInfo = CommandInfo.newBuilder()
- .setEnvironment(envBuilder)
- .setValue(new File("bin/hadoop").getCanonicalPath() +
- " org.apache.hadoop.mapred.MesosExecutor")
- .build();
- } else {
- String uri = conf.get("mapred.mesos.executor");
- commandInfo = CommandInfo.newBuilder()
- .setEnvironment(envBuilder)
- .setValue("cd hadoop-* && " +
- "./bin/hadoop org.apache.hadoop.mapred.MesosExecutor")
- .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
- }
-
- TaskInfo info = TaskInfo
- .newBuilder()
- .setName(taskId.getValue())
- .setTaskId(taskId)
- .setSlaveId(offer.getSlaveId())
- .addResources(
- Resource
- .newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (mapSlots + reduceSlots) * slotCpus)))
- .addResources(
- Resource
- .newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (mapSlots + reduceSlots) * slotMem)))
- .addResources(
- Resource
- .newBuilder()
- .setName("disk")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (mapSlots + reduceSlots) * slotDisk)))
- .addResources(
- Resource
- .newBuilder()
- .setName("ports")
- .setType(Value.Type.RANGES)
- .setRanges(
- Value.Ranges
- .newBuilder()
- .addRange(Value.Range.newBuilder()
- .setBegin(httpAddress.getPort())
- .setEnd(httpAddress.getPort()))
- .addRange(Value.Range.newBuilder()
- .setBegin(reportAddress.getPort())
- .setEnd(reportAddress.getPort()))))
- .setExecutor(
- ExecutorInfo
- .newBuilder()
- .setExecutorId(ExecutorID.newBuilder().setValue(
- "executor_" + taskId.getValue()))
- .setName("Hadoop TaskTracker")
- .setSource(taskId.getValue())
- .addResources(
- Resource
- .newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (TASKTRACKER_CPUS))))
- .addResources(
- Resource
- .newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(
- (tasktrackerMem)))).setCommand(commandInfo))
- .build();
-
- driver.launchTasks(offer.getId(), Arrays.asList(info));
-
- neededMapSlots -= mapSlots;
- neededReduceSlots -= reduceSlots;
- }
-
- if (neededMapSlots <= 0 && neededReduceSlots <= 0) {
- LOG.info("Satisfied map and reduce slots needed.");
- } else {
- LOG.info("Unable to fully satisfy needed map/reduce slots: "
- + (neededMapSlots > 0 ? neededMapSlots + " map slots " : "")
- + (neededReduceSlots > 0 ? neededReduceSlots + " reduce slots " : "")
- + "remaining");
- }
- }
- }
-
- @Override
- public synchronized void offerRescinded(SchedulerDriver schedulerDriver,
- OfferID offerID) {
- LOG.warn("Rescinded offer: " + offerID.getValue());
- }
-
- @Override
- public synchronized void statusUpdate(SchedulerDriver schedulerDriver,
- Protos.TaskStatus taskStatus) {
- LOG.info("Status update of " + taskStatus.getTaskId().getValue()
- + " to " + taskStatus.getState().name()
- + " with message " + taskStatus.getMessage());
-
- // Remove the TaskTracker if the corresponding Mesos task has reached a
- // terminal state.
- switch (taskStatus.getState()) {
- case TASK_FINISHED:
- case TASK_FAILED:
- case TASK_KILLED:
- case TASK_LOST:
- // Make a copy to iterate over keys and delete values.
- Set<HttpHost> trackers = new HashSet<HttpHost>(mesosTrackers.keySet());
-
- // Remove the task from the map.
- for (HttpHost tracker : trackers) {
- if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
- LOG.info("Removing terminated TaskTracker: " + tracker);
- mesosTrackers.get(tracker).timer.cancel();
- mesosTrackers.remove(tracker);
- }
- }
- break;
- case TASK_STAGING:
- case TASK_STARTING:
- case TASK_RUNNING:
- break;
- default:
- LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
- break;
- }
- }
-
- @Override
- public synchronized void frameworkMessage(SchedulerDriver schedulerDriver,
- ExecutorID executorID, SlaveID slaveID, byte[] bytes) {
- LOG.info("Framework Message of " + bytes.length + " bytes"
- + " from executor " + executorID.getValue()
- + " on slave " + slaveID.getValue());
- }
-
- @Override
- public synchronized void disconnected(SchedulerDriver schedulerDriver) {
- LOG.warn("Disconnected from Mesos master.");
- }
-
- @Override
- public synchronized void slaveLost(SchedulerDriver schedulerDriver,
- SlaveID slaveID) {
- LOG.warn("Slave lost: " + slaveID.getValue());
- }
-
- @Override
- public synchronized void executorLost(SchedulerDriver schedulerDriver,
- ExecutorID executorID, SlaveID slaveID, int status) {
- LOG.warn("Executor " + executorID.getValue()
- + " lost with status " + status + " on slave " + slaveID);
- }
-
- @Override
- public synchronized void error(SchedulerDriver schedulerDriver, String s) {
- LOG.error("Error from scheduler driver: " + s);
- }
-
- /**
- * Used to track the our launched TaskTrackers.
- */
- private class MesosTracker {
- public volatile HttpHost host;
- public TaskID taskId;
- public long mapSlots;
- public long reduceSlots;
- public volatile boolean active = false; // Set once tracked by the JobTracker.
- public Timer timer;
- public volatile MesosScheduler scheduler;
-
- // Tracks Hadoop job tasks running on the tracker.
- public Set<JobID> hadoopJobs = new HashSet<JobID>();
-
- public MesosTracker(HttpHost host, TaskID taskId, long mapSlots,
- long reduceSlots, MesosScheduler scheduler) {
- this.host = host;
- this.taskId = taskId;
- this.mapSlots = mapSlots;
- this.reduceSlots = reduceSlots;
- this.scheduler = scheduler;
-
- this.timer = new Timer();
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- synchronized (MesosTracker.this.scheduler) {
- // If the tracker activated while we were awaiting to acquire the
- // lock, return.
- if (MesosTracker.this.active) return;
-
- LOG.warn("Tracker " + MesosTracker.this.host + " failed to launch within " +
- LAUNCH_TIMEOUT_MS / 1000 + " seconds, killing it");
- MesosTracker.this.scheduler.killTracker(MesosTracker.this);
- }
- }
- }, LAUNCH_TIMEOUT_MS);
- }
- }
-}
[2/5] git commit: Refactored Hadoop on Mesos from contrib to JAR.
Posted by be...@apache.org.
Refactored Hadoop on Mesos from contrib to JAR.
Review: https://reviews.apache.org/r/13225
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/25cb6f91
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/25cb6f91
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/25cb6f91
Branch: refs/heads/master
Commit: 25cb6f91862b34d1f471e9ef027a2b7123b10d20
Parents: f6d95e8
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Aug 1 21:40:11 2013 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 2 14:25:30 2013 -0700
----------------------------------------------------------------------
hadoop/HadoopPipes.cc.patch | 10 -
hadoop/Makefile.am | 139 ---
hadoop/NOTES | 6 -
hadoop/README.md | 54 ++
hadoop/TUTORIAL.sh | 710 ----------------
hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch | 14 -
hadoop/hadoop-0.20.2-cdh3u3_mesos.patch | 38 -
hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch | 14 -
hadoop/hadoop-0.20.2-cdh3u5_mesos.patch | 38 -
hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch | 14 -
hadoop/hadoop-0.20.205.0_mesos.patch | 22 -
...adoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch | 14 -
hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch | 22 -
...adoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch | 14 -
hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch | 22 -
hadoop/hadoop-7698-1.patch | 27 -
hadoop/hadoop-gridmix.patch | 17 -
hadoop/mapred-site.xml.patch | 54 --
hadoop/mesos-executor | 3 -
hadoop/mesos/build.xml | 28 -
hadoop/mesos/ivy.xml | 42 -
hadoop/mesos/ivy/libraries.properties | 5 -
.../org/apache/hadoop/mapred/MesosExecutor.java | 145 ----
.../apache/hadoop/mapred/MesosScheduler.java | 848 -------------------
hadoop/pom.xml | 110 +++
.../org/apache/hadoop/mapred/MesosExecutor.java | 145 ++++
.../apache/hadoop/mapred/MesosScheduler.java | 845 ++++++++++++++++++
27 files changed, 1154 insertions(+), 2246 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/HadoopPipes.cc.patch
----------------------------------------------------------------------
diff --git a/hadoop/HadoopPipes.cc.patch b/hadoop/HadoopPipes.cc.patch
deleted file mode 100644
index e0ca33a..0000000
--- a/hadoop/HadoopPipes.cc.patch
+++ /dev/null
@@ -1,10 +0,0 @@
---- hadoop-2.0.0-mr1-cdh4.2.1/src/c++/pipes/impl/HadoopPipes.cc.old 2013-04-16 20:18:22.681061322 +0000
-+++ hadoop-2.0.0-mr1-cdh4.2.1/src/c++/pipes/impl/HadoopPipes.cc 2013-04-16 20:18:44.005060961 +0000
-@@ -34,6 +34,7 @@
- #include <pthread.h>
- #include <iostream>
- #include <fstream>
-+#include <unistd.h>
-
- #include <openssl/hmac.h>
- #include <openssl/buffer.h>
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/Makefile.am
----------------------------------------------------------------------
diff --git a/hadoop/Makefile.am b/hadoop/Makefile.am
deleted file mode 100644
index 2702924..0000000
--- a/hadoop/Makefile.am
+++ /dev/null
@@ -1,139 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License
-
-EXTRA_DIST = TUTORIAL.sh hadoop-gridmix.patch \
- hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch \
- hadoop-0.20.2-cdh3u3_mesos.patch \
- hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch \
- hadoop-0.20.2-cdh3u5_mesos.patch \
- hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch \
- hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch \
- hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch \
- hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch \
- hadoop-7698-1.patch \
- hadoop-0.20.205.0_hadoop-env.sh.patch hadoop-0.20.205.0_mesos.patch \
- HadoopPipes.cc.patch \
- mapred-site.xml.patch mesos-executor mesos/build.xml \
- mesos/ivy/libraries.properties mesos/ivy.xml \
- mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java \
- mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-
-# Defines some targets to run the Hadoop tutorial using a specified
-# distribution. At some point we might want to do this automagically
-# (i.e., as part of 'make check'). Note that we set the environment
-# variable TMOUT to 1 so that each prompt in the tutorial will return
-# after 1 second so no interaction from the user is required.
-hadoop-0.20.205.0:
- if test "$(top_srcdir)" != "$(top_builddir)"; then \
- cp -p $(srcdir)/TUTORIAL.sh .; \
- cp -p $(srcdir)/hadoop-gridmix.patch .; \
- cp -p $(srcdir)/hadoop-7698-1.patch .; \
- cp -p $(srcdir)/hadoop-0.20.205.0_hadoop-env.sh.patch .; \
- cp -p $(srcdir)/hadoop-0.20.205.0_mesos.patch .; \
- cp -p $(srcdir)/mapred-site.xml.patch .; \
- cp -rp $(srcdir)/mesos .; \
- cp -p $(srcdir)/mesos-executor .; \
- fi
- rm -rf hadoop-0.20.205.0
- @TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh
-
-hadoop-0.20.2-cdh3u3:
- if test "$(top_srcdir)" != "$(top_builddir)"; then \
- cp -p $(srcdir)/TUTORIAL.sh .; \
- cp -p $(srcdir)/hadoop-gridmix.patch .; \
- cp -p $(srcdir)/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch .; \
- cp -p $(srcdir)/hadoop-0.20.2-cdh3u3_mesos.patch .; \
- cp -p $(srcdir)/mapred-site.xml.patch .; \
- cp -rp $(srcdir)/mesos .; \
- cp -p $(srcdir)/mesos-executor .; \
- fi
- rm -rf hadoop-0.20.2-cdh3u3
- @TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh 0.20.2-cdh3u3
-
-hadoop-0.20.2-cdh3u5:
- if test "$(top_srcdir)" != "$(top_builddir)"; then \
- cp -p $(srcdir)/TUTORIAL.sh .; \
- cp -p $(srcdir)/hadoop-gridmix.patch .; \
- cp -p $(srcdir)/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch .; \
- cp -p $(srcdir)/hadoop-0.20.2-cdh3u5_mesos.patch .; \
- cp -p $(srcdir)/mapred-site.xml.patch .; \
- cp -rp $(srcdir)/mesos .; \
- cp -p $(srcdir)/mesos-executor .; \
- fi
- rm -rf hadoop-0.20.2-cdh3u5
- @TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh 0.20.2-cdh3u5
-
-hadoop-2.0.0-mr1-cdh4.1.2:
- if test "$(top_srcdir)" != "$(top_builddir)"; then \
- cp -p $(srcdir)/TUTORIAL.sh .; \
- cp -p $(srcdir)/hadoop-gridmix.patch .; \
- cp -p $(srcdir)/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch .; \
- cp -p $(srcdir)/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch .; \
- cp -p $(srcdir)/mapred-site.xml.patch .; \
- cp -p $(srcdir)/HadoopPipes.cc.patch .; \
- cp -rp $(srcdir)/mesos .; \
- cp -p $(srcdir)/mesos-executor .; \
- fi
- rm -rf hadoop-2.0.0-mr1-cdh4.1.2
- @TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh 2.0.0-mr1-cdh4.1.2
-
-hadoop-2.0.0-mr1-cdh4.2.1:
- if test "$(top_srcdir)" != "$(top_builddir)"; then \
- cp -p $(srcdir)/TUTORIAL.sh .; \
- cp -p $(srcdir)/hadoop-gridmix.patch .; \
- cp -p $(srcdir)/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch .; \
- cp -p $(srcdir)/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch .; \
- cp -p $(srcdir)/mapred-site.xml.patch .; \
- cp -p $(srcdir)/HadoopPipes.cc.patch .; \
- cp -rp $(srcdir)/mesos .; \
- cp -p $(srcdir)/mesos-executor .; \
- fi
- rm -rf hadoop-2.0.0-mr1-cdh4.2.1
- @TMOUT=1 JAVA_HOME=$(JAVA_HOME) ./TUTORIAL.sh 2.0.0-mr1-cdh4.2.1
-
-
-clean-local:
- if test "$(top_srcdir)" != "$(top_builddir)"; then \
- rm -f TUTORIAL.sh; \
- rm -f hadoop-gridmix.patch; \
- rm -f hadoop-7698-1.patch; \
- rm -f hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch; \
- rm -f hadoop-0.20.2-cdh3u3_mesos.patch; \
- rm -f hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch; \
- rm -f hadoop-0.20.2-cdh3u5_mesos.patch; \
- rm -f hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch; \
- rm -f hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch; \
- rm -f hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch; \
- rm -f hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch; \
- rm -f hadoop-0.20.205.0_hadoop-env.sh.patch; \
- rm -f hadoop-0.20.205.0_mesos.patch; \
- rm -f mapred-site.xml.patch; \
- rm -f mesos-executor; \
- rm -rf mesos; \
- fi
- rm -rf hadoop-0.20.2-cdh3u3
- rm -f hadoop-0.20.2-cdh3u3.tar.gz
- rm -rf hadoop-0.20.2-cdh3u5
- rm -f hadoop-0.20.2-cdh3u5.tar.gz
- rm -rf hadoop-2.0.0-mr1-cdh4.1.2
- rm -f mr1-2.0.0-mr1-cdh4.1.2.tar.gz
- rm -rf hadoop-2.0.0-mr1-cdh4.2.1
- rm -f mr1-2.0.0-mr1-cdh4.2.1.tar.gz
- rm -rf hadoop-0.20.205.0
- rm -f hadoop-0.20.205.0.tar.gz
-
-
-.PHONY: hadoop-0.20.205.0 hadoop-0.20.2-cdh3u3 hadoop-0.20.2-cdh3u5 hadoop-2.0.0-mr1-cdh4.1.2 hadoop-2.0.0-mr1-cdh4.2.1
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/NOTES
----------------------------------------------------------------------
diff --git a/hadoop/NOTES b/hadoop/NOTES
deleted file mode 100644
index 1c9948c..0000000
--- a/hadoop/NOTES
+++ /dev/null
@@ -1,6 +0,0 @@
-We've patched GridMix (a contribution in Hadoop) because it was broken under Java 7.
-We can stop applying this custom patch when (if?) GridMix is patched in the Hadoop versions
-we support.
-
-We've also patched build.xml of hadoop-0.20.205.0 because its 'jsvc' target is broken.
-The patch is obtained from https://issues.apache.org/jira/browse/HADOOP-7698.
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/README.md
----------------------------------------------------------------------
diff --git a/hadoop/README.md b/hadoop/README.md
new file mode 100644
index 0000000..d5a99f6
--- /dev/null
+++ b/hadoop/README.md
@@ -0,0 +1,54 @@
+diff --git a/conf/mapred-site.xml b/conf/mapred-site.xml
+index 970c8fe..f9f272d 100644
+--- a/conf/mapred-site.xml
++++ b/conf/mapred-site.xml
+@@ -4,5 +4,48 @@
+ <!-- Put site-specific property overrides in this file. -->
+
+ <configuration>
+-
++ <property>
++ <name>mapred.job.tracker</name>
++ <value>localhost:54311</value>
++ </property>
++ <property>
++ <name>mapred.jobtracker.taskScheduler</name>
++ <value>org.apache.hadoop.mapred.MesosScheduler</value>
++ </property>
++ <property>
++ <name>mapred.mesos.taskScheduler</name>
++ <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
++ </property>
++ <property>
++ <name>mapred.mesos.master</name>
++ <value>local</value>
++ </property>
++#
++# Make sure to uncomment the 'mapred.mesos.executor' property,
++# when running the Hadoop JobTracker on a real Mesos cluster.
++# NOTE: You need to MANUALLY upload the Mesos executor bundle
++# to the location that is set as the value of this property.
++# <property>
++# <name>mapred.mesos.executor</name>
++# <value>hdfs://hdfs.name.node:port/hadoop.zip</value>
++# </property>
++#
++# The properties below indicate the amount of resources
++# that are allocated to a Hadoop slot (i.e., map/reduce task) by Mesos.
++ <property>
++ <name>mapred.mesos.slot.cpus</name>
++ <value>1</value>
++ </property>
++ <property>
++ <name>mapred.mesos.slot.disk</name>
++ <!-- The value is in MB. -->
++ <value>1024</value>
++ </property>
++ <property>
++ <name>mapred.mesos.slot.mem</name>
++ <!-- Note that this is the total memory required for
++ JVM overhead (10% of this value) and the heap (-Xmx) of the task.
++ The value is in MB. -->
++ <value>1024</value>
++ </property>
+ </configuration>
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/TUTORIAL.sh
----------------------------------------------------------------------
diff --git a/hadoop/TUTORIAL.sh b/hadoop/TUTORIAL.sh
deleted file mode 100755
index f4285d5..0000000
--- a/hadoop/TUTORIAL.sh
+++ /dev/null
@@ -1,710 +0,0 @@
-#!/bin/bash
-
-# Determine the Hadoop distribution to use.
-if test -z "${1}"; then
- distribution="0.20.205.0"
- url="http://archive.apache.org/dist/hadoop/core/hadoop-0.20.205.0"
- bundle="hadoop-0.20.205.0.tar.gz"
-elif test "${1}" = "0.20.2-cdh3u3"; then
- distribution="0.20.2-cdh3u3"
- url="http://archive.cloudera.com/cdh/3"
- bundle="hadoop-0.20.2-cdh3u3.tar.gz"
-elif test "${1}" = "0.20.2-cdh3u5"; then
- distribution="0.20.2-cdh3u5"
- url="http://archive.cloudera.com/cdh/3"
- bundle="hadoop-0.20.2-cdh3u5.tar.gz"
-elif test "${1}" = "2.0.0-mr1-cdh4.1.2"; then
- distribution="2.0.0-mr1-cdh4.1.2"
- url="http://archive.cloudera.com/cdh4/cdh/4"
- bundle="mr1-2.0.0-mr1-cdh4.1.2.tar.gz"
-elif test "${1}" = "2.0.0-mr1-cdh4.2.1"; then
- distribution="2.0.0-mr1-cdh4.2.1"
- url="http://archive.cloudera.com/cdh4/cdh/4"
- bundle="mr1-2.0.0-mr1-cdh4.2.1.tar.gz"
-fi
-
-hadoop="hadoop-${distribution}"
-
-# The potentially running JobTracker, that we need to kill.
-jobtracker_pid=
-
-# Trap Ctrl-C (signal 2).
-trap 'test ! -z ${jobtracker_pid} && kill ${jobtracker_pid}; echo; exit 1' 2
-
-
-# A helper function to run one or more commands.
-# If any command fails, the tutorial exits with a helpful message.
-function run() {
- for command in "${@}"; do
- eval ${command}
-
- if test "$?" != 0; then
- cat <<__EOF__
-
-${RED}Oh no! We failed to run '${command}'. If you need help try emailing:
-
- mesos-dev@incubator.apache.org
-
-(Remember to include as much debug information as possible.)${NORMAL}
-
-__EOF__
- exit 1
- fi
- done
-}
-
-
-# A helper function to execute a step of the tutorial (i.e., one or
-# more commands). Prints the command(s) out before they are run and
-# waits for the user to confirm. In addition, the commands are
-# appended to the summary.
-summary=""
-function execute() {
- echo
- for command in "${@}"; do
- echo " $ ${command}"
- done
- echo
-
- read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
- echo
-
- for command in "${@}"; do
- run "${command}"
-
- # Append to the summary.
- summary="${summary}
-$ ${command}"
- done
-}
-
-
-# Make sure we start out in the right directory.
-cd `dirname ${0}`
-
-# Include wonderful colors for our tutorial!
-test -f ../support/colors.sh && . ../support/colors.sh
-
-# Make sure we have all the necessary files/directories we need.
-resources="TUTORIAL.sh \
- hadoop-gridmix.patch \
- ${hadoop}_hadoop-env.sh.patch \
- ${hadoop}_mesos.patch \
- mapred-site.xml.patch \
- mesos \
- mesos-executor"
-
-if test ${distribution} = "0.20.205.0"; then
- resources="${resources} hadoop-7698-1.patch"
-fi
-
-if test ${distribution} = "2.0.0-mr1-cdh4.1.2" -o ${distribution} = "2.0.0-mr1-cdh4.2.1"; then
- resources="${resources} HadoopPipes.cc.patch"
-fi
-
-for resource in `echo ${resources}`; do
- if test ! -e ${resource}; then
- cat <<__EOF__
-
-${RED}We seem to be missing ${resource} from the directory containing
-this tutorial and we can't continue without it. If you haven't
-made any modifications to this directory, please report this to:
-
- mesos-dev@incubator.apache.org
-
-(Remember to include as much debug information as possible.)${NORMAL}
-
-__EOF__
- exit 1
- fi
-done
-
-# Make sure we have all the build tools we need.
-programs="mvn \
- ant"
-
-for program in `echo ${programs}`; do
- which ${program} > /dev/null
- if test "$?" != 0; then
- cat <<__EOF__
-
-${RED}We seem to be missing ${program} from PATH. Please install
-${program} and re-run this tutorial. If you still have troubles, please report
-this to:
-
- mesos-dev@incubator.apache.org
-
-(Remember to include as much debug information as possible.)${NORMAL}
-
-__EOF__
- exit 1
- fi
-done
-
-
-# Start the tutorial!
-cat <<__EOF__
-
-Welcome to the tutorial on running Apache Hadoop on top of Mesos!
-During this ${BRIGHT}interactive${NORMAL} guide we'll ask some yes/no
-questions and you should enter your answer via 'Y' or 'y' for yes and
-'N' or 'n' for no.
-
-Let's begin!
-
-__EOF__
-
-
-# Check for JAVA_HOME.
-if test -z ${JAVA_HOME}; then
- cat <<__EOF__
-
-${RED}You probably need to set JAVA_HOME in order to run this tutorial!${NORMAL}
-
-__EOF__
- read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
- echo
-fi
-
-
-# Download Hadoop.
-if test ! -e ${bundle}; then
- cat <<__EOF__
-
-We'll try and grab ${hadoop} from ${url}/${bundle} for you now.
-
-__EOF__
- execute "wget ${url}/${bundle}"
-else
- cat <<__EOF__
-
-${RED}It looks like you've already downloaded ${bundle}, so
-we'll skip that step.${NORMAL}
-
-__EOF__
-fi
-
-
-# Extract the archive.
-if test ! -d ${hadoop}; then
- cat <<__EOF__
-
-Let's start by extracting ${bundle}.
-
-__EOF__
- execute "tar zxf ${bundle}"
-else
- cat <<__EOF__
-
-${RED}It looks like you've already extracted ${bundle}, so
-we'll skip that step.${NORMAL}
-
-__EOF__
-fi
-
-
-# Change into Hadoop directory.
-cat <<__EOF__
-
-Okay, now let's change into the ${hadoop} directory in order to apply
-some patches, copy in the Mesos specific code, and build everything.
-
-__EOF__
-
-execute "cd ${hadoop}"
-
-
-# Apply the GridMix patch.
-cat <<__EOF__
-
-To run Hadoop on Mesos under Java 7 we need to apply a rather minor patch
-(hadoop-gridmix.patch) to GridMix, a contribution in Hadoop. See 'NOTES'
-file for more info.
-
-__EOF__
-
-# Check and see if the patch has already been applied.
-grep 'private String getEnumValues' \
- src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java \
- >/dev/null
-
-if test ${?} == "0"; then
- cat <<__EOF__
-
-${RED}It looks like you've already applied the patch, so we'll skip
-applying it now.${NORMAL}
-
-__EOF__
-else
- execute "patch -p1 <../hadoop-gridmix.patch"
-fi
-
-# Apply the 'jsvc' patch for hadoop-0.20.205.0.
-if test ${distribution} = "0.20.205.0"; then
- cat <<__EOF__
-
-To build Mesos executor bundle, we need to apply a patch for
-'jsvc' target (hadoop-7698-1.patch) that is broken in build.xml.
-
-__EOF__
-
- # Check and see if the patch has already been applied.
- grep 'os-name' build.xml >/dev/null
-
- if test ${?} == "0"; then
- cat <<__EOF__
-
- ${RED}It looks like you've already applied the patch, so we'll skip
- applying it now.${NORMAL}
-
-__EOF__
- else
- execute "patch -p1 <../hadoop-7698-1.patch"
- fi
-fi
-
-# Copy over the Mesos contrib components (and mesos-executor).
-cat <<__EOF__
-
-Now, we'll copy over the Mesos contrib components.
-
-__EOF__
-
-execute "cp -r ../mesos src/contrib" \
- "cp -p ../mesos-executor bin"
-
-
-# Apply the patch to build the contrib.
-cat <<__EOF__
-
-In addition, we will need to edit ivy/libraries.properties and
-src/contrib/build.xml to hook the Mesos contrib component into the
-build. We've included a patch (${hadoop}_mesos.patch) to do that for
-you.
-
-__EOF__
-
-
-# Check and see if the patch has already been applied.
-grep mesos src/contrib/build.xml >/dev/null
-
-if test ${?} == "0"; then
- cat <<__EOF__
-
-${RED}It looks like you've already applied the patch, so we'll skip
-applying it now.${NORMAL}
-
-__EOF__
-else
- execute "patch -p1 <../${hadoop}_mesos.patch"
-fi
-
-
-# Determine MESOS_BUILD_DIR.
-cat <<__EOF__
-
-Okay, now we're ready to build and then run Hadoop! There are a couple
-important considerations. First, we need to locate the Mesos JAR and
-native library (i.e., libmesos.so on Linux and libmesos.dylib on Mac
-OS X). The Mesos JAR is used for both building and running, while the
-native library is only used for running. In addition, we need to
-locate the Protobuf JAR (if you don't already have one one your
-default classpath).
-
-This tutorial assumes you've built Mesos already. We'll use the
-environment variable MESOS_BUILD_DIR to denote this directory.
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-MESOS_BUILD_DIR=`cd ../../ && pwd`
-
-while test ! -f `echo ${MESOS_BUILD_DIR}/src/mesos-*.jar`; do
- cat <<__EOF__
-
-${RED}We couldn't automagically determine MESOS_BUILD_DIR. It doesn't
-look like you used ${MESOS_BUILD_DIR} to build Mesos. Maybe you need
-to go back and run 'make' in that directory before
-continuing?${NORMAL}
-
-__EOF__
-
- DEFAULT=${MESOS_BUILD_DIR}
- read -e -p "${BRIGHT}Where is the build directory?${NORMAL} [${DEFAULT}] "
- echo
- test -z ${REPLY} && REPLY=${DEFAULT}
- MESOS_BUILD_DIR=`cd ${REPLY} && pwd`
-done
-
-
-cat <<__EOF__
-
-Using ${BRIGHT}${MESOS_BUILD_DIR}${NORMAL} as the build directory.
-
-__EOF__
-
-LIBRARY=${MESOS_BUILD_DIR}/src/.libs/libmesos.so
-
-if test ! -f ${LIBRARY}; then
- LIBRARY=${MESOS_BUILD_DIR}/src/.libs/libmesos.dylib
-fi
-
-if test ! -f ${LIBRARY}; then
- cat <<__EOF__
-
-${RED}We seem to be having trouble locating the native library (it's
-not at ${MESOS_BUILD_DIR}/src/.libs/libmesos.so or
-${MESOS_BUILD_DIR}/src/.libs/libmesos.dylib).
-
-Have you already built Mesos? If you have, please report this to:
-
- mesos-dev@incubator.apache.org
-
-(Remember to include as much debug information as possible.)${NORMAL}
-
-__EOF__
- exit 1
-fi
-
-# Determine the "platform name" to copy the native library.
-cat <<__EOF__ >PlatformName.java
-public class PlatformName {
- public static void main(String[] args) {
- System.out.println(System.getProperty("os.name") + "-" +
- System.getProperty("os.arch") + "-" +
- System.getProperty("sun.arch.data.model"));
- System.exit(0);
- }
-}
-__EOF__
-
-run "${JAVA_HOME}/bin/javac PlatformName.java"
-
-PLATFORM=`${JAVA_HOME}/bin/java -Xmx32m PlatformName | sed -e "s/ /_/g"`
-
-run "rm PlatformName.*"
-
-
-# Copy over libraries.
-MESOS_JAR=`echo ${MESOS_BUILD_DIR}/src/mesos-*.jar`
-PROTOBUF_JAR=`echo ${MESOS_BUILD_DIR}/protobuf-*.jar`
-
-cat <<__EOF__
-
-Now we'll copy over the necessary libraries we need from the build
-directory.
-
-__EOF__
-
-execute "cp ${PROTOBUF_JAR} lib" \
- "cp ${MESOS_JAR} lib" \
- "mkdir -p lib/native/${PLATFORM}" \
- "cp ${LIBRARY} lib/native/${PLATFORM}"
-
-if test ${distribution} = "0.20.205.0"; then
- cat <<__EOF__
-
-The Apache Hadoop distribution requires that we also copy some
-libraries to multiple places. :/
-
-__EOF__
-
- execute "cp ${PROTOBUF_JAR} share/hadoop/lib" \
- "cp ${MESOS_JAR} share/hadoop/lib" \
- "cp ${LIBRARY} lib"
-fi
-
-
-# Apply conf/mapred-site.xml patch.
-cat <<__EOF__
-
-First we need to configure Hadoop appropriately by modifying
-conf/mapred-site.xml (as is always required when running Hadoop).
-In order to run Hadoop on Mesos we need to set at least these four
-properties:
-
- mapred.job.tracker
-
- mapred.jobtracker.taskScheduler
-
- mapred.mesos.master
-
- mapred.mesos.executor
-
-The 'mapred.job.tracker' property should be set to the host:port where
-you want to launch the JobTracker (e.g., localhost:54321).
-
-The 'mapred.jobtracker.taskScheduler' property must be set to
-'org.apache.hadoop.mapred.MesosScheduler'.
-
-If you've alredy got a Mesos master running you can use that for
-'mapred.mesos.master', but for this tutorial well just use 'local' in
-order to bring up a Mesos "cluster" within the process. To connect to
-a remote master simply use the URL used to connect the slave to the
-master (e.g., localhost:5050).
-
-The 'mapred.mesos.executor' property must be set to the location
-of Mesos executor bundle so that Mesos slaves can download
-and run the executor.
-NOTE: You need to MANUALLY upload the Mesos executor bundle to
-the above location.
-
-
-We've got a prepared patch (mapred-site.xml.patch) for
-conf/mapred-site.xml that makes the changes necessary
-to get everything running with a local Mesos cluster.
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-patch --dry-run --silent --force -p1 \
- <../mapred-site.xml.patch 1>/dev/null 2>&1
-
-if test ${?} == "1"; then
- cat <<__EOF__
-
-${RED}It looks like conf/mapred-site.xml has been modified. You'll
-need to copy that to something else and restore the file to it's
-original contents before we'll be able to apply this patch.${NORMAL}
-
-__EOF__
- DEFAULT="N"
-else
- DEFAULT="Y"
-fi
-
-read -e -p "${BRIGHT}Patch conf/mapred-site.xml?${NORMAL} [${DEFAULT}] "
-echo
-test -z ${REPLY} && REPLY=${DEFAULT}
-if test ${REPLY} == "Y" -o ${REPLY} == "y"; then
- execute "patch -p1 <../mapred-site.xml.patch"
-fi
-
-
-
-# Apply conf/hadoop-env.sh patch.
-cat <<__EOF__
-
-Most users will need to set JAVA_HOME in conf/hadoop-env.sh, but we'll
-also need to set MESOS_NATIVE_LIBRARY and update the HADOOP_CLASSPATH
-to include the Mesos contrib classfiles. We've prepared a patch
-(${hadoop}_hadoop-env.sh.patch) for conf/hadoop-env.sh that makes the
-necessary changes.
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-patch --dry-run --silent --force -p1 \
- <../${hadoop}_hadoop-env.sh.patch 1>/dev/null 2>&1
-
-if test ${?} == "1"; then
- cat <<__EOF__
-
-${RED}It looks like conf/hadoop-env.sh has been modified. You'll need
-to copy that to something else and restore the file to it's original
-contents before we'll be able to apply this patch.${NORMAL}
-
-__EOF__
- DEFAULT="N"
-else
- DEFAULT="Y"
-fi
-
-read -e -p "${BRIGHT}Patch conf/hadoop-env.sh?${NORMAL} [${DEFAULT}] "
-echo
-test -z ${REPLY} && REPLY=${DEFAULT}
-if test ${REPLY} == "Y" -o ${REPLY} == "y"; then
- execute "patch -p1 <../${hadoop}_hadoop-env.sh.patch"
-fi
-
-if test ${distribution} = "2.0.0-mr1-cdh4.1.2" -o ${distribution} = "2.0.0-mr1-cdh4.2.1"; then
- # Apply HadoopPipes.cc patch.
- cat <<__EOF__
-
- This version of Hadoop needs to be patched to build on GCC 4.7 and newer compilers.
-
-__EOF__
-
- read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
- echo
-
- patch --dry-run --silent --force -p1 \
- <../HadoopPipes.cc.patch 1>/dev/null 2>&1
-
- if test ${?} == "1"; then
- cat <<__EOF__
-
- ${RED}It looks like conf/hadoop-env.sh has been modified. You'll need
- to copy that to something else and restore the file to it's original
- contents before we'll be able to apply this patch.${NORMAL}
-
-__EOF__
- DEFAULT="N"
- else
- DEFAULT="Y"
- fi
-
- read -e -p "${BRIGHT}Patch src/c++/pipes/impl/HadoopPipes.cc?${NORMAL} [${DEFAULT}] "
- echo
- test -z ${REPLY} && REPLY=${DEFAULT}
- if test ${REPLY} == "Y" -o ${REPLY} == "y"; then
- execute "patch -p1 <../HadoopPipes.cc.patch"
- fi
-fi
-
-# Build Hadoop and Mesos executor package that Mesos slaves can download
-# and execute.
-# TODO(vinod): Create a new ant target in build.xml that builds the executor.
-# NOTE: We specifically set the version when calling ant, to ensure we know
-# the resulting directory name.
-cat <<__EOF__
-
-Okay, let's try building Hadoop.
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-if test ${distribution} = "2.0.0-mr1-cdh4.1.2" -o ${distribution} = "2.0.0-mr1-cdh4.2.1"; then
- cat <<__EOF__
-
- We need to chmod +x install-sh scripts to compile
- C++ components needed by the 'bin-package' target in CDH4.
- We also need to specifically set the 'reactor.repo' property.
-
-__EOF__
- execute "find . -name "*install-sh*" | xargs chmod +x" \
- "ant -Dreactor.repo=file://$HOME/.m2/repository \
--Dversion=${distribution} -Dcompile.c++=true compile bin-package"
-else
- execute "ant -Dversion=${distribution} compile bin-package"
-fi
-
-cat <<__EOF__
-
-To build the Mesos executor package, we first copy the
-necessary Mesos libraries.
-
-__EOF__
-
-# Copy the Mesos native library.
-execute "cd build/${hadoop}" \
- "mkdir -p lib/native/${PLATFORM}" \
- "cp ${LIBRARY} lib/native/${PLATFORM}"
-
-if test ${distribution} != "0.20.205.0"; then
- cat <<__EOF__
-
- We will remove Cloudera patches from the Mesos executor package
- to save space (~62MB).
-
-__EOF__
- execute "rm -rf cloudera"
-fi
-
-cat <<__EOF__
-
- Finally, we will build the Mesos executor package as follows:
-
-__EOF__
-
-# We re-name the directory to 'hadoop' so that the Mesos executor
-# can be agnostic to the Hadoop version.
-execute "cd .." \
- "mv ${hadoop} ${hadoop}-mesos" \
- "tar czf ${hadoop}-mesos.tar.gz ${hadoop}-mesos/"
-
-# Start JobTracker.
-cat <<__EOF__
-
-${GREEN}Build success!${NORMAL}
-
-The Mesos distribution is now built in '${hadoop}-mesos'
-
-Now let's run something!
-
-We'll try and start the JobTracker from the Mesos distribution path via:
- $ cd ${hadoop}-mesos
- $ ./bin/hadoop jobtracker
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-
-# Fake the resources for this local slave, because the default resources
-# (esp. memory on MacOSX) offered by the slave might not be enough to
-# launch TaskTrackers.
-# TODO(vinod): Pipe these commands through 'execute()' so that they
-# can be appended to the summary.
-cd ${hadoop}-mesos
-export MESOS_RESOURCES="cpus:16;mem:16384;disk:307200;ports:[31000-32000]"
-./bin/hadoop jobtracker 1>/dev/null 2>&1 &
-
-jobtracker_pid=${!}
-
-cat <<__EOF__
-
-JobTracker started at ${BRIGHT}${jobtracker_pid}${NORMAL}.
-
-__EOF__
-
-echo -n "Waiting 5 seconds for it to start."
-
-for i in 1 2 3 4 5; do sleep 1 && echo -n " ."; done
-
-# Now let's run an example.
-cat <<__EOF__
-
-Alright, now let's run the "wordcount" example via:
-
- $ ./bin/hadoop jar hadoop-examples-${distribution}.jar wordcount \
- ${MESOS_BUILD_DIR}/src/mesos out
-
-__EOF__
-
-read -e -p "${BRIGHT}Hit enter to continue.${NORMAL} "
-echo
-
-rm -rf out # TODO(benh): Ask about removing this first.
-
-./bin/hadoop jar hadoop-examples-${distribution}.jar wordcount \
- ${MESOS_BUILD_DIR}/src/mesos out
-
-
-if test ${?} == "0"; then
- cat <<__EOF__
-
-${GREEN}Success!${NORMAL} We'll kill the JobTracker and exit.
-
-Summary:
-${summary}
-
-Remember you'll need to make some changes to
-${hadoop}/conf/mapred-site.xml to run Hadoop on a
-real Mesos cluster:
-
-We hope you found this was helpful!
-
-__EOF__
- kill ${jobtracker_pid}
-else
- cat <<__EOF__
-
-${RED}Oh no, it failed! Try running the JobTracker and wordcount
-example manually ... it might be an issue with your environment that
-this tutorial didn't cover (if you find this to be the case, please
-create a JIRA for us and/or send us a code review).${NORMAL}
-
-__EOF__
- kill ${jobtracker_pid}
- exit 1
-fi
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch b/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch
deleted file mode 100644
index 88c0754..0000000
--- a/hadoop/hadoop-0.20.2-cdh3u3_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.6-sun
-
- # Extra Java CLASSPATH elements. Optional.
--# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-0.20.2-cdh3u3.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-0.20.2-cdh3u3.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
-
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch b/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch
deleted file mode 100644
index 9284788..0000000
--- a/hadoop/hadoop-0.20.2-cdh3u3_mesos.patch
+++ /dev/null
@@ -1,38 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index 0b3c715..6b7770b 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -75,3 +75,5 @@ slf4j-log4j12.version=1.4.3
- wagon-http.version=1.0-beta-2
- xmlenc.version=0.52
- xerces.version=1.4.4
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index e41c132..593aecd 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -57,6 +57,7 @@
- <fileset dir="." includes="capacity-scheduler/build.xml"/>
- <fileset dir="." includes="mrunit/build.xml"/>
- <fileset dir="." includes="gridmix/build.xml"/>
-+ <fileset dir="." includes="mesos/build.xml"/>
- </subant>
- <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
- <fail if="testsfailed">Tests failed!</fail>
-diff --git a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-index 545c3c7..f6950be 100644
---- a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-+++ b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-@@ -257,6 +257,11 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
- taskScheduler.refresh();
- }
-
-+ @Override
-+ public synchronized void checkJobSubmission(JobInProgress job) throws IOException {
-+ taskScheduler.checkJobSubmission(job);
-+ }
-+
- // Mesos Scheduler methods.
- @Override
- public synchronized void registered(
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch b/hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch
deleted file mode 100644
index 38eca6c..0000000
--- a/hadoop/hadoop-0.20.2-cdh3u5_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.6-sun
-
- # Extra Java CLASSPATH elements. Optional.
--# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-0.20.2-cdh3u5.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-0.20.2-cdh3u5.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
-
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.2-cdh3u5_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.2-cdh3u5_mesos.patch b/hadoop/hadoop-0.20.2-cdh3u5_mesos.patch
deleted file mode 100644
index 9284788..0000000
--- a/hadoop/hadoop-0.20.2-cdh3u5_mesos.patch
+++ /dev/null
@@ -1,38 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index 0b3c715..6b7770b 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -75,3 +75,5 @@ slf4j-log4j12.version=1.4.3
- wagon-http.version=1.0-beta-2
- xmlenc.version=0.52
- xerces.version=1.4.4
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index e41c132..593aecd 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -57,6 +57,7 @@
- <fileset dir="." includes="capacity-scheduler/build.xml"/>
- <fileset dir="." includes="mrunit/build.xml"/>
- <fileset dir="." includes="gridmix/build.xml"/>
-+ <fileset dir="." includes="mesos/build.xml"/>
- </subant>
- <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
- <fail if="testsfailed">Tests failed!</fail>
-diff --git a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-index 545c3c7..f6950be 100644
---- a/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-+++ b/src/contrib/mesos/src/java/org/apache/hadoop/mapred/MesosScheduler.java
-@@ -257,6 +257,11 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
- taskScheduler.refresh();
- }
-
-+ @Override
-+ public synchronized void checkJobSubmission(JobInProgress job) throws IOException {
-+ taskScheduler.checkJobSubmission(job);
-+ }
-+
- // Mesos Scheduler methods.
- @Override
- public synchronized void registered(
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch b/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch
deleted file mode 100644
index 066f715..0000000
--- a/hadoop/hadoop-0.20.205.0_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.5-sun
-
- # Extra Java CLASSPATH elements. Optional.
--# export HADOOP_CLASSPATH=
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-0.20.205.0.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-0.20.205.0.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
-
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-0.20.205.0_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-0.20.205.0_mesos.patch b/hadoop/hadoop-0.20.205.0_mesos.patch
deleted file mode 100644
index 35fe3ba..0000000
--- a/hadoop/hadoop-0.20.205.0_mesos.patch
+++ /dev/null
@@ -1,22 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index b47b4c3..713f0c1 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -86,3 +86,5 @@ slf4j-log4j12.version=1.4.3
- wagon-http.version=1.0-beta-2
- xmlenc.version=0.52
- xerces.version=1.4.4
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index 3c19e25..ecb7198 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -55,6 +55,7 @@
- <fileset dir="." includes="fairscheduler/build.xml"/>
- <fileset dir="." includes="capacity-scheduler/build.xml"/>
- <fileset dir="." includes="gridmix/build.xml"/>
-+ <fileset dir="." includes="mesos/build.xml"/>
- </subant>
- <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
- <fail if="testsfailed">Tests failed!</fail>
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch b/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch
deleted file mode 100644
index 9dfe4a3..0000000
--- a/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.6-sun
-
- # Extra Java CLASSPATH elements. Optional.
--# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-2.0.0-mr1-cdh4.1.2.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-2.0.0-mr1-cdh4.1.2.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
-
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch b/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch
deleted file mode 100644
index 2fc254f..0000000
--- a/hadoop/hadoop-2.0.0-mr1-cdh4.1.2_mesos.patch
+++ /dev/null
@@ -1,22 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index b47b4c3..713f0c1 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -86,3 +86,5 @@ slf4j-log4j12.version=1.4.3
- wagon-http.version=1.0-beta-2
- xmlenc.version=0.52
- xerces.version=1.4.4
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index 3c19e25..ecb7198 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -54,6 +54,7 @@
- <fileset dir="." includes="fairscheduler/build.xml"/>
- <fileset dir="." includes="capacity-scheduler/build.xml"/>
- <!-- <fileset dir="." includes="gridmix/build.xml"/> -->
-+ <fileset dir="." includes="mesos/build.xml"/>
- </subant>
- <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
- <fail if="testsfailed">Tests failed!</fail>
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch b/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch
deleted file mode 100644
index d57c1e7..0000000
--- a/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_hadoop-env.sh.patch
+++ /dev/null
@@ -1,14 +0,0 @@
-diff --git a/conf/hadoop-env.sh b/conf/hadoop-env.sh
-index ada5bef..76aaf48 100644
---- a/conf/hadoop-env.sh
-+++ b/conf/hadoop-env.sh
-@@ -9,7 +9,8 @@
- # export JAVA_HOME=/usr/lib/j2sdk1.6-sun
-
- # Extra Java CLASSPATH elements. Optional.
--# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
-+MESOS_CLASSPATH=${HADOOP_HOME}/contrib/mesos/hadoop-mesos-2.0.0-mr1-cdh4.2.1.jar:${HADOOP_HOME}/build/contrib/mesos/hadoop-mesos-2.0.0-mr1-cdh4.2.1.jar
-+export HADOOP_CLASSPATH=${MESOS_CLASSPATH}:${HADOOP_CLASSPATH}
-
- # The maximum amount of heap to use, in MB. Default is 1000.
- # export HADOOP_HEAPSIZE=2000
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch b/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch
deleted file mode 100644
index 8a39444..0000000
--- a/hadoop/hadoop-2.0.0-mr1-cdh4.2.1_mesos.patch
+++ /dev/null
@@ -1,22 +0,0 @@
-diff --git a/ivy/libraries.properties b/ivy/libraries.properties
-index b47b4c3..713f0c1 100644
---- a/ivy/libraries.properties
-+++ b/ivy/libraries.properties
-@@ -86,3 +86,5 @@ slf4j-log4j12.version=1.4.3
- xerces.version=1.4.4
-
- zookeeper.version=3.4.2
-+
-+protobuf-java.version=2.4.1
-diff --git a/src/contrib/build.xml b/src/contrib/build.xml
-index 3c19e25..ecb7198 100644
---- a/src/contrib/build.xml
-+++ b/src/contrib/build.xml
-@@ -54,6 +54,7 @@
- <fileset dir="." includes="fairscheduler/build.xml"/>
- <fileset dir="." includes="capacity-scheduler/build.xml"/>
- <!-- <fileset dir="." includes="gridmix/build.xml"/> -->
-+ <fileset dir="." includes="mesos/build.xml"/>
- </subant>
- <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
- <fail if="testsfailed">Tests failed!</fail>
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-7698-1.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-7698-1.patch b/hadoop/hadoop-7698-1.patch
deleted file mode 100644
index 0ab424b..0000000
--- a/hadoop/hadoop-7698-1.patch
+++ /dev/null
@@ -1,27 +0,0 @@
-diff --git a/build.xml b/build.xml
-index a5ba0f5..cc8a606 100644
---- a/build.xml (revision 1177084)
-+++ b/build.xml (working copy)
-@@ -170,6 +170,13 @@
-
- <property name="jsvc.build.dir" value="${build.dir}/jsvc.${os.arch}" />
- <property name="jsvc.install.dir" value="${dist.dir}/libexec" />
-+ <exec executable="sh" outputproperty="os-name">
-+ <arg value="-c" />
-+ <arg value="uname -s | tr '[:upper:]' '[:lower:]'" />
-+ </exec>
-+ <condition property="os-arch" value="universal">
-+ <equals arg1="darwin" arg2="${os-name}" />
-+ </condition>
- <condition property="os-arch" value="x86_64">
- <and>
- <os arch="amd64" />
-@@ -183,7 +190,7 @@
- <os arch="i686" />
- </or>
- </condition>
-- <property name="jsvc.location" value="http://archive.apache.org/dist/commons/daemon/binaries/1.0.2/linux/commons-daemon-1.0.2-bin-linux-${os-arch}.tar.gz" />
-+ <property name="jsvc.location" value="http://archive.apache.org/dist/commons/daemon/binaries/1.0.2/${os-name}/commons-daemon-1.0.2-bin-${os-name}-${os-arch}.tar.gz" />
- <property name="jsvc.dest.name" value="jsvc.${os.arch}.tar.gz" />
-
- <!-- task-controller properties set here -->
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/hadoop-gridmix.patch
----------------------------------------------------------------------
diff --git a/hadoop/hadoop-gridmix.patch b/hadoop/hadoop-gridmix.patch
deleted file mode 100644
index 92b55e0..0000000
--- a/hadoop/hadoop-gridmix.patch
+++ /dev/null
@@ -1,17 +0,0 @@
-diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
-index a5ba0f5..cc8a606 100644
---- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
-+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
-@@ -609,10 +609,10 @@
- }
- }
-
-- private <T> String getEnumValues(Enum<? extends T>[] e) {
-+ private String getEnumValues(Enum<?>[] e) {
- StringBuilder sb = new StringBuilder();
- String sep = "";
-- for (Enum<? extends T> v : e) {
-+ for (Enum<?> v : e) {
- sb.append(sep);
- sb.append(v.name());
- sep = "|";
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mapred-site.xml.patch
----------------------------------------------------------------------
diff --git a/hadoop/mapred-site.xml.patch b/hadoop/mapred-site.xml.patch
deleted file mode 100644
index d5a99f6..0000000
--- a/hadoop/mapred-site.xml.patch
+++ /dev/null
@@ -1,54 +0,0 @@
-diff --git a/conf/mapred-site.xml b/conf/mapred-site.xml
-index 970c8fe..f9f272d 100644
---- a/conf/mapred-site.xml
-+++ b/conf/mapred-site.xml
-@@ -4,5 +4,48 @@
- <!-- Put site-specific property overrides in this file. -->
-
- <configuration>
--
-+ <property>
-+ <name>mapred.job.tracker</name>
-+ <value>localhost:54311</value>
-+ </property>
-+ <property>
-+ <name>mapred.jobtracker.taskScheduler</name>
-+ <value>org.apache.hadoop.mapred.MesosScheduler</value>
-+ </property>
-+ <property>
-+ <name>mapred.mesos.taskScheduler</name>
-+ <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
-+ </property>
-+ <property>
-+ <name>mapred.mesos.master</name>
-+ <value>local</value>
-+ </property>
-+#
-+# Make sure to uncomment the 'mapred.mesos.executor' property,
-+# when running the Hadoop JobTracker on a real Mesos cluster.
-+# NOTE: You need to MANUALLY upload the Mesos executor bundle
-+# to the location that is set as the value of this property.
-+# <property>
-+# <name>mapred.mesos.executor</name>
-+# <value>hdfs://hdfs.name.node:port/hadoop.zip</value>
-+# </property>
-+#
-+# The properties below indicate the amount of resources
-+# that are allocated to a Hadoop slot (i.e., map/reduce task) by Mesos.
-+ <property>
-+ <name>mapred.mesos.slot.cpus</name>
-+ <value>1</value>
-+ </property>
-+ <property>
-+ <name>mapred.mesos.slot.disk</name>
-+ <!-- The value is in MB. -->
-+ <value>1024</value>
-+ </property>
-+ <property>
-+ <name>mapred.mesos.slot.mem</name>
-+ <!-- Note that this is the total memory required for
-+ JVM overhead (10% of this value) and the heap (-Xmx) of the task.
-+ The value is in MB. -->
-+ <value>1024</value>
-+ </property>
- </configuration>
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos-executor
----------------------------------------------------------------------
diff --git a/hadoop/mesos-executor b/hadoop/mesos-executor
deleted file mode 100755
index 7902e80..0000000
--- a/hadoop/mesos-executor
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/sh
-
-exec `dirname ${0}`/hadoop org.apache.hadoop.mapred.MesosExecutor
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/build.xml
----------------------------------------------------------------------
diff --git a/hadoop/mesos/build.xml b/hadoop/mesos/build.xml
deleted file mode 100644
index 9edf9e2..0000000
--- a/hadoop/mesos/build.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!--
-Before you can run these subtargets directly, you need
-to call at top-level: ant deploy-contrib compile-core-test
--->
-<project name="mesos" default="jar">
-
- <import file="../build-contrib.xml"/>
-
-</project>
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/ivy.xml
----------------------------------------------------------------------
diff --git a/hadoop/mesos/ivy.xml b/hadoop/mesos/ivy.xml
deleted file mode 100644
index fc4fb16..0000000
--- a/hadoop/mesos/ivy.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" ?>
-<ivy-module version="1.0">
- <info organisation="org.apache.hadoop" module="${ant.project.name}">
- <license name="Apache 2.0"/>
- <ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
- <description>
- Apache Hadoop contrib
- </description>
- </info>
- <configurations defaultconfmapping="default">
- <!--these match the Maven configurations-->
- <conf name="default" extends="master,runtime"/>
- <conf name="master" description="contains the artifact but no dependencies"/>
- <conf name="runtime" description="runtime but not the artifact" />
-
- <conf name="common" visibility="private"
- description="artifacts needed to compile/test the application"/>
- </configurations>
-
- <publications>
- <!--get the artifact from our module name-->
- <artifact conf="master"/>
- </publications>
- <dependencies>
- <dependency org="commons-logging"
- name="commons-logging"
- rev="${commons-logging.version}"
- conf="common->default"/>
- <dependency org="log4j"
- name="log4j"
- rev="${log4j.version}"
- conf="common->master"/>
- <dependency org="junit"
- name="junit"
- rev="${junit.version}"
- conf="common->default"/>
- <dependency org="com.google.protobuf"
- name="protobuf-java"
- rev="${protobuf-java.version}"
- conf="common->default"/>
- </dependencies>
-</ivy-module>
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/ivy/libraries.properties
----------------------------------------------------------------------
diff --git a/hadoop/mesos/ivy/libraries.properties b/hadoop/mesos/ivy/libraries.properties
deleted file mode 100644
index d740528..0000000
--- a/hadoop/mesos/ivy/libraries.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-#This properties file lists the versions of the various artifacts used by streaming.
-#It drives ivy and the generation of a maven POM
-
-#Please list the dependencies name with version if they are different from the ones
-#listed in the global libraries.properties file (in alphabetical order)
http://git-wip-us.apache.org/repos/asf/mesos/blob/25cb6f91/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java b/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java
deleted file mode 100644
index ccf0090..0000000
--- a/hadoop/mesos/src/java/org/apache/hadoop/mapred/MesosExecutor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.mesos.Executor;
-import org.apache.mesos.ExecutorDriver;
-import org.apache.mesos.MesosExecutorDriver;
-import org.apache.mesos.Protos.Environment.Variable;
-import org.apache.mesos.Protos.ExecutorInfo;
-import org.apache.mesos.Protos.FrameworkInfo;
-import org.apache.mesos.Protos.SlaveInfo;
-import org.apache.mesos.Protos.Status;
-import org.apache.mesos.Protos.TaskID;
-import org.apache.mesos.Protos.TaskInfo;
-import org.apache.mesos.Protos.TaskState;
-import org.apache.mesos.Protos.TaskStatus;
-
-public class MesosExecutor implements Executor {
- public static final Log LOG = LogFactory.getLog(MesosExecutor.class);
-
- private JobConf conf;
- private TaskTracker taskTracker;
-
- @Override
- public void registered(ExecutorDriver driver, ExecutorInfo executorInfo,
- FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
- LOG.info("Executor registered with the slave");
-
- conf = new JobConf();
-
- // Get TaskTracker's config options from environment variables set by the
- // JobTracker.
- if (executorInfo.getCommand().hasEnvironment()) {
- for (Variable variable : executorInfo.getCommand().getEnvironment()
- .getVariablesList()) {
- LOG.info("Setting config option : " + variable.getName() + " to "
- + variable.getValue());
- conf.set(variable.getName(), variable.getValue());
- }
- }
-
- // Get hostname from Mesos to make sure we match what it reports
- // to the JobTracker.
- conf.set("slave.host.name", slaveInfo.getHostname());
-
- // Set the mapred.local directory inside the executor sandbox, so that
- // different TaskTrackers on the same host do not step on each other.
- conf.set("mapred.local.dir", System.getProperty("user.dir") + "/mapred");
- }
-
- @Override
- public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
- LOG.info("Launching task : " + task.getTaskId().getValue());
-
- // NOTE: We need to manually set the context class loader here because,
- // the TaskTracker is unable to find LoginModule class otherwise.
- Thread.currentThread().setContextClassLoader(
- TaskTracker.class.getClassLoader());
-
- try {
- taskTracker = new TaskTracker(conf);
- } catch (IOException e) {
- LOG.fatal("Failed to start TaskTracker", e);
- System.exit(1);
- } catch (InterruptedException e) {
- LOG.fatal("Failed to start TaskTracker", e);
- System.exit(1);
- }
-
- // Spin up a TaskTracker in a new thread.
- new Thread("TaskTracker Run Thread") {
- @Override
- public void run() {
- taskTracker.run();
-
- // Send a TASK_FINISHED status update.
- // We do this here because we want to send it in a separate thread
- // than was used to call killTask().
- driver.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(task.getTaskId())
- .setState(TaskState.TASK_FINISHED)
- .build());
-
- // Give some time for the update to reach the slave.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.error("Failed to sleep TaskTracker thread", e);
- }
-
- // Stop the executor.
- driver.stop();
- }
- }.start();
-
- driver.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(task.getTaskId())
- .setState(TaskState.TASK_RUNNING).build());
- }
-
- @Override
- public void killTask(ExecutorDriver driver, TaskID taskId) {
- LOG.info("Killing task : " + taskId.getValue());
- try {
- taskTracker.shutdown();
- } catch (IOException e) {
- LOG.error("Failed to shutdown TaskTracker", e);
- } catch (InterruptedException e) {
- LOG.error("Failed to shutdown TaskTracker", e);
- }
- }
-
- @Override
- public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
- LOG.info("Executor reregistered with the slave");
- }
-
- @Override
- public void disconnected(ExecutorDriver driver) {
- LOG.info("Executor disconnected from the slave");
- }
-
- @Override
- public void frameworkMessage(ExecutorDriver d, byte[] msg) {
- LOG.info("Executor received framework message of length: " + msg.length
- + " bytes");
- }
-
- @Override
- public void error(ExecutorDriver d, String message) {
- LOG.error("MesosExecutor.error: " + message);
- }
-
- @Override
- public void shutdown(ExecutorDriver d) {
- LOG.info("Executor asked to shutdown");
- }
-
- public static void main(String[] args) {
- MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor());
- System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
- }
-}
[4/5] git commit: Updated README.md.
Posted by be...@apache.org.
Updated README.md.
Review: https://reviews.apache.org/r/13231
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3a7b9260
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3a7b9260
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3a7b9260
Branch: refs/heads/master
Commit: 3a7b9260bf49fbba82ef73163505ff20b18326b6
Parents: dba8c38
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Aug 1 23:40:40 2013 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 2 14:28:28 2013 -0700
----------------------------------------------------------------------
hadoop/README.md | 182 +++++++++++++++++++++++++++++++++++---------------
1 file changed, 128 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3a7b9260/hadoop/README.md
----------------------------------------------------------------------
diff --git a/hadoop/README.md b/hadoop/README.md
index d5a99f6..36d7eb4 100644
--- a/hadoop/README.md
+++ b/hadoop/README.md
@@ -1,54 +1,128 @@
-diff --git a/conf/mapred-site.xml b/conf/mapred-site.xml
-index 970c8fe..f9f272d 100644
---- a/conf/mapred-site.xml
-+++ b/conf/mapred-site.xml
-@@ -4,5 +4,48 @@
- <!-- Put site-specific property overrides in this file. -->
-
- <configuration>
--
-+ <property>
-+ <name>mapred.job.tracker</name>
-+ <value>localhost:54311</value>
-+ </property>
-+ <property>
-+ <name>mapred.jobtracker.taskScheduler</name>
-+ <value>org.apache.hadoop.mapred.MesosScheduler</value>
-+ </property>
-+ <property>
-+ <name>mapred.mesos.taskScheduler</name>
-+ <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
-+ </property>
-+ <property>
-+ <name>mapred.mesos.master</name>
-+ <value>local</value>
-+ </property>
-+#
-+# Make sure to uncomment the 'mapred.mesos.executor' property,
-+# when running the Hadoop JobTracker on a real Mesos cluster.
-+# NOTE: You need to MANUALLY upload the Mesos executor bundle
-+# to the location that is set as the value of this property.
-+# <property>
-+# <name>mapred.mesos.executor</name>
-+# <value>hdfs://hdfs.name.node:port/hadoop.zip</value>
-+# </property>
-+#
-+# The properties below indicate the amount of resources
-+# that are allocated to a Hadoop slot (i.e., map/reduce task) by Mesos.
-+ <property>
-+ <name>mapred.mesos.slot.cpus</name>
-+ <value>1</value>
-+ </property>
-+ <property>
-+ <name>mapred.mesos.slot.disk</name>
-+ <!-- The value is in MB. -->
-+ <value>1024</value>
-+ </property>
-+ <property>
-+ <name>mapred.mesos.slot.mem</name>
-+ <!-- Note that this is the total memory required for
-+ JVM overhead (10% of this value) and the heap (-Xmx) of the task.
-+ The value is in MB. -->
-+ <value>1024</value>
-+ </property>
- </configuration>
+Hadoop on Mesos
+---------------
+
+#### Overview ####
+
+To run _Hadoop on Mesos_ you need to add the `hadoop-mesos-0.0.1.jar`
+library to your Hadoop distribution (any distribution that supports
+`hadoop-core-1.2.0` should work) and set some new configuration
+properties. Read on for details.
+
+#### Build ####
+
+You can build `hadoop-mesos-0.0.1.jar` using Maven:
+
+```
+$ mvn package
+```
+
+If successful, the JAR will be at `target/hadoop-mesos-0.0.1.jar`.
+
+> NOTE: If you want to build against a different version of Mesos than
+> the default, you'll need to update `mesos-version` in `pom.xml`.
+
+We plan to provide pre-built JARs at http://repository.apache.org
+in the near future!
+
+#### Package ####
+
+You'll need to download an existing Hadoop distribution. For this
+guide, we'll use [CDH4.2.1][CDH4.2.1]. First grab the tar archive and
+extract it.
+
+```
+$ wget http://archive.cloudera.com/cdh4/cdh/4/mr1-2.0.0-mr1-cdh4.2.1.tar.gz
+...
+$ tar zxf mr1-2.0.0-mr1-cdh4.2.1.tar.gz
+```
+
+> **Take note:** the extracted directory is `hadoop-2.0.0-mr1-cdh4.2.1`.
+
+Now copy `hadoop-mesos-0.0.1.jar` into the `lib` folder.
+
+```
+$ cp /path/to/hadoop-mesos-0.0.1.jar hadoop-2.0.0-mr1-cdh4.2.1/lib/
+```
+
+_That's it!_ You now have a _Hadoop on Mesos_ distribution!
+
+[CDH4.2.1]: http://www.cloudera.com/content/support/en/documentation/cdh4-documentation/cdh4-documentation-v4-2-1.html
+
+#### Upload ####
+
+You'll want to upload your _Hadoop on Mesos_ distribution somewhere
+that Mesos can access in order to launch each `TaskTracker`. For
+example, if you're already running HDFS:
+
+```
+$ tar czf hadoop-2.0.0-mr1-cdh4.2.1.tar.gz hadoop-2.0.0-mr1-cdh4.2.1
+$ hadoop fs -put hadoop-2.0.0-mr1-cdh4.2.1.tar.gz /hadoop-2.0.0-mr1-cdh4.2.1.tar.gz
+```
+
+> **Consider** any permissions issues with your uploaded location
+> (e.g., on HDFS you'll probably want to make the file
+> world-readable).
+
+Now you'll need to configure your `JobTracker` to launch each
+`TaskTracker` on Mesos!
+
+#### Configure ####
+
+Along with the normal configuration properties you might want to set
+to launch a `JobTracker`, you'll need to set some Mesos-specific ones
+too.
+
+Here are the mandatory configuration properties for
+`conf/mapred-site.xml` (initialized to values representative of
+running in [pseudo-distributed
+operation](http://hadoop.apache.org/docs/stable/single_node_setup.html#PseudoDistributed)):
+
+```
+<property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:9001</value>
+</property>
+<property>
+ <name>mapred.jobtracker.taskScheduler</name>
+ <value>org.apache.hadoop.mapred.MesosScheduler</value>
+</property>
+<property>
+ <name>mapred.mesos.taskScheduler</name>
+ <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
+</property>
+<property>
+ <name>mapred.mesos.master</name>
+ <value>localhost:5050</value>
+</property>
+<property>
+ <name>mapred.mesos.executor</name>
+ <value>hdfs://localhost:9000/hadoop-2.0.0-mr1-cdh4.2.1.tar.gz</value>
+</property>
+```
+
+#### Start ####
+
+Now you can start the `JobTracker`, but you'll need to include the path
+to the Mesos native library.
+
+On Linux:
+
+```
+$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.so hadoop jobtracker
+```
+
+And on OS X:
+
+```
+$ MESOS_NATIVE_LIBRARY=/path/to/libmesos.dylib hadoop jobtracker
+```
+
+> **NOTE: You do not need to worry about distributing your Hadoop
+> configuration! All of the configuration properties read by the**
+> `JobTracker` **along with any necessary** `TaskTracker` **specific
+> _overrides_ will get serialized and passed to each** `TaskTracker`
+> **on startup.**
+
+_Please email user@mesos.apache.org with questions!_
+
+----------
[3/5] git commit: Get Hadoop code to compile!
Posted by be...@apache.org.
Get Hadoop code to compile!
Review: https://reviews.apache.org/r/13226
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dba8c38d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dba8c38d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dba8c38d
Branch: refs/heads/master
Commit: dba8c38df8694ed2a93af2445b28238b05e69315
Parents: 25cb6f9
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Aug 1 22:02:06 2013 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 2 14:25:32 2013 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/hadoop/mapred/MesosScheduler.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dba8c38d/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
index 7a1469d..0332dea 100644
--- a/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
+++ b/hadoop/src/main/java/org/apache/hadoop/mapred/MesosScheduler.java
@@ -148,7 +148,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
+ mesosTracker.host);
driver.killTask(mesosTracker.taskId);
- tracker.timer.cancel();
+ mesosTracker.timer.cancel();
mesosTrackers.remove(tracker);
}
}
@@ -756,7 +756,7 @@ public class MesosScheduler extends TaskScheduler implements Scheduler {
for (HttpHost tracker : trackers) {
if (mesosTrackers.get(tracker).taskId.equals(taskStatus.getTaskId())) {
LOG.info("Removing terminated TaskTracker: " + tracker);
- tracker.timer.cancel();
+ mesosTrackers.get(tracker).timer.cancel();
mesosTrackers.remove(tracker);
}
}