You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:28:53 UTC
svn commit: r1131606 - in /incubator/mesos/trunk/src: ft_messaging.cpp
ft_messaging.hpp master.cpp messages.hpp nexus_sched.cpp slave.cpp
Author: benh
Date: Sun Jun 5 03:28:52 2011
New Revision: 1131606
URL: http://svn.apache.org/viewvc?rev=1131606&view=rev
Log:
Merged two messages (FT_ACK and FT_RELAY_ACK) into one, and fixed a serious bug. Added more functionality to ft_messaging, which makes it more complicated, but we have to live with that complexity until libprocess can embed messages inside other messages.
Modified:
incubator/mesos/trunk/src/ft_messaging.cpp
incubator/mesos/trunk/src/ft_messaging.hpp
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/messages.hpp
incubator/mesos/trunk/src/nexus_sched.cpp
incubator/mesos/trunk/src/slave.cpp
Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun 5 03:28:52 2011
@@ -100,7 +100,7 @@ void FTMessaging::sendOutstanding() {
}
// Careful: not idempotent function.
-bool FTMessaging::acceptMessage(string from, string ftId) {
+bool FTMessaging::acceptMessage(string ftId, string from) {
if (inMsgs.find(from)==inMsgs.end()) {
DLOG(INFO) << "FT: new msgs seq: " << ftId;
inMsgs[from] = ftId;
@@ -129,22 +129,26 @@ bool FTMessaging::acceptMessage(string f
}
}
-bool FTMessaging::acceptMessageAck(string from, string ftId) {
- DLOG(INFO) << "FT: Received message with id: " << ftId << " sending FT_RELAY_ACK";
-
+bool FTMessaging::acceptMessageAckTo(PID to, string ftId, string from) {
+ DLOG(INFO) << "FT: Received msg with id: " << ftId << " sending FT_RELAY_ACK to " << to;
+
- bool res = acceptMessage(from, ftId);
+ bool res = acceptMessage(ftId, from);
-
+
if (!res) {
LOG(WARNING) << "FT: asked caller to ignore duplicate message " << ftId;
return res;
}
-
+
string msgStr = Tuple<EmptyClass>::tupleToString( Tuple<EmptyClass>::pack<FT_RELAY_ACK>(ftId, from) );
- Process::post(master, FT_RELAY_ACK, msgStr.data(), msgStr.size());
-
+ Process::post(to, FT_RELAY_ACK, msgStr.data(), msgStr.size());
+
return res;
}
+bool FTMessaging::acceptMessageAck(string ftId, string from) {
+ return acceptMessageAckTo(master, ftId, from);
+}
+
void FTMessaging::setMasterPid(const PID &mPid) {
master = mPid;
}
Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun 5 03:28:52 2011
@@ -152,19 +152,28 @@ public:
/**
* Checks if a message with FT ID from a node has already been received previously.
- * @param from libprocess PID string representing the original sender of the message
* @param ftId the FT ID of the message
+ * @param from libprocess PID string representing the original sender of the message
* @return true if message has not been received before and it is the next message expected to be received, false otherwise.
*/
- bool acceptMessage(string from, string ftId);
+ bool acceptMessage(string ftId, string from);
/**
- * Same as acceptMessage, but also sends an ACK back to the original sender if it returns true.
+ * Same as acceptMessage, but if it returns true also sends an FT_RELAY_ACK to the master, which relays it back to the original sender.
+ * @param ftId the FT ID of the message
* @param from libprocess PID string representing the original sender of the message
+ * @return true if message has not been received before and it is the next message expected to be received, false otherwise.
+ */
+ bool acceptMessageAck(string ftId, string from);
+
+ /**
+ * Same as acceptMessageAck, but explicitly specifies the pid of the node that should receive the ack.
+ * @param to PID of the node to receive the ack.
* @param ftId the FT ID of the message
+ * @param from libprocess PID string representing the original sender of the message
* @return true if message has not been received before and it is the next message expected to be received, false otherwise.
*/
- bool acceptMessageAck(string from, string ftId);
+ bool acceptMessageAckTo(PID to, string ftId, string from);
/**
* @return a new unique FT ID for a message to be sent
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 03:28:52 2011
@@ -360,7 +360,10 @@ void Master::operator () ()
Params params;
string ftId, senderStr;
unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
- if (!ftMsg->acceptMessageAck(senderStr, ftId)) {
+ PID senderPid;
+ istringstream ss(senderStr);
+ ss >> senderPid;
+ if (!ftMsg->acceptMessageAckTo(senderPid, ftId, senderStr)) {
LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
break;
}
@@ -658,7 +661,7 @@ void Master::operator () ()
string origPidStr;
unpack<FT_RELAY_ACK>(ftId, origPidStr);
- DLOG(INFO) << "FT_RELAY_ACK for " << ftId << " FT_ACK sent to " << origPidStr;
+ DLOG(INFO) << "FT_RELAY_ACK for " << ftId << " forwarding it to " << origPidStr;
PID origPid;
istringstream iss(origPidStr);
@@ -667,7 +670,7 @@ void Master::operator () ()
break;
}
- send(origPid, pack<FT_ACK>(ftId));
+ send(origPid, pack<FT_RELAY_ACK>(ftId, origPidStr));
break;
}
Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun 5 03:28:52 2011
@@ -200,18 +200,11 @@ TUPLE(M2F_ERROR,
std::string /*msg*/));
-
TUPLE(FT_RELAY_ACK,
(std::string, /* FT ID */
std::string /* PID of orig */
));
-TUPLE(FT_ACK,
- (std::string /* FT ID */
- ));
-
-
-
TUPLE(S2M_REGISTER_SLAVE,
(std::string /*name*/,
std::string /*publicDns*/,
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 03:28:52 2011
@@ -228,7 +228,7 @@ protected:
string data;
string ftId, origPid;
unpack<M2F_FT_STATUS_UPDATE>(ftId, origPid, tid, state, data);
- if (!ftMsg->acceptMessageAck(origPid, ftId))
+ if (!ftMsg->acceptMessageAck(ftId, origPid))
break;
DLOG(INFO) << "FT: Received message with id: " << ftId;
@@ -254,7 +254,7 @@ protected:
string ftId, origPid;
unpack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, origPid, msg);
- if (!ftMsg->acceptMessageAck(origPid, ftId))
+ if (!ftMsg->acceptMessageAck(ftId, origPid))
break;
DLOG(INFO) << "FT: Received message with id: " << ftId;
@@ -288,8 +288,13 @@ protected:
case PROCESS_EXIT: {
const char* message = "Connection to master failed";
- LOG(INFO) << message;
- // invoke(bind(&Scheduler::error, sched, driver, -1, message));
+ if (isFT)
+ LOG(WARNING) << "Connection to master failed. Waiting for a new master to be elected.";
+ else
+ {
+ LOG(ERROR) << "Connection to master failed. Exiting. Consider running Nexus in FT mode!";
+ invoke(bind(&Scheduler::error, sched, driver, -1, message));
+ }
break;
}
@@ -309,10 +314,22 @@ protected:
send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
break;
}
+
+ case FT_RELAY_ACK: {
+ string ftId, senderStr;
+ unpack<FT_RELAY_ACK>(ftId, senderStr);
+
+ DLOG(INFO) << "FT: got final ack for " << ftId;
+
+ ftMsg->gotAck(ftId);
+ break;
+ }
+
case PROCESS_TIMEOUT: {
ftMsg->sendOutstanding();
break;
}
+
default: {
ostringstream oss;
oss << "SchedulerProcess received unknown message " << msgid()
@@ -515,6 +532,7 @@ void NexusSchedulerDriver::replyToOffer(
new SchedulerProcess::TimeoutListener(process, tasks);
string ftId = process->ftMsg->getNextId();
+ DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
process->ftMsg->reliableSend( ftId,
process->pack<F2M_FT_SLOT_OFFER_REPLY>(ftId,
process->self(),
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131606&r1=1131605&r2=1131606&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 03:28:52 2011
@@ -258,7 +258,7 @@ void Slave::operator () ()
string ftId, origPid;
unpack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, origPid, fid, message);
- if (!ftMsg->acceptMessageAck(origPid, ftId))
+ if (!ftMsg->acceptMessageAck(ftId, origPid))
break;
DLOG(INFO) << "FT: Received message with id: " << ftId;
@@ -361,12 +361,16 @@ void Slave::operator () ()
if (from() == master) {
// TODO: Fault tolerance!
- LOG(ERROR) << "Master disconnected! Committing suicide ...";
- // TODO(matei): Add support for factory style destroy of objects!
- // if (isolationModule != NULL)
- // delete isolationModule;
- // // TODO: Shut down executors?
- // return;
+ if (isFT)
+ LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected.";
+ else
+ {
+ LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
+ if (isolationModule != NULL)
+ delete isolationModule;
+ // TODO: Shut down executors?
+ return;
+ }
}
foreachpair (_, Executor *ex, executors) {
@@ -436,9 +440,9 @@ void Slave::operator () ()
break;
}
- case FT_ACK: {
- string ftId;
- unpack<FT_ACK>(ftId);
+ case FT_RELAY_ACK: {
+ string ftId, senderStr;
+ unpack<FT_RELAY_ACK>(ftId, senderStr);
DLOG(INFO) << "FT: got final ack for " << ftId;