You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/05/24 01:06:58 UTC
[1/2] git commit: Fixed scheduler driver to call disconnected() when
the master fails over.
Updated Branches:
refs/heads/master 9438fd5d3 -> 68ccbf1d9
Fixed scheduler driver to call disconnected() when the master fails over.
Review: https://reviews.apache.org/r/11191
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/a45fd2d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/a45fd2d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/a45fd2d3
Branch: refs/heads/master
Commit: a45fd2d3b897887dbdcb1f53a1e66f0a14bd1802
Parents: 9438fd5
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon May 13 15:09:10 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Thu May 23 15:54:57 2013 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 14 ++++++++++++--
src/tests/fault_tolerance_tests.cpp | 6 ++++++
src/tests/master_tests.cpp | 6 ++++++
3 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/a45fd2d3/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index a553294..b3593b1 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -184,6 +184,12 @@ protected:
master = pid;
link(master);
+ // If master failed over, inform the scheduler about the
+ // disconnection.
+ if (connected) {
+ scheduler->disconnected(driver);
+ }
+
connected = false;
doReliableRegistration();
}
@@ -192,12 +198,16 @@ protected:
{
VLOG(1) << "No master detected, waiting for another master";
+ // Inform the scheduler about the disconnection if the driver
+ // was previously registered with the master.
+ if (connected) {
+ scheduler->disconnected(driver);
+ }
+
// In this case, we don't actually invoke Scheduler::error
// since we might get reconnected to a master imminently.
connected = false;
master = UPID();
-
- scheduler->disconnected(driver);
}
void registered(const FrameworkID& frameworkId, const MasterInfo& masterInfo)
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/a45fd2d3/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 68cd5fc..d41bef7 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -753,6 +753,10 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
AWAIT_READY(message); // Framework registered message, to get the pid.
AWAIT_READY(registered); // Framework registered call.
+ Future<Nothing> disconnected;
+ EXPECT_CALL(sched, disconnected(&driver))
+ .WillOnce(FutureSatisfy(&disconnected));
+
Future<Nothing> reregistered;
EXPECT_CALL(sched, reregistered(&driver, _))
.WillOnce(FutureSatisfy(&reregistered));
@@ -767,6 +771,8 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
process::post(message.get().to, newMasterDetectedMsg);
+ AWAIT_READY(disconnected);
+
AWAIT_READY(reregistered);
driver.stop();
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/a45fd2d3/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 1b062d9..fe823f6 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -855,12 +855,18 @@ TEST_F(MasterTest, MasterInfoOnReElection)
NewMasterDetectedMessage newMasterDetectedMsg;
newMasterDetectedMsg.set_pid(master.get());
+ Future<Nothing> disconnected;
+ EXPECT_CALL(sched, disconnected(&driver))
+ .WillOnce(FutureSatisfy(&disconnected));
+
Future<MasterInfo> masterInfo;
EXPECT_CALL(sched, reregistered(&driver, _))
.WillOnce(FutureArg<1>(&masterInfo));
process::post(message.get().to, newMasterDetectedMsg);
+ AWAIT_READY(disconnected);
+
AWAIT_READY(masterInfo);
EXPECT_EQ(master.get().port, masterInfo.get().port());
EXPECT_EQ(master.get().ip, masterInfo.get().ip());
[2/2] git commit: Fixed master to send a FrameworkReregistered
message when the framework re-registers with a failed-over master.
Posted by vi...@apache.org.
Fixed master to send a FrameworkReregistered message when the
framework re-registers with a failed-over master.
Review: https://reviews.apache.org/r/11348
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/68ccbf1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/68ccbf1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/68ccbf1d
Branch: refs/heads/master
Commit: 68ccbf1d9ab1fee2bef933a27368e5460f9be42e
Parents: a45fd2d
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri May 10 16:21:44 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Thu May 23 16:06:04 2013 -0700
----------------------------------------------------------------------
src/master/master.cpp | 26 +++++++++------
src/master/master.hpp | 2 +-
src/tests/allocator_zookeeper_tests.cpp | 4 +-
src/tests/fault_tolerance_tests.cpp | 44 ++++++++++++++++++++++++++
4 files changed, 63 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/68ccbf1d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c44f2b7..d5e5804 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -658,7 +658,7 @@ void Master::reregisterFramework(const FrameworkInfo& frameworkInfo,
// N.B. Need to add the framwwork _after_ we add it's tasks
// (above) so that we can properly determine the resources it's
// currently using!
- addFramework(framework);
+ addFramework(framework, true);
}
CHECK(frameworks.count(frameworkInfo.id()) > 0);
@@ -913,7 +913,7 @@ void Master::reregisterSlave(const SlaveID& slaveId,
// partitioned, we don't allow the slave to re-register, as we've
// already informed frameworks that the tasks were lost.
LOG(ERROR) << "Slave " << slaveId << " at " << from
- << "attempted to re-register after deactivation";
+ << " attempted to re-register after deactivation";
reply(ShutdownMessage());
} else {
Slave* slave = getSlave(slaveId);
@@ -1632,7 +1632,7 @@ Resources Master::launchTask(const TaskInfo& task,
}
-void Master::addFramework(Framework* framework)
+void Master::addFramework(Framework* framework, bool reregister)
{
CHECK(frameworks.count(framework->id) == 0);
@@ -1640,14 +1640,20 @@ void Master::addFramework(Framework* framework)
link(framework->pid);
- FrameworkRegisteredMessage message;
- message.mutable_framework_id()->MergeFrom(framework->id);
- message.mutable_master_info()->MergeFrom(info);
- send(framework->pid, message);
+ if (reregister) {
+ FrameworkReregisteredMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.mutable_master_info()->MergeFrom(info);
+ send(framework->pid, message);
+ } else {
+ FrameworkRegisteredMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.mutable_master_info()->MergeFrom(info);
+ send(framework->pid, message);
+ }
- allocator->frameworkAdded(framework->id,
- framework->info,
- framework->resources);
+ allocator->frameworkAdded(
+ framework->id, framework->info, framework->resources);
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/68ccbf1d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d3790dc..0a130d9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -138,7 +138,7 @@ protected:
const Filters& filters);
// Add a framework.
- void addFramework(Framework* framework);
+ void addFramework(Framework* framework, bool reregister = false);
// Replace the scheduler for a framework with a new process ID, in
// the event of a scheduler failover.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/68ccbf1d/src/tests/allocator_zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_zookeeper_tests.cpp b/src/tests/allocator_zookeeper_tests.cpp
index 2c7deb1..39b4627 100644
--- a/src/tests/allocator_zookeeper_tests.cpp
+++ b/src/tests/allocator_zookeeper_tests.cpp
@@ -181,7 +181,7 @@ TYPED_TEST(AllocatorZooKeeperTest, FrameworkReregistersFirst)
.WillOnce(DoAll(InvokeFrameworkAdded(&this->allocator2),
FutureSatisfy(&frameworkAdded)));
- EXPECT_CALL(sched, registered(&driver, _, _));
+ EXPECT_CALL(sched, reregistered(&driver, _));
AWAIT_READY(frameworkAdded);
@@ -318,7 +318,7 @@ TYPED_TEST(AllocatorZooKeeperTest, SlaveReregistersFirst)
.WillOnce(DoAll(InvokeSlaveAdded(&this->allocator2),
FutureSatisfy(&slaveAdded)));
- EXPECT_CALL(sched, registered(&driver, _, _));
+ EXPECT_CALL(sched, reregistered(&driver, _));
AWAIT_READY(slaveAdded);
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/68ccbf1d/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index d41bef7..cc379f0 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -616,6 +616,50 @@ TEST_F(FaultToleranceClusterTest, PartitionedSlaveExitedExecutor)
}
+// This test ensures that a framework re-registering with a
+// failed-over master gets a reregistered() callback.
+TEST_F(FaultToleranceClusterTest, MasterFailover)
+{
+ Try<PID<Master> > master = cluster.masters.start();
+ ASSERT_SOME(master);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+ Future<process::Message> frameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ driver.start();
+
+ AWAIT_READY(frameworkRegisteredMessage); // Its 'to' is the driver's pid.
+
+ // Simulate a master failover by stopping and restarting the master.
+ ASSERT_SOME(cluster.masters.stop(master.get()));
+ master = cluster.masters.start();
+ ASSERT_SOME(master);
+
+ Future<Nothing> reregistered;
+ EXPECT_CALL(sched, reregistered(&driver, _))
+ .WillOnce(FutureSatisfy(&reregistered));
+
+ // Point the scheduler driver at the restarted master.
+ NewMasterDetectedMessage newMasterDetectedMsg;
+ newMasterDetectedMsg.set_pid(master.get());
+
+ process::post(frameworkRegisteredMessage.get().to, newMasterDetectedMsg);
+
+ // The framework should get a reregistered() callback from the new master.
+ AWAIT_READY(reregistered);
+
+ driver.stop();
+ driver.join();
+
+ cluster.shutdown();
+}
+
+
TEST_F(FaultToleranceTest, SchedulerFailover)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);