You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/19 22:44:32 UTC
svn commit: r1470033 - in /incubator/mesos/trunk/src: common/type_utils.hpp
slave/slave.cpp slave/status_update_manager.cpp
slave/status_update_manager.hpp tests/status_update_manager_tests.cpp
Author: vinodkone
Date: Fri Apr 19 20:44:32 2013
New Revision: 1470033
URL: http://svn.apache.org/r1470033
Log:
Fixed status update manager to ignore unexpected acknowledgements.
Review: https://reviews.apache.org/r/10660
Modified:
incubator/mesos/trunk/src/common/type_utils.hpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/status_update_manager.cpp
incubator/mesos/trunk/src/slave/status_update_manager.hpp
incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp
Modified: incubator/mesos/trunk/src/common/type_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/type_utils.hpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/type_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/type_utils.hpp Fri Apr 19 20:44:32 2013
@@ -19,11 +19,13 @@
#ifndef __TYPE_UTILS_HPP__
#define __TYPE_UTILS_HPP__
+#include <boost/functional/hash.hpp>
+
#include <google/protobuf/descriptor.h>
#include <mesos/mesos.hpp>
-#include <boost/functional/hash.hpp>
+#include <stout/uuid.hpp>
#include "common/attributes.hpp"
#include "common/resources.hpp"
@@ -340,7 +342,8 @@ inline std::ostream& operator << (
{
return stream
<< update.status().state()
- << " from task " << update.status().task_id()
+ << " (UUID: " << UUID::fromBytes(update.uuid())
+ << ") for task " << update.status().task_id()
<< " of framework " << update.framework_id();
}
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Fri Apr 19 20:44:32 2013
@@ -1289,7 +1289,7 @@ void Slave::statusUpdateAcknowledgement(
const TaskID& taskId,
const string& uuid)
{
- LOG(INFO) << "Got acknowledgement of status update"
+ LOG(INFO) << "Got status update acknowledgement " << UUID::fromBytes(uuid)
<< " for task " << taskId
<< " of framework " << frameworkId;
@@ -1311,7 +1311,7 @@ void Slave::_statusUpdateAcknowledgement
const UUID& uuid)
{
if (!future.isReady()) {
- LOG(FATAL) << "Failed to handle status update acknowledgement"
+ LOG(FATAL) << "Failed to handle status update acknowledgement " << uuid
<< " for task " << taskId
<< " of framework " << frameworkId << ": "
<< (future.isFailed() ? future.failure() : "future discarded");
@@ -1319,7 +1319,7 @@ void Slave::_statusUpdateAcknowledgement
}
if (future.get().isError()) {
- LOG(ERROR) << "Failed to handle the status update acknowledgement"
+ LOG(ERROR) << "Failed to handle the status update acknowledgement " << uuid
<< " for task " << taskId
<< " of framework " << frameworkId
<< ": " << future.get().error();
@@ -1327,13 +1327,14 @@ void Slave::_statusUpdateAcknowledgement
}
if (!future.get().get()) {
- LOG(WARNING) << "Ignoring status update acknowledgement"
+ LOG(WARNING) << "Ignoring status update acknowledgement " << uuid
<< " for task " << taskId
<< " of framework " << frameworkId;
}
LOG(INFO) << "Status update manager successfully handled status update"
- << " acknowledgement for task " << taskId
+ << " acknowledgement " << uuid
+ << " for task " << taskId
<< " of framework " << frameworkId;
CHECK(state == RECOVERING || state == DISCONNECTED ||
@@ -1342,7 +1343,8 @@ void Slave::_statusUpdateAcknowledgement
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
- LOG(ERROR) << "Status update acknowledgement for task " << taskId
+ LOG(ERROR) << "Status update acknowledgement " << uuid
+ << " for task " << taskId
<< " of unknown framework " << frameworkId;
return;
}
@@ -1354,7 +1356,8 @@ void Slave::_statusUpdateAcknowledgement
// Find the executor that has this update.
Executor* executor = framework->getExecutor(taskId);
if (executor == NULL) {
- LOG(ERROR) << "Status update acknowledgement for task " << taskId
+ LOG(ERROR) << "Status update acknowledgement " << uuid
+ << " for task " << taskId
<< " of unknown executor";
return;
}
@@ -1780,10 +1783,13 @@ void Slave::_statusUpdate(
return;
}
+ LOG(INFO) << "Status update manager successfully handled status update "
+ << update;
+
// Status update manager successfully handled the status update.
// Acknowledge the executor, if necessary.
if (pid.isSome()) {
- LOG(INFO) << "Sending ACK for status update " << update
+ LOG(INFO) << "Sending acknowledgement for status update " << update
<< " to executor " << pid.get();
StatusUpdateAcknowledgementMessage message;
message.mutable_framework_id()->MergeFrom(update.framework_id());
Modified: incubator/mesos/trunk/src/slave/status_update_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.cpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.cpp (original)
+++ incubator/mesos/trunk/src/slave/status_update_manager.cpp Fri Apr 19 20:44:32 2013
@@ -380,7 +380,7 @@ Try<bool> StatusUpdateManagerProcess::ac
// This might happen if we retried a status update and got back
// acknowledgments for both the original and the retried update.
if (update.isNone()) {
- LOG(WARNING) << "Ignoring duplicate status update acknowledgment " << uuid
+ LOG(WARNING) << "Ignoring unexpected status update acknowledgment " << uuid
<< " for task " << taskId
<< " of framework " << frameworkId;
return false;
Modified: incubator/mesos/trunk/src/slave/status_update_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.hpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.hpp (original)
+++ incubator/mesos/trunk/src/slave/status_update_manager.hpp Fri Apr 19 20:44:32 2013
@@ -237,15 +237,20 @@ struct StatusUpdateStream
if (acknowledged.contains(uuid)) {
LOG(WARNING) << "Ignoring duplicate status update acknowledgment " << uuid
- << " for task " << taskId
- << " of framework " << frameworkId;;
+ << " for update " << update;
return false;
}
- CHECK(uuid == UUID::fromBytes(update.uuid()))
- << "Unexpected UUID mismatch! (received " << uuid
- << ", expecting " << UUID::fromBytes(update.uuid()).toString()
- << ") for update " << stringify(update);
+ // This might happen if we retried a status update and got back
+ // acknowledgments for both the original and the retried update.
+ if (uuid != UUID::fromBytes(update.uuid())) {
+ LOG(WARNING)
+ << "Ignoring unexpected status update acknowledgement "
+ << "(received " << uuid
+ << ", expecting " << UUID::fromBytes(update.uuid())
+ << ") for update " << update;
+ return false;
+ }
// Handle the ACK, checkpointing if necessary.
Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
@@ -347,8 +352,6 @@ private:
{
CHECK(error.isNone());
- LOG(INFO) << "Handling " << type << " for status update " << update;
-
if (type == StatusUpdateRecord::UPDATE) {
// Record this update.
received.insert(UUID::fromBytes(update.uuid()));
Modified: incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp Fri Apr 19 20:44:32 2013
@@ -120,7 +120,7 @@ TEST_F(StatusUpdateManagerTest, Checkpoi
driver.start();
- AWAIT_UNTIL(offers);
+ AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
EXPECT_CALL(exec, registered(_, _, _, _))
@@ -138,10 +138,10 @@ TEST_F(StatusUpdateManagerTest, Checkpoi
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
- AWAIT_UNTIL(status);
+ AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
- AWAIT_UNTIL(_statusUpdateAcknowledgement);
+ AWAIT_READY(_statusUpdateAcknowledgement);
// Ensure that both the status update and its acknowledgement are
// correctly checkpointed.
@@ -185,7 +185,7 @@ TEST_F(StatusUpdateManagerTest, Checkpoi
driver.stop();
driver.join();
- AWAIT_UNTIL(shutdown); // Ensures MockExecutor can be deallocated.
+ AWAIT_READY(shutdown); // Ensures MockExecutor can be deallocated.
cluster.shutdown();
}
@@ -223,7 +223,7 @@ TEST_F(StatusUpdateManagerTest, RetrySta
driver.start();
- AWAIT_UNTIL(offers);
+ AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
EXPECT_CALL(exec, registered(_, _, _, _))
@@ -239,7 +239,7 @@ TEST_F(StatusUpdateManagerTest, RetrySta
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
- AWAIT_UNTIL(statusUpdateMessage);
+ AWAIT_READY(statusUpdateMessage);
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
@@ -247,7 +247,7 @@ TEST_F(StatusUpdateManagerTest, RetrySta
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
- AWAIT_UNTIL(status);
+ AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
@@ -260,7 +260,7 @@ TEST_F(StatusUpdateManagerTest, RetrySta
driver.stop();
driver.join();
- AWAIT_UNTIL(shutdown); // Ensures MockExecutor can be deallocated.
+ AWAIT_READY(shutdown); // Ensures MockExecutor can be deallocated.
cluster.shutdown();
}
@@ -303,7 +303,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
driver.start();
- AWAIT_UNTIL(offers);
+ AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
ExecutorDriver* execDriver;
@@ -322,7 +322,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
- AWAIT_UNTIL(statusUpdateMessage);
+ AWAIT_READY(statusUpdateMessage);
StatusUpdate update = statusUpdateMessage.get().update();
Future<TaskStatus> status;
@@ -335,7 +335,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
- AWAIT_UNTIL(status);
+ AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
@@ -370,8 +370,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
// TODO(vinod): It would've been great to introspect the first
// argument of '_statusUpdateAcknowledement()' and ensure that
- // it is 'false'. All we do now is wait for the slave to not crash
- // due to CHECK failure in status update manager.
+ // it is 'false'.
AWAIT_READY(duplicateAck);
Clock::resume();
@@ -383,7 +382,104 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
driver.stop();
driver.join();
- AWAIT_UNTIL(shutdown); // Ensures MockExecutor can be deallocated.
+ AWAIT_READY(shutdown); // Ensures MockExecutor can be deallocated.
+
+ cluster.shutdown();
+}
+
+
+// This test verifies that status update manager ignores
+// unexpected ACK for an earlier update when it is waiting
+// for an ACK for another update. We do this by dropping ACKs
+// for the original update and sending a random ACK to the slave.
+TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ Try<PID<Master> > master = cluster.masters.start();
+ ASSERT_SOME(master);
+
+ MockExecutor exec;
+
+ slave::Flags flags = cluster.slaves.flags;
+ flags.checkpoint = true;
+ Try<PID<Slave> > slave = cluster.slaves.start(
+ flags, DEFAULT_EXECUTOR_ID, &exec);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ FUTURE_PROTOBUF(StatusUpdateMessage(), master.get(), _);
+
+ // Drop the ACKs, so that status update manager
+ // retries the update.
+ DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
+
+ driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+ AWAIT_READY(statusUpdateMessage);
+ StatusUpdate update = statusUpdateMessage.get().update();
+
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ Future<Nothing> unexpectedAck =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ // Now send an ACK with a random UUID.
+ process::dispatch(
+ slave.get(),
+ &Slave::statusUpdateAcknowledgement,
+ update.slave_id(),
+ frameworkId,
+ update.status().task_id(),
+ UUID::random().toBytes());
+
+ // TODO(vinod): It would've been great to introspect the first
+ // argument of '_statusUpdateAcknowledement()' and ensure that
+ // it is 'false'.
+ AWAIT_READY(unexpectedAck);
+
+ Future<Nothing> shutdown;
+ EXPECT_CALL(exec, shutdown(_))
+ .WillRepeatedly(FutureSatisfy(&shutdown));
+
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(shutdown); // Ensures MockExecutor can be deallocated.
cluster.shutdown();
}