You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/05/24 01:06:58 UTC

[1/2] git commit: Fixed scheduler driver to call disconnected() when master fails over.

Updated Branches:
  refs/heads/master 9438fd5d3 -> 68ccbf1d9


Fixed scheduler driver to call disconnected() when master fails over.

Review: https://reviews.apache.org/r/11191


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/a45fd2d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/a45fd2d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/a45fd2d3

Branch: refs/heads/master
Commit: a45fd2d3b897887dbdcb1f53a1e66f0a14bd1802
Parents: 9438fd5
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon May 13 15:09:10 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Thu May 23 15:54:57 2013 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                 |   14 ++++++++++++--
 src/tests/fault_tolerance_tests.cpp |    6 ++++++
 src/tests/master_tests.cpp          |    6 ++++++
 3 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/a45fd2d3/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index a553294..b3593b1 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -184,6 +184,12 @@ protected:
     master = pid;
     link(master);
 
+    // If master failed over, inform the scheduler about the
+    // disconnection.
+    if (connected) {
+      scheduler->disconnected(driver);
+    }
+
     connected = false;
     doReliableRegistration();
   }
@@ -192,12 +198,16 @@ protected:
   {
     VLOG(1) << "No master detected, waiting for another master";
 
+    // Inform the scheduler about the disconnection if the driver
+    // was previously registered with the master.
+    if (connected) {
+      scheduler->disconnected(driver);
+    }
+
     // In this case, we don't actually invoke Scheduler::error
     // since we might get reconnected to a master imminently.
     connected = false;
     master = UPID();
-
-    scheduler->disconnected(driver);
   }
 
   void registered(const FrameworkID& frameworkId, const MasterInfo& masterInfo)

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/a45fd2d3/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 68cd5fc..d41bef7 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -753,6 +753,10 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
   AWAIT_READY(message); // Framework registered message, to get the pid.
   AWAIT_READY(registered); // Framework registered call.
 
+  Future<Nothing> disconnected;
+  EXPECT_CALL(sched, disconnected(&driver))
+    .WillOnce(FutureSatisfy(&disconnected));
+
   Future<Nothing> reregistered;
   EXPECT_CALL(sched, reregistered(&driver, _))
     .WillOnce(FutureSatisfy(&reregistered));
@@ -767,6 +771,8 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
 
   process::post(message.get().to, newMasterDetectedMsg);
 
+  AWAIT_READY(disconnected);
+
   AWAIT_READY(reregistered);
 
   driver.stop();

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/a45fd2d3/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 1b062d9..fe823f6 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -855,12 +855,18 @@ TEST_F(MasterTest, MasterInfoOnReElection)
   NewMasterDetectedMessage newMasterDetectedMsg;
   newMasterDetectedMsg.set_pid(master.get());
 
+  Future<Nothing> disconnected;
+  EXPECT_CALL(sched, disconnected(&driver))
+    .WillOnce(FutureSatisfy(&disconnected));
+
   Future<MasterInfo> masterInfo;
   EXPECT_CALL(sched, reregistered(&driver, _))
     .WillOnce(FutureArg<1>(&masterInfo));
 
   process::post(message.get().to, newMasterDetectedMsg);
 
+  AWAIT_READY(disconnected);
+
   AWAIT_READY(masterInfo);
   EXPECT_EQ(master.get().port, masterInfo.get().port());
   EXPECT_EQ(master.get().ip, masterInfo.get().ip());


[2/2] git commit: Fixed master to send a FrameworkReregistered message when the framework re-registers with a failed over master.

Posted by vi...@apache.org.
Fixed master to send a FrameworkReregistered message when the
framework re-registers with a failed over master.

Review: https://reviews.apache.org/r/11348


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/68ccbf1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/68ccbf1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/68ccbf1d

Branch: refs/heads/master
Commit: 68ccbf1d9ab1fee2bef933a27368e5460f9be42e
Parents: a45fd2d
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri May 10 16:21:44 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Thu May 23 16:06:04 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp                   |   26 +++++++++------
 src/master/master.hpp                   |    2 +-
 src/tests/allocator_zookeeper_tests.cpp |    4 +-
 src/tests/fault_tolerance_tests.cpp     |   44 ++++++++++++++++++++++++++
 4 files changed, 63 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/68ccbf1d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c44f2b7..d5e5804 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -658,7 +658,7 @@ void Master::reregisterFramework(const FrameworkInfo& frameworkInfo,
     // N.B. Need to add the framwwork _after_ we add it's tasks
     // (above) so that we can properly determine the resources it's
     // currently using!
-    addFramework(framework);
+    addFramework(framework, true);
   }
 
   CHECK(frameworks.count(frameworkInfo.id()) > 0);
@@ -913,7 +913,7 @@ void Master::reregisterSlave(const SlaveID& slaveId,
     // partitioned, we don't allow the slave to re-register, as we've
     // already informed frameworks that the tasks were lost.
     LOG(ERROR) << "Slave " << slaveId << " at " << from
-               << "attempted to re-register after deactivation";
+               << " attempted to re-register after deactivation";
     reply(ShutdownMessage());
   } else {
     Slave* slave = getSlave(slaveId);
@@ -1632,7 +1632,7 @@ Resources Master::launchTask(const TaskInfo& task,
 }
 
 
-void Master::addFramework(Framework* framework)
+void Master::addFramework(Framework* framework, bool reregister)
 {
   CHECK(frameworks.count(framework->id) == 0);
 
@@ -1640,14 +1640,20 @@ void Master::addFramework(Framework* framework)
 
   link(framework->pid);
 
-  FrameworkRegisteredMessage message;
-  message.mutable_framework_id()->MergeFrom(framework->id);
-  message.mutable_master_info()->MergeFrom(info);
-  send(framework->pid, message);
+  if (reregister) {
+    FrameworkReregisteredMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id);
+    message.mutable_master_info()->MergeFrom(info);
+    send(framework->pid, message);
+  } else {
+    FrameworkRegisteredMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id);
+    message.mutable_master_info()->MergeFrom(info);
+    send(framework->pid, message);
+  }
 
-  allocator->frameworkAdded(framework->id,
-                            framework->info,
-                            framework->resources);
+  allocator->frameworkAdded(
+      framework->id, framework->info, framework->resources);
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/68ccbf1d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d3790dc..0a130d9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -138,7 +138,7 @@ protected:
                     const Filters& filters);
 
   // Add a framework.
-  void addFramework(Framework* framework);
+  void addFramework(Framework* framework, bool reregister = false);
 
   // Replace the scheduler for a framework with a new process ID, in
   // the event of a scheduler failover.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/68ccbf1d/src/tests/allocator_zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_zookeeper_tests.cpp b/src/tests/allocator_zookeeper_tests.cpp
index 2c7deb1..39b4627 100644
--- a/src/tests/allocator_zookeeper_tests.cpp
+++ b/src/tests/allocator_zookeeper_tests.cpp
@@ -181,7 +181,7 @@ TYPED_TEST(AllocatorZooKeeperTest, FrameworkReregistersFirst)
     .WillOnce(DoAll(InvokeFrameworkAdded(&this->allocator2),
                     FutureSatisfy(&frameworkAdded)));
 
-  EXPECT_CALL(sched, registered(&driver, _, _));
+  EXPECT_CALL(sched, reregistered(&driver, _));
 
   AWAIT_READY(frameworkAdded);
 
@@ -318,7 +318,7 @@ TYPED_TEST(AllocatorZooKeeperTest, SlaveReregistersFirst)
     .WillOnce(DoAll(InvokeSlaveAdded(&this->allocator2),
                     FutureSatisfy(&slaveAdded)));
 
-  EXPECT_CALL(sched, registered(&driver, _, _));
+  EXPECT_CALL(sched, reregistered(&driver, _));
 
   AWAIT_READY(slaveAdded);
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/68ccbf1d/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index d41bef7..cc379f0 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -616,6 +616,50 @@ TEST_F(FaultToleranceClusterTest, PartitionedSlaveExitedExecutor)
 }
 
 
+// This test ensures that a framework connecting with a
+// failed over master gets a re-registered callback.
+TEST_F(FaultToleranceClusterTest, MasterFailover)
+{
+  Try<PID<Master> > master = cluster.masters.start();
+  ASSERT_SOME(master);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  Future<process::Message> frameworkRegisteredMessage =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  // Simulate failed over master by restarting the master.
+  ASSERT_SOME(cluster.masters.stop(master.get()));
+  master = cluster.masters.start();
+  ASSERT_SOME(master);
+
+  Future<Nothing> reregistered;
+  EXPECT_CALL(sched, reregistered(&driver, _))
+    .WillOnce(FutureSatisfy(&reregistered));
+
+  // Simulate a new master detected message to the scheduler.
+  NewMasterDetectedMessage newMasterDetectedMsg;
+  newMasterDetectedMsg.set_pid(master.get());
+
+  process::post(frameworkRegisteredMessage.get().to, newMasterDetectedMsg);
+
+  // Framework should get a re-register callback.
+  AWAIT_READY(reregistered);
+
+  driver.stop();
+  driver.join();
+
+  cluster.shutdown();
+}
+
+
 TEST_F(FaultToleranceTest, SchedulerFailover)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);