You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:33:38 UTC

svn commit: r1131642 - /incubator/mesos/trunk/src/master.cpp

Author: benh
Date: Sun Jun  5 03:33:37 2011
New Revision: 1131642

URL: http://svn.apache.org/viewvc?rev=1131642&view=rev
Log:
Fixed a bug in the calls to acceptMessage.

Modified:
    incubator/mesos/trunk/src/master.cpp

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131642&r1=1131641&r2=1131642&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 03:33:37 2011
@@ -1,15 +1,11 @@
-#include "config.hpp" // Need to define first to get USING_ZOOKEEPER
-
-#include <glog/logging.h>
-
-#ifdef USING_ZOOKEEPER
-#include <zookeeper.hpp>
-#endif
-
 #include "allocator.hpp"
-#include "allocator_factory.hpp"
 #include "master.hpp"
 #include "master_webui.hpp"
+#include "allocator_factory.hpp"
+#include "ft_messaging.hpp"
+
+#include <glog/logging.h>
+
 
 using std::endl;
 using std::max;
@@ -31,56 +27,8 @@ using namespace nexus::internal;
 using namespace nexus::internal::master;
 
 
-/* List of ZooKeeper host:port pairs (from master_main.cpp/local.cpp). */
-extern string zookeeper;
-
 namespace {
 
-#ifdef USING_ZOOKEEPER
-class MasterWatcher : public Watcher
-{
-private:
-  Master *master;
-
-public:
-  void process(ZooKeeper *zk, int type, int state, const string &path)
-  {
-    if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
-      // Create znode for master identification.
-      string znode = "/home/nexus/master";
-      string dirname = "/home/nexus";
-      string delimiter = "/";
-      string contents = "";
-
-      int ret;
-      string result;
-
-      // Create directory path znodes as necessary.
-      size_t index = dirname.find(delimiter, 0);
-      while (index < string::npos) {
-	index = dirname.find(delimiter, index+1);
-	string prefix = dirname.substr(0, index);
-	ret = zk->create(prefix, contents, ZOO_CREATOR_ALL_ACL,
-			 0, &result);
-	if (ret != ZOK && ret != ZNODEEXISTS)
-	  fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
-      }
-
-      // Now create znode.
-      ret = zk->create(znode, master->getPID(), ZOO_CREATOR_ALL_ACL,
-		       ZOO_EPHEMERAL, &result);
-
-      if (ret != ZOK)
-	fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
-    } else {
-      fatal("unhandled ZooKeeper event!");
-    }
-  }
-
-  MasterWatcher(Master *_master) : master(_master) {}
-};
-#endif
-
 // A process that periodically pings the master to check filter expiries, etc
 class AllocatorTimer : public Tuple<Process>
 {
@@ -163,24 +111,50 @@ public:
 }
 
 
-Master::Master()
-  : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),
-    allocatorType("simple")
-{}
+Master::Master(const string &zk)
+  : leaderDetector(NULL), nextFrameworkId(0), nextSlaveId(0), 
+    nextSlotOfferId(0), allocatorType("simple"), masterId(0)
+{
+  if (zk != "") {
+    pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
+    if (urlPair.first == UrlProcessor::ZOO) {
+      isFT = true;
+      zkServers = urlPair.second;
+    } else {
+      LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
+      exit(1);
+    }
+  }
+  ftMsg = FTMessaging::getInstance();
+}
 
 
-Master::Master(const string& _allocatorType)
-  : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),
-    allocatorType(_allocatorType)
-{}
+Master::Master(const string& _allocatorType, const string &zk)
+  : leaderDetector(NULL), nextFrameworkId(0), nextSlaveId(0), 
+    nextSlotOfferId(0), allocatorType(_allocatorType), masterId(0)
+{
+  if (zk != "") {
+    pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
+    if (urlPair.first == UrlProcessor::ZOO) {
+      isFT = true;
+      zkServers = urlPair.second;
+    } else {
+      LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
+      exit(1);
+    }
+  }
+  ftMsg = FTMessaging::getInstance();
+}
                    
 
 Master::~Master()
 {
+  if (isFT && leaderDetector != NULL)
+    delete leaderDetector;
   LOG(INFO) << "Shutting down master";
   delete allocator;
   foreachpair (_, Framework *framework, frameworks) {
-    foreachpair(_, Task *task, framework->tasks)
+    foreachpair(_, TaskInfo *task, framework->tasks)
       delete task;
     delete framework;
   }
@@ -211,7 +185,7 @@ state::MasterState * Master::getState()
        f->executorInfo.uri, f->resources.cpus, f->resources.mem,
        f->connectTime);
     state->frameworks.push_back(framework);
-    foreachpair (_, Task *t, f->tasks) {
+    foreachpair (_, TaskInfo *t, f->tasks) {
       state::Task *task = new state::Task(t->id, t->name, t->frameworkId,
           t->slaveId, t->state, t->resources.cpus, t->resources.mem);
       framework->tasks.push_back(task);
@@ -285,10 +259,44 @@ SlotOffer * Master::lookupSlotOffer(Offe
     return NULL;
 }
 
+void Master::updateFrameworkTasks() {
+  foreachpair (SlaveID sid, Slave *slave, slaves) {
+    foreachpair (_, TaskInfo *task, slave->tasks) {
+      updateFrameworkTasks(task);
+    }
+  }
+}
+
+//alibandali++
+void Master::updateFrameworkTasks(TaskInfo *task) {
+  Framework *fwrk = lookupFramework(task->frameworkId);
+  if (fwrk != NULL) {
+    if (fwrk->tasks.find(task->id) == fwrk->tasks.end()) {
+      fwrk->tasks[task->id] = task;
+      // this->resources += resources; // alig: not sure if this should be done or not
+    }
+  }
+}
+
 
 void Master::operator () ()
 {
-  LOG(INFO) << "Master started at " << self();
+  LOG(INFO) << "Master started at nexus://" << self();
+
+  if (isFT) {
+    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+    ostringstream lpid;
+    lpid << self();
+    leaderDetector = new LeaderDetector(zkServers, true, lpid.str());
+    
+    string myLeaderSeq = leaderDetector->getMySeq();
+    if (myLeaderSeq == "") {
+      LOG(FATAL) << "Cannot proceed since new FT master sequence number could not be fetched from ZK.";
+      exit(1);
+    }
+    masterId = lexical_cast<long>(myLeaderSeq);
+    LOG(INFO) << "Master ID:" << masterId;
+  }
 
   allocator = createAllocator();
   if (!allocator)
@@ -297,21 +305,16 @@ void Master::operator () ()
   link(spawn(new AllocatorTimer(self())));
   //link(spawn(new SharesPrinter(self())));
 
-#ifdef USING_ZOOKEEPER
-  ZooKeeper *zk;
-  if (!zookeeper.empty())
-    zk = new ZooKeeper(zookeeper, 10000, new MasterWatcher(this));
-#endif
-
   while (true) {
     switch (receive()) {
 
     case F2M_REGISTER_FRAMEWORK: {
-      FrameworkID fid = nextFrameworkId++;
+      FrameworkID fid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
+
       Framework *framework = new Framework(from(), fid);
       unpack<F2M_REGISTER_FRAMEWORK>(framework->name,
-                                     framework->user,
-                                     framework->executorInfo);
+				     framework->user,
+				     framework->executorInfo);
       LOG(INFO) << "Registering " << framework << " at " << framework->pid;
       frameworks[fid] = framework;
       pidToFid[framework->pid] = fid;
@@ -323,13 +326,75 @@ void Master::operator () ()
       break;
     }
 
+    case F2M_REREGISTER_FRAMEWORK: {
+
+      Framework *framework = new Framework(from());
+      unpack<F2M_REREGISTER_FRAMEWORK>(framework->id,
+                                       framework->name,
+                                       framework->user,
+                                       framework->executorInfo);
+
+      if (framework->id == "") {
+        DLOG(INFO) << "Framework reconnecting without a FrameworkID, generating new id";
+        framework->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
+      }
+
+      LOG(INFO) << "Registering " << framework << " at " << framework->pid;
+      frameworks[framework->id] = framework;
+      pidToFid[framework->pid] = framework->id;
+
+      updateFrameworkTasks();
+
+      link(framework->pid);
+      send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id));
+      allocator->frameworkAdded(framework);
+      if (framework->executorInfo.uri == "")
+        terminateFramework(framework, 1, "No executor URI given");
+
+      DLOG(INFO) << "STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
+
+      break;
+    }
+
     case F2M_UNREGISTER_FRAMEWORK: {
       FrameworkID fid;
       unpack<F2M_UNREGISTER_FRAMEWORK>(fid);
       LOG(INFO) << "Asked to unregister framework " << fid;
       Framework *framework = lookupFramework(fid);
       if (framework != NULL)
-        removeFramework(framework);
+	removeFramework(framework);
+      break;
+    }
+
+    case F2M_FT_SLOT_OFFER_REPLY: {
+      FrameworkID fid;
+      OfferID oid;
+      vector<TaskDescription> tasks;
+      Params params;
+      string ftId, senderStr;
+      unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
+      PID senderPid;
+      istringstream ss(senderStr);
+      ss >> senderPid;
+      if (!ftMsg->acceptMessageAckTo(ftId, senderPid, senderStr)) {
+        LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
+        break;
+      } 
+      Framework *framework = lookupFramework(fid);
+      if (framework != NULL) {
+	SlotOffer *offer = lookupSlotOffer(oid);
+	if (offer != NULL) {
+	  processOfferReply(offer, tasks, params);
+	} else {
+	  // The slot offer is gone, meaning that we rescinded it or that
+	  // the slave was lost; immediately report any tasks in it as lost
+	  foreach (TaskDescription &t, tasks) {
+	    send(framework->pid,
+		 pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+	  }
+	}
+      } else
+        DLOG(INFO) << "F2M_FT_SLOT_OFFER_REPLY error: couldn't lookup framework id" << fid;
       break;
     }
 
@@ -341,17 +406,17 @@ void Master::operator () ()
       unpack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params);
       Framework *framework = lookupFramework(fid);
       if (framework != NULL) {
-        SlotOffer *offer = lookupSlotOffer(oid);
-        if (offer != NULL) {
-          processOfferReply(offer, tasks, params);
-        } else {
-          // The slot offer is gone, meaning that we rescinded it or that
-          // the slave was lost; immediately report any tasks in it as lost
-          foreach (TaskDescription &t, tasks) {
-            send(framework->pid,
-                 pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
-          }
-        }
+	SlotOffer *offer = lookupSlotOffer(oid);
+	if (offer != NULL) {
+	  processOfferReply(offer, tasks, params);
+	} else {
+	  // The slot offer is gone, meaning that we rescinded it or that
+	  // the slave was lost; immediately report any tasks in it as lost
+	  foreach (TaskDescription &t, tasks) {
+	    send(framework->pid,
+		 pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+	  }
+	}
       }
       break;
     }
@@ -361,9 +426,9 @@ void Master::operator () ()
       unpack<F2M_REVIVE_OFFERS>(fid);
       Framework *framework = lookupFramework(fid);
       if (framework != NULL) {
-        LOG(INFO) << "Reviving offers for " << framework;
-        framework->slaveFilter.clear();
-        allocator->offersRevived(framework);
+	LOG(INFO) << "Reviving offers for " << framework;
+	framework->slaveFilter.clear();
+	allocator->offersRevived(framework);
       }
       break;
     }
@@ -374,34 +439,39 @@ void Master::operator () ()
       unpack<F2M_KILL_TASK>(fid, tid);
       Framework *framework = lookupFramework(fid);
       if (framework != NULL) {
-        Task *task = framework->lookupTask(tid);
-        if (task != NULL) {
-          LOG(INFO) << "Asked to kill " << task << " by its framework";
-          killTask(task);
-        }
+	TaskInfo *task = framework->lookupTask(tid);
+	if (task != NULL) {
+	  LOG(INFO) << "Asked to kill " << task << " by its framework";
+	  killTask(task);
+	}
       }
       break;
     }
 
-    case F2M_FRAMEWORK_MESSAGE: {
+    case F2M_FT_FRAMEWORK_MESSAGE: {
       FrameworkID fid;
       FrameworkMessage message;
-      unpack<F2M_FRAMEWORK_MESSAGE>(fid, message);
+      string ftId, senderStr;
+      unpack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message);
       Framework *framework = lookupFramework(fid);
       if (framework != NULL) {
-        Slave *slave = lookupSlave(message.slaveId);
-        if (slave != NULL) {
-          LOG(INFO) << "Sending framework message to " << slave;
-          send(slave->pid, pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
-        }
-      }
+	Slave *slave = lookupSlave(message.slaveId);
+	if (slave != NULL) {
+	  LOG(INFO) << "Sending framework message to " << slave;
+	  send(slave->pid, pack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message));
+        } else
+          DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
+      } else
+        DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << message.slaveId;
       break;
     }
 
     case S2M_REGISTER_SLAVE: {
-      Slave *slave = new Slave(from(), nextSlaveId++);
+      string slaveId = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
+
+      Slave *slave = new Slave(from(), slaveId);
       unpack<S2M_REGISTER_SLAVE>(slave->hostname, slave->publicDns,
-          slave->resources);
+	  slave->resources);
       LOG(INFO) << "Registering " << slave << " at " << slave->pid;
       slaves[slave->id] = slave;
       pidToSid[slave->pid] = slave->id;
@@ -411,29 +481,70 @@ void Master::operator () ()
       break;
     }
 
+    case S2M_REREGISTER_SLAVE: {
+      Slave *slave = new Slave(from());
+      vector<TaskInfo> taskVec;
+
+      unpack<S2M_REREGISTER_SLAVE>(slave->id, slave->hostname, slave->publicDns,
+      				   slave->resources, taskVec);
+
+      if (slave->id == "") {
+        slave->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
+        DLOG(WARNING) << "Slave re-registered without a SlaveID, generating a new id for it.";
+      }
+
+      foreach(TaskInfo &ti, taskVec) {
+        TaskInfo *tip = new TaskInfo(ti);
+	slave->addTask(tip);
+        updateFrameworkTasks(tip);
+      }
+  
+     //alibandali
+      LOG(INFO) << "Re-registering " << slave << " at " << slave->pid;
+      slaves[slave->id] = slave;
+      pidToSid[slave->pid] = slave->id;
+      link(slave->pid);
+      send(slave->pid, pack<M2S_REREGISTER_REPLY>(slave->id));
+      allocator->slaveAdded(slave);
+
+      DLOG(INFO) << "STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
+
+      break;
+    }
+
     case S2M_UNREGISTER_SLAVE: {
       SlaveID sid;
       unpack<S2M_UNREGISTER_SLAVE>(sid);
       LOG(INFO) << "Asked to unregister slave " << sid;
       Slave *slave = lookupSlave(sid);
       if (slave != NULL)
-        removeSlave(slave);
+	removeSlave(slave);
       break;
     }
 
-    case S2M_STATUS_UPDATE: {
+    case S2M_FT_STATUS_UPDATE: {
       SlaveID sid;
       FrameworkID fid;
       TaskID tid;
       TaskState state;
       string data;
-      unpack<S2M_STATUS_UPDATE>(sid, fid, tid, state, data);
+      string ftId, senderStr;
+
+      unpack<S2M_FT_STATUS_UPDATE>(ftId, senderStr, sid, fid, tid, state, data);
+      DLOG(INFO) << "FT: prepare relay ftId:"<< ftId << " from: "<< senderStr;
       if (Slave *slave = lookupSlave(sid)) {
-        if (Framework *framework = lookupFramework(fid)) {
-          // Pass on the status update to the framework
-          send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
+	if (Framework *framework = lookupFramework(fid)) {
+	  // Pass on the status update to the framework
+
+          DLOG(INFO) << "FT: relaying ftId:"<< ftId << " from: "<< senderStr;
+          send(framework->pid, pack<M2F_FT_STATUS_UPDATE>(ftId, senderStr, tid, state, data));
+
+          if (!ftMsg->acceptMessage(ftId, senderStr)) {
+            LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
+            break;
+          } 
           // Update the task state locally
-          Task *task = slave->lookupTask(fid, tid);
+          TaskInfo *task = slave->lookupTask(fid, tid);
           if (task != NULL) {
             LOG(INFO) << "Status update: " << task << " is in state " << state;
             task->state = state;
@@ -444,11 +555,65 @@ void Master::operator () ()
               removeTask(task, TRR_TASK_ENDED);
             }
           }
-        }
-        break;
-      }
+	} else
+          DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
+      } else 
+        DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
+      
+      break;
+    }
+
+    case S2M_STATUS_UPDATE: {
+      SlaveID sid;
+      FrameworkID fid;
+      TaskID tid;
+      TaskState state;
+      string data;
+      unpack<S2M_STATUS_UPDATE>(sid, fid, tid, state, data);
+      if (Slave *slave = lookupSlave(sid)) {
+	if (Framework *framework = lookupFramework(fid)) {
+	  // Pass on the status update to the framework
+	  send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
+	  // Update the task state locally
+	  TaskInfo *task = slave->lookupTask(fid, tid);
+	  if (task != NULL) {
+	    LOG(INFO) << "Status update: " << task << " is in state " << state;
+	    task->state = state;
+	    // Remove the task if it finished or failed
+	    if (state == TASK_FINISHED || state == TASK_FAILED ||
+		state == TASK_KILLED || state == TASK_LOST) {
+	      LOG(INFO) << "Removing " << task << " because it's done";
+	      removeTask(task, TRR_TASK_ENDED);
+	    }
+	  }
+	} else
+          DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
+      } else
+        DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
+      break;
     }
       
+    case S2M_FT_FRAMEWORK_MESSAGE: {
+      SlaveID sid;
+      FrameworkID fid;
+      FrameworkMessage message; 
+      string ftId, senderStr;
+      unpack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, sid, fid, message);
+      Slave *slave = lookupSlave(sid);
+      if (slave != NULL) {
+	Framework *framework = lookupFramework(fid);
+	if (framework != NULL) {
+
+	  send(framework->pid, pack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, message));
+
+        } else
+          DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
+      } else
+        DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << sid;
+
+      break;
+    }
+
     case S2M_FRAMEWORK_MESSAGE: {
       SlaveID sid;
       FrameworkID fid;
@@ -456,9 +621,9 @@ void Master::operator () ()
       unpack<S2M_FRAMEWORK_MESSAGE>(sid, fid, message);
       Slave *slave = lookupSlave(sid);
       if (slave != NULL) {
-        Framework *framework = lookupFramework(fid);
-        if (framework != NULL)
-          send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+	Framework *framework = lookupFramework(fid);
+	if (framework != NULL)
+	  send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
       }
       break;
     }
@@ -470,18 +635,18 @@ void Master::operator () ()
       unpack<S2M_LOST_EXECUTOR>(sid, fid, status);
       Slave *slave = lookupSlave(sid);
       if (slave != NULL) {
-        Framework *framework = lookupFramework(fid);
-        if (framework != NULL) {
-          ostringstream oss;
-          if (status == -1) {
-            oss << "Executor on " << slave << " (" << slave->hostname
-                << ") disconnected";
-          } else {
-            oss << "Executor on " << slave << " (" << slave->hostname
-                << ") exited with status " << status;
-          }
-          terminateFramework(framework, status, oss.str());
-        }
+	Framework *framework = lookupFramework(fid);
+	if (framework != NULL) {
+	  ostringstream oss;
+	  if (status == -1) {
+	    oss << "Executor on " << slave << " (" << slave->hostname
+		<< ") disconnected";
+	  } else {
+	    oss << "Executor on " << slave << " (" << slave->hostname
+		<< ") exited with status " << status;
+	  }
+	  terminateFramework(framework, status, oss.str());
+	}
       }
       break;
     }
@@ -491,19 +656,43 @@ void Master::operator () ()
       unpack<SH2M_HEARTBEAT>(sid);
       Slave *slave = lookupSlave(sid);
       if (slave != NULL)
-        //LOG(INFO) << "Received heartbeat for " << slave << " from " << from();
-        ;
+	//LOG(INFO) << "Received heartbeat for " << slave << " from " << from();
+	;
       else
-        LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
-                     << " from " << from();
+	LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
+		     << " from " << from();
       break;
     }
 
     case M2M_TIMER_TICK: {
       LOG(INFO) << "Allocator timer tick";
       foreachpair (_, Framework *framework, frameworks)
-        framework->removeExpiredFilters();
+	framework->removeExpiredFilters();
       allocator->timerTick();
+
+      // int cnts = 0;
+      // foreachpair(_, Framework *framework, frameworks) {
+      // 	DLOG(INFO) << (cnts++) << " resourceInUse:" << framework->resources;
+      // }
+      break;
+    }
+
+    case FT_RELAY_ACK: {
+      string ftId;
+      string origPidStr;
+      unpack<FT_RELAY_ACK>(ftId, origPidStr);
+      
+      DLOG(INFO) << "FT_RELAY_ACK for " << ftId << " forwarding it to " << origPidStr;
+            
+      PID origPid;
+      istringstream iss(origPidStr);
+      if (!(iss >> origPid)) {
+        cerr << "FT: Failed to resolve PID for originator: " << origPidStr << endl;
+        break;
+      }
+
+      send(origPid, pack<FT_RELAY_ACK>(ftId, origPidStr));
+
       break;
     }
 
@@ -548,7 +737,8 @@ void Master::operator () ()
 OfferID Master::makeOffer(Framework *framework,
                           const vector<SlaveResources>& resources)
 {
-  OfferID oid = nextSlotOfferId++;
+  OfferID oid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlotOfferId++);
+
   SlotOffer *offer = new SlotOffer(oid, framework->id, resources);
   slotOffers[offer->id] = offer;
   framework->addOffer(offer);
@@ -562,7 +752,7 @@ OfferID Master::makeOffer(Framework *fra
     Params params;
     params.set("cpus", r.resources.cpus);
     params.set("mem", r.resources.mem);
-    SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap());
+    SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap(), r.slave->pid);
     offers.push_back(offer);
   }
   send(framework->pid, pack<M2F_SLOT_OFFER>(oid, offers));
@@ -661,13 +851,13 @@ void Master::launchTask(Framework *f, co
   Resources res(params.getInt32("cpus", -1),
                 params.getInt64("mem", -1));
   Slave *slave = lookupSlave(t.slaveId);
-  Task *task = f->addTask(t.taskId, t.name, slave->id, res);
+  TaskInfo *task = f->addTask(t.taskId, t.name, slave->id, res);
   LOG(INFO) << "Launching " << task << " on " << slave;
   slave->addTask(task);
   allocator->taskAdded(task);
   send(slave->pid, pack<M2S_RUN_TASK>(
         f->id, t.taskId, f->name, f->user, f->executorInfo,
-        t.name, t.arg, t.params));
+        t.name, t.arg, t.params, (string)f->pid));
 }
 
 
@@ -677,7 +867,7 @@ void Master::rescindOffer(SlotOffer *off
 }
 
 
-void Master::killTask(Task *task)
+void Master::killTask(TaskInfo *task)
 {
   LOG(FATAL) << "Killing " << task;
   Framework *framework = lookupFramework(task->frameworkId);
@@ -746,8 +936,8 @@ void Master::removeFramework(Framework *
     send(slave->pid, pack<M2S_KILL_FRAMEWORK>(framework->id));
 
   // Remove pointers to the framework's tasks in slaves
-  unordered_map<TaskID, Task *> tasksCopy = framework->tasks;
-  foreachpair (_, Task *task, tasksCopy) {
+  unordered_map<TaskID, TaskInfo *> tasksCopy = framework->tasks;
+  foreachpair (_, TaskInfo *task, tasksCopy) {
     Slave *slave = lookupSlave(task->slaveId);
     CHECK(slave != NULL);
     removeTask(task, TRR_FRAMEWORK_LOST);
@@ -776,8 +966,8 @@ void Master::removeSlave(Slave *slave)
   // TODO: Notify allocator that a slave removal is beginning?
   
   // Remove pointers to slave's tasks in frameworks, and send status updates
-  unordered_map<pair<FrameworkID, TaskID>, Task *> tasksCopy = slave->tasks;
-  foreachpair (_, Task *task, tasksCopy) {
+  unordered_map<pair<FrameworkID, TaskID>, TaskInfo *> tasksCopy = slave->tasks;
+  foreachpair (_, TaskInfo *task, tasksCopy) {
     Framework *framework = lookupFramework(task->frameworkId);
     CHECK(framework != NULL);
     send(framework->pid, pack<M2F_STATUS_UPDATE>(task->id, TASK_LOST,
@@ -818,7 +1008,7 @@ void Master::removeSlave(Slave *slave)
 
 
 // Remove a slot offer (because it was replied or we lost a framework or slave)
-void Master::removeTask(Task *task, TaskRemovalReason reason)
+void Master::removeTask(TaskInfo *task, TaskRemovalReason reason)
 {
   Framework *framework = lookupFramework(task->frameworkId);
   Slave *slave = lookupSlave(task->slaveId);