You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/07/17 19:44:35 UTC

[1/9] mesos git commit: Added FUTURE_CALL, DROP_CALL, DROP_CALLS and EXPECT_NO_FUTURE_CALLS.

Repository: mesos
Updated Branches:
  refs/heads/master 6b1b66df7 -> 51c2b523d


Added FUTURE_CALL, DROP_CALL, DROP_CALLS and EXPECT_NO_FUTURE_CALLS.

Review: https://reviews.apache.org/r/36462


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/415f7b15
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/415f7b15
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/415f7b15

Branch: refs/heads/master
Commit: 415f7b15028a1d367836599fac7df73690d5c499
Parents: 5329851
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 15:50:38 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:58 2015 -0700

----------------------------------------------------------------------
 src/tests/mesos.hpp | 66 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 66 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/415f7b15/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 9157ac0..23d9841 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1324,6 +1324,39 @@ ACTION_P(SendStatusUpdateFromTaskID, state)
 #define EXPECT_NO_FUTURE_PROTOBUFS(message, from, to)              \
   ExpectNoFutureProtobufs(message, from, to)
 
+
+// These are specialized versions of {FUTURE,DROP}_PROTOBUF that
+// capture a scheduler/executor Call protobuf of the given 'type'.
+// Note that we name methods as '*ProtobufUnion()' because these could
+// be reused for macros that capture any protobufs that are described
+// using the standard protocol buffer "union" trick (e.g.,
+// FUTURE_EVENT to capture scheduler::Event), see
+// https://developers.google.com/protocol-buffers/docs/techniques#union.
+
+#define FUTURE_CALL(message, unionType, from, to)              \
+  FutureUnionProtobuf(message, unionType, from, to)
+
+
+#define DROP_CALL(message, unionType, from, to)                \
+  FutureUnionProtobuf(message, unionType, from, to, true)
+
+
+#define DROP_CALLS(message, unionType, from, to)               \
+  DropUnionProtobufs(message, unionType, from, to)
+
+
+#define EXPECT_NO_FUTURE_CALLS(message, unionType, from, to)   \
+  ExpectNoFutureUnionProtobufs(message, unionType, from, to)
+
+
+#define FUTURE_CALL_MESSAGE(message, unionType, from, to)          \
+  process::FutureUnionMessage(message, unionType, from, to)
+
+
+#define DROP_CALL_MESSAGE(message, unionType, from, to)            \
+  process::FutureUnionMessage(message, unionType, from, to, true)
+
+
 // Forward declaration.
 template <typename T>
 T _FutureProtobuf(const process::Message& message);
@@ -1340,6 +1373,18 @@ process::Future<T> FutureProtobuf(T t, From from, To to, bool drop = false)
 }
 
 
+template <typename Message, typename UnionType, typename From, typename To>
+process::Future<Message> FutureUnionProtobuf(
+    Message message, UnionType unionType, From from, To to, bool drop = false)
+{
+  // Help debugging by adding some "type constraints".
+  { google::protobuf::Message* m = &message; (void) m; }
+
+  return process::FutureUnionMessage(message, unionType, from, to, drop)
+    .then(lambda::bind(&_FutureProtobuf<Message>, lambda::_1));
+}
+
+
 template <typename T>
 T _FutureProtobuf(const process::Message& message)
 {
@@ -1359,6 +1404,16 @@ void DropProtobufs(T t, From from, To to)
 }
 
 
+template <typename Message, typename UnionType, typename From, typename To>
+void DropUnionProtobufs(Message message, UnionType unionType, From from, To to)
+{
+  // Help debugging by adding some "type constraints".
+  { google::protobuf::Message* m = &message; (void) m; }
+
+  process::DropUnionMessages(message, unionType, from, to);
+}
+
+
 template <typename T, typename From, typename To>
 void ExpectNoFutureProtobufs(T t, From from, To to)
 {
@@ -1369,6 +1424,17 @@ void ExpectNoFutureProtobufs(T t, From from, To to)
 }
 
 
+template <typename Message, typename UnionType, typename From, typename To>
+void ExpectNoFutureUnionProtobufs(
+    Message message, UnionType unionType, From from, To to)
+{
+  // Help debugging by adding some "type constraints".
+  { google::protobuf::Message* m = &message; (void) m; }
+
+  process::ExpectNoFutureUnionMessages(message, unionType, from, to);
+}
+
+
 // This matcher is used to match the task ids of TaskStatus messages.
 // Suppose we set up N futures for LaunchTasks and N futures for StatusUpdates.
 // (This is a common pattern). We get into a situation where all StatusUpdates


[8/9] mesos git commit: Updated scheduler driver to send MESSAGE call.

Posted by vi...@apache.org.
Updated scheduler driver to send MESSAGE call.

Review: https://reviews.apache.org/r/36469


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cf485e29
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cf485e29
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cf485e29

Branch: refs/heads/master
Commit: cf485e29413a4d24567f82f9a47c30e59bd2c522
Parents: 5717eab
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Jul 13 13:03:13 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:44:00 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                 | 20 +++++--
 src/tests/fault_tolerance_tests.cpp | 95 +++++++++++++++++++++++++++++++-
 src/tests/slave_recovery_tests.cpp  |  4 +-
 3 files changed, 110 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 613e40b..c563c44 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1259,6 +1259,8 @@ protected:
       UPID slave = savedSlavePids[slaveId];
       CHECK(slave != UPID());
 
+      // TODO(vinod): Send a Call directly to the slave once that
+      // support is added.
       FrameworkToExecutorMessage message;
       message.mutable_slave_id()->MergeFrom(slaveId);
       message.mutable_framework_id()->MergeFrom(framework.id());
@@ -1269,13 +1271,19 @@ protected:
       VLOG(1) << "Cannot send directly to slave " << slaveId
               << "; sending through master";
 
-      FrameworkToExecutorMessage message;
-      message.mutable_slave_id()->MergeFrom(slaveId);
-      message.mutable_framework_id()->MergeFrom(framework.id());
-      message.mutable_executor_id()->MergeFrom(executorId);
-      message.set_data(data);
+      Call call;
+
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+      call.set_type(Call::MESSAGE);
+
+      Call::Message* message = call.mutable_message();
+      message->mutable_slave_id()->CopyFrom(slaveId);
+      message->mutable_executor_id()->CopyFrom(executorId);
+      message->set_data(data);
+
       CHECK_SOME(master);
-      send(master.get(), message);
+      send(master.get(), call);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index f64c797..60ca523 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -18,6 +18,7 @@
 
 #include <gmock/gmock.h>
 
+#include <string>
 #include <vector>
 
 #include <mesos/executor.hpp>
@@ -70,6 +71,7 @@ using process::UPID;
 using process::http::OK;
 using process::http::Response;
 
+using std::string;
 using std::vector;
 
 using testing::_;
@@ -1156,7 +1158,7 @@ TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor)
 }
 
 
-TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
+TEST_F(FaultToleranceTest, SchedulerFailoverExecutorToFrameworkMessage)
 {
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
@@ -1260,6 +1262,97 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
 }
 
 
+TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver1.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+  driver1.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  MockScheduler sched2;
+
+  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+  framework2 = DEFAULT_FRAMEWORK_INFO;
+  framework2.mutable_id()->MergeFrom(frameworkId);
+
+  MesosSchedulerDriver driver2(
+      &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  Future<Nothing> error;
+  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
+    .WillOnce(FutureSatisfy(&error));
+
+  driver2.start();
+
+  AWAIT_READY(error);
+
+  AWAIT_READY(registered);
+
+  Future<string> frameworkMessage;
+  EXPECT_CALL(exec, frameworkMessage(_, _))
+    .WillOnce(FutureArg<1>(&frameworkMessage));
+
+  driver2.sendFrameworkMessage(
+      DEFAULT_EXECUTOR_ID, offers.get()[0].slave_id(), "hello world");
+
+  AWAIT_EQ("hello world", frameworkMessage);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver1.stop();
+  driver2.stop();
+
+  driver1.join();
+  driver2.join();
+
+  Shutdown();
+}
+
+
 // This test verifies that a partitioned framework that still
 // thinks it is registered with the master cannot kill a task because
 // the master has re-registered another instance of the framework.

http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 7708cf6..ff7aaf9 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -310,13 +310,13 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
         .tasks[task.task_id()]
         .updates.front().uuid());
 
+  const UUID& uuid = UUID::fromBytes(ack.get().acknowledge().uuid());
   ASSERT_TRUE(state
                 .frameworks[frameworkId]
                 .executors[executorId]
                 .runs[containerId.get()]
                 .tasks[task.task_id()]
-                .acks.contains(
-                    UUID::fromBytes(ack.get().acknowledge().uuid())));
+                .acks.contains(uuid));
 
   // Shut down the executor manually so that it doesn't hang around
   // after the test finishes.


[6/9] mesos git commit: Updated scheduler driver to send ACKNOWLEDGE call.

Posted by vi...@apache.org.
Updated scheduler driver to send ACKNOWLEDGE call.

Review: https://reviews.apache.org/r/36467


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5717eabf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5717eabf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5717eabf

Branch: refs/heads/master
Commit: 5717eabf93dc47430492858d4f781f5ac7bb5373
Parents: e3bbdf8
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 15:48:48 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:44:00 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                       | 41 +++++++++++++++++---------
 src/tests/fault_tolerance_tests.cpp       |  6 ++--
 src/tests/reconciliation_tests.cpp        |  5 +++-
 src/tests/scheduler_event_call_tests.cpp  |  8 +++--
 src/tests/scheduler_tests.cpp             | 37 ++++++++++++++++-------
 src/tests/slave_recovery_tests.cpp        |  9 ++++--
 src/tests/slave_tests.cpp                 |  2 ++
 src/tests/status_update_manager_tests.cpp |  5 +++-
 8 files changed, 78 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 2dfa5c5..613e40b 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -883,12 +883,19 @@ protected:
         VLOG(2) << "Sending ACK for status update " << update
             << " to " << master.get();
 
-        StatusUpdateAcknowledgementMessage message;
-        message.mutable_framework_id()->MergeFrom(framework.id());
-        message.mutable_slave_id()->MergeFrom(update.slave_id());
-        message.mutable_task_id()->MergeFrom(update.status().task_id());
-        message.set_uuid(update.uuid());
-        send(master.get(), message);
+        Call call;
+
+        CHECK(framework.has_id());
+        call.mutable_framework_id()->CopyFrom(framework.id());
+        call.set_type(Call::ACKNOWLEDGE);
+
+        Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+        acknowledge->mutable_slave_id()->CopyFrom(update.slave_id());
+        acknowledge->mutable_task_id()->CopyFrom(update.status().task_id());
+        acknowledge->set_uuid(update.uuid());
+
+        CHECK_SOME(master);
+        send(master.get(), call);
       }
     }
   }
@@ -1191,8 +1198,6 @@ protected:
       return;
     }
 
-    CHECK_SOME(master);
-
     // NOTE: By ignoring the volatile 'running' here, we ensure that
     // all acknowledgements requested before the driver was stopped
     // or aborted are processed. Any acknowledgement that is requested
@@ -1204,17 +1209,25 @@ protected:
     // ensures that master-generated and driver-generated updates
     // will not have a 'uuid' set.
     if (status.has_uuid() && status.has_slave_id()) {
+      CHECK_SOME(master);
+
       VLOG(2) << "Sending ACK for status update " << status.uuid()
               << " of task " << status.task_id()
               << " on slave " << status.slave_id()
               << " to " << master.get();
 
-      StatusUpdateAcknowledgementMessage message;
-      message.mutable_framework_id()->CopyFrom(framework.id());
-      message.mutable_slave_id()->CopyFrom(status.slave_id());
-      message.mutable_task_id()->CopyFrom(status.task_id());
-      message.set_uuid(status.uuid());
-      send(master.get(), message);
+      Call call;
+
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+      call.set_type(Call::ACKNOWLEDGE);
+
+      Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+      acknowledge->mutable_slave_id()->CopyFrom(status.slave_id());
+      acknowledge->mutable_task_id()->CopyFrom(status.task_id());
+      acknowledge->set_uuid(status.uuid());
+
+      send(master.get(), call);
     } else {
       VLOG(2) << "Received ACK for status update"
               << (status.has_uuid() ? " " + status.uuid() : "")

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index f218dc6..f64c797 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1296,8 +1296,8 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
     .WillOnce(FutureArg<1>(&status));
 
-  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
-    = FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<mesos::scheduler::Call> acknowledgeCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _);
 
   ExecutorDriver* execDriver;
   EXPECT_CALL(exec, registered(_, _, _, _))
@@ -1314,7 +1314,7 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
   // Wait for the status update acknowledgement to be sent. This
   // ensures the slave doesn't resend the TASK_RUNNING update to the
   // failed over scheduler (below).
-  AWAIT_READY(statusUpdateAcknowledgementMessage);
+  AWAIT_READY(acknowledgeCall);
 
   // Now start the second failed over scheduler.
   MockScheduler sched2;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index aef245a..6940b6a 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -745,7 +745,10 @@ TEST_F(ReconciliationTest, UnacknowledgedTerminalTask)
 
   // Drop the status update acknowledgements to ensure that the
   // task remains terminal and unacknowledged in the master.
-  DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
+  DROP_CALLS(mesos::scheduler::Call(),
+             mesos::scheduler::Call::ACKNOWLEDGE,
+             _,
+             master.get());
 
   driver.start();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/scheduler_event_call_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp
index cf6aa19..fe15f05 100644
--- a/src/tests/scheduler_event_call_tests.cpp
+++ b/src/tests/scheduler_event_call_tests.cpp
@@ -20,6 +20,8 @@
 
 #include <mesos/scheduler.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
@@ -152,13 +154,13 @@ TEST_F(SchedulerDriverEventTest, Update)
   // Generate an update that requires acknowledgement.
   event.mutable_update()->mutable_status()->set_uuid(UUID::random().toBytes());
 
-  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgement =
-    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<mesos::scheduler::Call> acknowledgement = DROP_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _);
 
   process::post(master.get(), frameworkPid, event);
 
   AWAIT_READY(statusUpdate2);
-  AWAIT_READY(statusUpdateAcknowledgement);
+  AWAIT_READY(acknowledgement);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 2ce280a..13fecb2 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -27,6 +27,8 @@
 #include <mesos/scheduler.hpp>
 #include <mesos/type_utils.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
@@ -1087,8 +1089,11 @@ TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
 
   // Ensure no status update acknowledgements are sent from the driver
   // to the master.
-  EXPECT_NO_FUTURE_PROTOBUFS(
-      StatusUpdateAcknowledgementMessage(), _ , master.get());
+  EXPECT_NO_FUTURE_CALLS(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _ ,
+      master.get());
 
   EXPECT_CALL(exec, registered(_, _, _, _));
 
@@ -1144,8 +1149,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
 
   // Ensure no status update acknowledgements are sent from the driver
   // to the master until the explicit acknowledgement is sent.
-  EXPECT_NO_FUTURE_PROTOBUFS(
-      StatusUpdateAcknowledgementMessage(), _ , master.get());
+  EXPECT_NO_FUTURE_CALLS(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _ ,
+      master.get());
 
   EXPECT_CALL(exec, registered(_, _, _, _));
 
@@ -1166,8 +1174,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
   Clock::settle();
 
   // Now send the acknowledgement.
-  Future<StatusUpdateAcknowledgementMessage> acknowledgement =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _ , master.get());
+  Future<mesos::scheduler::Call> acknowledgement = FUTURE_CALL(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _,
+      master.get());
 
   driver.acknowledgeStatusUpdate(status.get());
 
@@ -1204,8 +1215,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate)
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   // Ensure no status update acknowledgements are sent to the master.
-  EXPECT_NO_FUTURE_PROTOBUFS(
-      StatusUpdateAcknowledgementMessage(), _ , master.get());
+  EXPECT_NO_FUTURE_CALLS(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _ ,
+      master.get());
 
   driver.start();
 
@@ -1266,8 +1280,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID)
     .WillOnce(FutureSatisfy(&registered));
 
   // Ensure no status update acknowledgements are sent to the master.
-  EXPECT_NO_FUTURE_PROTOBUFS(
-      StatusUpdateAcknowledgementMessage(), _ , master.get());
+  EXPECT_NO_FUTURE_CALLS(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::ACKNOWLEDGE,
+      _ ,
+      master.get());
 
   driver.start();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 2f882cf..7708cf6 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -27,6 +27,8 @@
 #include <mesos/resources.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/dispatch.hpp>
 #include <process/gmock.hpp>
 #include <process/owned.hpp>
@@ -211,8 +213,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
   Future<StatusUpdateMessage> update =
     FUTURE_PROTOBUF(StatusUpdateMessage(), Eq(master.get()), _);
 
-  Future<StatusUpdateAcknowledgementMessage> ack =
-    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<mesos::scheduler::Call> ack = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _);
 
   Future<Nothing> _ack =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
@@ -313,7 +315,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
                 .executors[executorId]
                 .runs[containerId.get()]
                 .tasks[task.task_id()]
-                .acks.contains(UUID::fromBytes(ack.get().uuid())));
+                .acks.contains(
+                    UUID::fromBytes(ack.get().acknowledge().uuid())));
 
   // Shut down the executor manually so that it doesn't hang around
   // after the test finishes.

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 89cc7f6..a16e4f4 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -29,6 +29,8 @@
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>

http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 440b074..0224e50 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -420,7 +420,10 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
 
   // Drop the ACKs, so that status update manager
   // retries the update.
-  DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
+  DROP_CALLS(mesos::scheduler::Call(),
+             mesos::scheduler::Call::ACKNOWLEDGE,
+             _,
+             master.get());
 
   driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
 


[3/9] mesos git commit: Updated scheduler driver to send ACCEPT call.

Posted by vi...@apache.org.
Updated scheduler driver to send ACCEPT call.

Review: https://reviews.apache.org/r/36464


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0981c8d0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0981c8d0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0981c8d0

Branch: refs/heads/master
Commit: 0981c8d06f76f8856dc9e6365a476620d376f9eb
Parents: be0659b
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 12:43:40 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:59 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp        | 60 ++++-------------------------------------
 src/tests/master_tests.cpp |  6 ++---
 2 files changed, 8 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0981c8d0/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 2fe1836..9da0782 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1060,65 +1060,15 @@ protected:
                    const vector<TaskInfo>& tasks,
                    const Filters& filters)
   {
-    if (!connected) {
-      VLOG(1) << "Ignoring launch tasks message as master is disconnected";
-      // NOTE: Reply to the framework with TASK_LOST messages for each
-      // task. This is a hack for now, to not let the scheduler
-      // believe the tasks are launched, when actually the master
-      // never received the launchTasks message. Also, realize that
-      // this hack doesn't capture the case when the scheduler process
-      // sends it but the master never receives it (message lost,
-      // master failover etc). The correct way for schedulers to deal
-      // with this situation is to use 'reconcileTasks()'.
-      foreach (const TaskInfo& task, tasks) {
-        StatusUpdate update = protobuf::createStatusUpdate(
-            framework.id(),
-            None(),
-            task.task_id(),
-            TASK_LOST,
-            TaskStatus::SOURCE_MASTER,
-            None(),
-            "Master disconnected",
-            TaskStatus::REASON_MASTER_DISCONNECTED);
-
-        statusUpdate(UPID(), update, UPID());
-      }
-      return;
-    }
-
-    LaunchTasksMessage message;
-    message.mutable_framework_id()->MergeFrom(framework.id());
-    message.mutable_filters()->MergeFrom(filters);
-
-    foreach (const OfferID& offerId, offerIds) {
-      message.add_offer_ids()->MergeFrom(offerId);
-
-      foreach (const TaskInfo& task, tasks) {
-        // Keep only the slave PIDs where we run tasks so we can send
-        // framework messages directly.
-        if (savedOffers.contains(offerId)) {
-          if (savedOffers[offerId].count(task.slave_id()) > 0) {
-            savedSlavePids[task.slave_id()] =
-              savedOffers[offerId][task.slave_id()];
-          } else {
-            LOG(WARNING) << "Attempting to launch task " << task.task_id()
-                         << " with the wrong slave id " << task.slave_id();
-          }
-        } else {
-          LOG(WARNING) << "Attempting to launch task " << task.task_id()
-                       << " with an unknown offer " << offerId;
-        }
-      }
-      // Remove the offer since we saved all the PIDs we might use.
-      savedOffers.erase(offerId);
-    }
+    Offer::Operation operation;
+    operation.set_type(Offer::Operation::LAUNCH);
 
+    Offer::Operation::Launch* launch = operation.mutable_launch();
     foreach (const TaskInfo& task, tasks) {
-      message.add_tasks()->MergeFrom(task);
+      launch->add_task_infos()->CopyFrom(task);
     }
 
-    CHECK_SOME(master);
-    send(master.get(), message);
+    acceptOffers(offerIds, {operation}, filters);
   }
 
   void acceptOffers(

http://git-wip-us.apache.org/repos/asf/mesos/blob/0981c8d0/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 13babee..fdee267 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2562,8 +2562,8 @@ TEST_F(MasterTest, OfferNotRescindedOnceDeclined)
   EXPECT_CALL(sched, resourceOffers(_, _))
     .WillRepeatedly(DeclineOffers()); // Decline all offers.
 
-  Future<LaunchTasksMessage> launchTasksMessage =
-    FUTURE_PROTOBUF(LaunchTasksMessage(), _, _);
+  Future<mesos::scheduler::Call> acceptCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::ACCEPT, _, _);
 
   EXPECT_CALL(sched, offerRescinded(&driver, _))
     .Times(0);
@@ -2572,7 +2572,7 @@ TEST_F(MasterTest, OfferNotRescindedOnceDeclined)
   AWAIT_READY(registered);
 
   // Wait for the framework to decline the offers.
-  AWAIT_READY(launchTasksMessage);
+  AWAIT_READY(acceptCall);
 
   // Now advance to the offer timeout, we need to settle the clock to
   // ensure that the offer rescind timeout would be processed


[4/9] mesos git commit: Updated scheduler driver to send Kill Call.

Posted by vi...@apache.org.
Updated scheduler driver to send Kill Call.

Review: https://reviews.apache.org/r/36463


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/be0659b8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/be0659b8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/be0659b8

Branch: refs/heads/master
Commit: be0659b85da24b8e0d312ef7cc784288e1962a62
Parents: 415f7b1
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Jul 9 18:06:19 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:59 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                 | 14 ++++++++++----
 src/tests/fault_tolerance_tests.cpp |  8 +++++---
 src/tests/master_tests.cpp          |  9 +++++----
 3 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/be0659b8/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index de76803..2fe1836 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1027,11 +1027,17 @@ protected:
       return;
     }
 
-    KillTaskMessage message;
-    message.mutable_framework_id()->MergeFrom(framework.id());
-    message.mutable_task_id()->MergeFrom(taskId);
+    Call call;
+
+    CHECK(framework.has_id());
+    call.mutable_framework_id()->CopyFrom(framework.id());
+    call.set_type(Call::KILL);
+
+    Call::Kill* kill = call.mutable_kill();
+    kill->mutable_task_id()->CopyFrom(taskId);
+
     CHECK_SOME(master);
-    send(master.get(), message);
+    send(master.get(), call);
   }
 
   void requestResources(const vector<Request>& requests)

http://git-wip-us.apache.org/repos/asf/mesos/blob/be0659b8/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 1070ccf..f218dc6 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -28,6 +28,8 @@
 
 #include <mesos/master/allocator.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/http.hpp>
@@ -1354,12 +1356,12 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
   EXPECT_CALL(sched2, statusUpdate(&driver2, _))
     .WillOnce(FutureArg<1>(&status2));
 
-  Future<KillTaskMessage> killTaskMessage =
-    FUTURE_PROTOBUF(KillTaskMessage(), _, _);
+  Future<mesos::scheduler::Call> killCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::KILL, _, _);
 
   driver1.killTask(status.get().task_id());
 
-  AWAIT_READY(killTaskMessage);
+  AWAIT_READY(killCall);
 
   // By this point the master must have processed and ignored the
   // 'killTask' message from the first framework. To verify this,

http://git-wip-us.apache.org/repos/asf/mesos/blob/be0659b8/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 767c86c..13babee 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -29,6 +29,8 @@
 
 #include <mesos/master/allocator.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
@@ -540,9 +542,8 @@ TEST_F(MasterTest, KillUnknownTaskSlaveInTransition)
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .Times(0);
 
-  // Set expectation that Master receives killTask message.
-  Future<KillTaskMessage> killTaskMessage =
-    FUTURE_PROTOBUF(KillTaskMessage(), _, master.get());
+  Future<mesos::scheduler::Call> killCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::KILL, _, _);
 
   // Attempt to kill unknown task while slave is transitioning.
   TaskID unknownTaskId;
@@ -554,7 +555,7 @@ TEST_F(MasterTest, KillUnknownTaskSlaveInTransition)
 
   driver.killTask(unknownTaskId);
 
-  AWAIT_READY(killTaskMessage);
+  AWAIT_READY(killCall);
 
   // Wait for all messages to be dispatched and processed completely to satisfy
   // the expectation that we didn't receive a status update.


[5/9] mesos git commit: Updated scheduler driver to send REVIVE call.

Posted by vi...@apache.org.
Updated scheduler driver to send REVIVE call.

Review: https://reviews.apache.org/r/36465


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90ed30d4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90ed30d4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90ed30d4

Branch: refs/heads/master
Commit: 90ed30d4d9f99ad9a469e6aaf9fd0fc2ea781772
Parents: 0981c8d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 12:59:51 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:59 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/90ed30d4/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 9da0782..4099bce 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1167,10 +1167,14 @@ protected:
       return;
     }
 
-    ReviveOffersMessage message;
-    message.mutable_framework_id()->MergeFrom(framework.id());
+    Call call;
+
+    CHECK(framework.has_id());
+    call.mutable_framework_id()->CopyFrom(framework.id());
+    call.set_type(Call::REVIVE);
+
     CHECK_SOME(master);
-    send(master.get(), message);
+    send(master.get(), call);
   }
 
   void acknowledgeStatusUpdate(


[2/9] mesos git commit: Added FutureUnionMessage, DropUnionMessages and ExpectNoFutureUnionMessages.

Posted by vi...@apache.org.
Added FutureUnionMessage, DropUnionMessages and
ExpectNoFutureUnionMessages.

Review: https://reviews.apache.org/r/36461


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/53298516
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/53298516
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/53298516

Branch: refs/heads/master
Commit: 53298516d88fa2060990de5254f4c5ad3ca1ada0
Parents: 6b1b66d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 16:10:25 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:58 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/gmock.hpp | 64 ++++++++++++++++++++++
 1 file changed, 64 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/53298516/3rdparty/libprocess/include/process/gmock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
index e878161..d449952 100644
--- a/3rdparty/libprocess/include/process/gmock.hpp
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -330,6 +330,23 @@ MATCHER_P3(MessageMatcher, name, from, to, "")
 }
 
 
+// This matches protobuf messages that are described using the
+// standard protocol buffer "union" trick, see:
+// https://developers.google.com/protocol-buffers/docs/techniques#union.
+MATCHER_P4(UnionMessageMatcher, message, unionType, from, to, "")
+{
+  const process::MessageEvent& event = ::std::tr1::get<0>(arg);
+  message_type message;
+
+  return (testing::Matcher<std::string>(message.GetTypeName()).Matches(
+              event.message->name) &&
+          message.ParseFromString(event.message->body) &&
+          testing::Matcher<unionType_type>(unionType).Matches(message.type()) &&
+          testing::Matcher<process::UPID>(from).Matches(event.message->from) &&
+          testing::Matcher<process::UPID>(to).Matches(event.message->to));
+}
+
+
 MATCHER_P2(DispatchMatcher, pid, method, "")
 {
   const DispatchEvent& event = ::std::tr1::get<0>(arg);
@@ -358,6 +375,28 @@ Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
 }
 
 
+template <typename Message, typename UnionType, typename From, typename To>
+Future<process::Message> FutureUnionMessage(
+    Message message, UnionType unionType, From from, To to, bool drop = false)
+{
+  TestsFilter* filter =
+    FilterTestEventListener::instance()->install();
+
+  Future<process::Message> future;
+  synchronized (filter->mutex) {
+    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+      .With(UnionMessageMatcher(message, unionType, from, to))
+      .WillOnce(testing::DoAll(FutureArgField<0>(
+                                   &MessageEvent::message,
+                                   &future),
+                               testing::Return(drop)))
+      .RetiresOnSaturation(); // Don't impose any subsequent expectations.
+  }
+
+  return future;
+}
+
+
 template <typename PID, typename Method>
 Future<Nothing> FutureDispatch(PID pid, Method method, bool drop = false)
 {
@@ -387,6 +426,18 @@ void DropMessages(Name name, From from, To to)
 }
 
 
+template <typename Message, typename UnionType, typename From, typename To>
+void DropUnionMessages(Message message, UnionType unionType, From from, To to)
+{
+  TestsFilter* filter = FilterTestEventListener::instance()->install();
+  synchronized (filter->mutex) {
+    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+      .With(UnionMessageMatcher(message, unionType, from, to))
+      .WillRepeatedly(testing::Return(true));
+  }
+}
+
+
 template <typename Name, typename From, typename To>
 void ExpectNoFutureMessages(Name name, From from, To to)
 {
@@ -399,6 +450,19 @@ void ExpectNoFutureMessages(Name name, From from, To to)
 }
 
 
+template <typename Message, typename UnionType, typename From, typename To>
+void ExpectNoFutureUnionMessages(
+    Message message, UnionType unionType, From from, To to)
+{
+  TestsFilter* filter = FilterTestEventListener::instance()->install();
+  synchronized (filter->mutex) {
+    EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+      .With(UnionMessageMatcher(message, unionType, from, to))
+      .Times(0);
+  }
+}
+
+
 template <typename PID, typename Method>
 void DropDispatches(PID pid, Method method)
 {


[9/9] mesos git commit: Updated scheduler driver to send RECONCILE call.

Posted by vi...@apache.org.
Updated scheduler driver to send RECONCILE call.

Review: https://reviews.apache.org/r/36466


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e3bbdf88
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e3bbdf88
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e3bbdf88

Branch: refs/heads/master
Commit: e3bbdf882209178ee9a945fb851728b424d51e25
Parents: 90ed30d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 15:36:13 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:44:00 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                | 17 +++++++++++++----
 src/tests/reconciliation_tests.cpp | 18 ++++++++++--------
 2 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e3bbdf88/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 4099bce..2dfa5c5 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1273,15 +1273,24 @@ protected:
      return;
     }
 
-    ReconcileTasksMessage message;
-    message.mutable_framework_id()->MergeFrom(framework.id());
+    Call call;
+
+    CHECK(framework.has_id());
+    call.mutable_framework_id()->CopyFrom(framework.id());
+    call.set_type(Call::RECONCILE);
+
+    Call::Reconcile* reconcile = call.mutable_reconcile();
 
     foreach (const TaskStatus& status, statuses) {
-      message.add_statuses()->MergeFrom(status);
+      Call::Reconcile::Task* task = reconcile->add_tasks();
+      task->mutable_task_id()->CopyFrom(status.task_id());
+      if (status.has_slave_id()) {
+        task->mutable_slave_id()->CopyFrom(status.slave_id());
+      }
     }
 
     CHECK_SOME(master);
-    send(master.get(), message);
+    send(master.get(), call);
   }
 
 private:

http://git-wip-us.apache.org/repos/asf/mesos/blob/e3bbdf88/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 6042d8c..aef245a 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -26,6 +26,8 @@
 #include <mesos/mesos.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/pid.hpp>
@@ -446,15 +448,15 @@ TEST_F(ReconciliationTest, SlaveInTransition)
 
   statuses.push_back(status);
 
-  Future<ReconcileTasksMessage> reconcileTasksMessage =
-    FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
+  Future<mesos::scheduler::Call> reconcileCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::RECONCILE, _ , _);
 
   Clock::pause();
 
   driver.reconcileTasks(statuses);
 
-  // Make sure the master received the reconcile tasks message.
-  AWAIT_READY(reconcileTasksMessage);
+  // Make sure the master received the reconcile call.
+  AWAIT_READY(reconcileCall);
 
   // The Clock::settle() will ensure that framework would receive
   // a status update if it is sent by the master. In this test it
@@ -587,8 +589,8 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask)
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));
 
-  Future<ReconcileTasksMessage> reconcileTasksMessage =
-    FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
+  Future<mesos::scheduler::Call> reconcileCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::RECONCILE, _ , _);
 
   Clock::pause();
 
@@ -597,8 +599,8 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask)
   vector<TaskStatus> statuses;
   driver.reconcileTasks(statuses);
 
-  // Make sure the master received the reconcile tasks message.
-  AWAIT_READY(reconcileTasksMessage);
+  // Make sure the master received the reconcile call.
+  AWAIT_READY(reconcileCall);
 
   // The Clock::settle() will ensure that framework would receive
   // a status update if it is sent by the master. In this test it


[7/9] mesos git commit: Updated scheduler driver to send TEARDOWN call.

Posted by vi...@apache.org.
Updated scheduler driver to send TEARDOWN call.

Review: https://reviews.apache.org/r/36470


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51c2b523
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51c2b523
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51c2b523

Branch: refs/heads/master
Commit: 51c2b523d0b548b5882371d3a87dab13f9aec4dc
Parents: cf485e2
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Jul 13 15:51:17 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:44:00 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                  | 10 +++++++---
 src/tests/exception_tests.cpp        | 10 ++++++----
 src/tests/fault_tolerance_tests.cpp  | 13 +++++++++++--
 src/tests/master_allocator_tests.cpp |  4 +++-
 src/tests/master_tests.cpp           | 23 ++++++++++-------------
 src/tests/rate_limiting_tests.cpp    | 10 +++++-----
 src/tests/slave_recovery_tests.cpp   |  8 ++++----
 7 files changed, 46 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index c563c44..8163796 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -990,10 +990,14 @@ protected:
     terminate(self());
 
     if (connected && !failover) {
-      UnregisterFrameworkMessage message;
-      message.mutable_framework_id()->MergeFrom(framework.id());
+      Call call;
+
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+      call.set_type(Call::TEARDOWN);
+
       CHECK_SOME(master);
-      send(master.get(), message);
+      send(master.get(), call);
     }
 
     synchronized (mutex) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/exception_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/exception_tests.cpp b/src/tests/exception_tests.cpp
index 9af1674..658e485 100644
--- a/src/tests/exception_tests.cpp
+++ b/src/tests/exception_tests.cpp
@@ -21,6 +21,8 @@
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/gmock.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
@@ -190,13 +192,13 @@ TEST_F(ExceptionTest, DisallowSchedulerCallbacksOnAbort)
 
   AWAIT_READY(rescindMsg);
 
-  Future<UnregisterFrameworkMessage> unregisterMsg =
-    FUTURE_PROTOBUF(UnregisterFrameworkMessage(), _, _);
+  Future<mesos::scheduler::Call> teardownCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::TEARDOWN, _, _);
 
   driver.stop();
 
-  //Ensures reception of RescindResourceOfferMessage.
-  AWAIT_READY(unregisterMsg);
+  // Ensures reception of RescindResourceOfferMessage.
+  AWAIT_READY(teardownCall);
 
   Shutdown();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 60ca523..72f4cab 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1310,8 +1310,7 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
 
   MockScheduler sched2;
 
-  FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
-  framework2 = DEFAULT_FRAMEWORK_INFO;
+  FrameworkInfo framework2 = DEFAULT_FRAMEWORK_INFO;
   framework2.mutable_id()->MergeFrom(frameworkId);
 
   MesosSchedulerDriver driver2(
@@ -1335,9 +1334,19 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
   EXPECT_CALL(exec, frameworkMessage(_, _))
     .WillOnce(FutureArg<1>(&frameworkMessage));
 
+  // Since 'sched2' doesn't receive any offers the framework message
+  // should go through the master.
+  Future<mesos::scheduler::Call> messageCall = FUTURE_CALL(
+      mesos::scheduler::Call(),
+      mesos::scheduler::Call::MESSAGE,
+      _,
+      master.get());
+
   driver2.sendFrameworkMessage(
       DEFAULT_EXECUTOR_ID, offers.get()[0].slave_id(), "hello world");
 
+  AWAIT_READY(messageCall);
+
   AWAIT_EQ("hello world", frameworkMessage);
 
   EXPECT_CALL(exec, shutdown(_))

http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index 534b248..147f510 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -29,6 +29,8 @@
 
 #include <mesos/module/allocator.hpp>
 
+#include <mesos/scheduler/scheduler.hpp>
+
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
@@ -440,7 +442,7 @@ TYPED_TEST(MasterAllocatorTest, SchedulerFailover)
   // When we shut down the first framework, we don't want it to tell
   // the master it's shutting down so that the master will wait to see
   // if it fails over.
-  DROP_PROTOBUFS(UnregisterFrameworkMessage(), _, _);
+  DROP_CALLS(mesos::scheduler::Call(), mesos::scheduler::Call::TEARDOWN, _, _);
 
   Future<Nothing> deactivateFramework;
   EXPECT_CALL(allocator, deactivateFramework(_))

http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index fdee267..9205ec4 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -241,13 +241,10 @@ TEST_F(MasterTest, ShutdownFrameworkWhileTaskRunning)
 
   AWAIT_READY(update);
 
-  // Set expectation that Master receives UnregisterFrameworkMessage,
-  // which triggers marking running tasks as killed.
-  UnregisterFrameworkMessage message;
-  message.mutable_framework_id()->MergeFrom(offer.framework_id());
-
-  Future<UnregisterFrameworkMessage> unregisterFrameworkMessage =
-    FUTURE_PROTOBUF(message, _, master.get());
+  // Set expectation that Master receives teardown call, which
+  // triggers marking running tasks as killed.
+  Future<mesos::scheduler::Call> teardownCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::TEARDOWN, _, _);
 
   // Set expectation that Executor's shutdown callback is invoked.
   Future<Nothing> shutdown;
@@ -258,14 +255,14 @@ TEST_F(MasterTest, ShutdownFrameworkWhileTaskRunning)
   driver.stop();
   driver.join();
 
-  // Wait for UnregisterFrameworkMessage message to be dispatched and
-  // executor's shutdown callback to be called.
-  AWAIT_READY(unregisterFrameworkMessage);
+  // Wait for teardown call to be dispatched and executor's shutdown
+  // callback to be called.
+  AWAIT_READY(teardownCall);
   AWAIT_READY(shutdown);
 
-  // We have to be sure the UnregisterFrameworkMessage is processed
-  // completely and running tasks enter a terminal state before we
-  // request the master state.
+  // We have to be sure the teardown call is processed completely and
+  // running tasks enter a terminal state before we request the master
+  // state.
   Clock::pause();
   Clock::settle();
   Clock::resume();

http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 49d907b..6a93df0 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -178,10 +178,10 @@ TEST_F(RateLimitingTest, NoRateLimiting)
   driver->join();
   delete driver;
 
-  // The fact that UnregisterFrameworkMessage (the 2nd message from
-  // 'sched' that reaches Master after its registration) gets
-  // processed without Clock advances proves that the framework is
-  // given unlimited rate.
+  // The fact that the teardown call (the 2nd call from the scheduler
+  // that reaches Master after its registration) gets processed
+  // without Clock advances proves that the framework is given
+  // unlimited rate.
   AWAIT_READY(removeFramework);
 
   // For metrics endpoint.
@@ -724,7 +724,7 @@ TEST_F(RateLimitingTest, SamePrincipalFrameworks)
   driver1->join();
   delete driver1;
 
-  // Advance to let UnregisterFrameworkMessage come through.
+  // Advance to let the teardown call come through.
   Clock::settle();
   Clock::advance(Seconds(1));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index ff7aaf9..de2fc28 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2151,15 +2151,15 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
   this->Stop(slave.get());
   delete containerizer1.get();
 
-  Future<UnregisterFrameworkMessage> unregisterFrameworkMessage =
-    FUTURE_PROTOBUF(UnregisterFrameworkMessage(), _, _);
+  Future<mesos::scheduler::Call> teardownCall = FUTURE_CALL(
+      mesos::scheduler::Call(), mesos::scheduler::Call::TEARDOWN, _, _);
 
   // Now stop the framework.
   driver.stop();
   driver.join();
 
-  // Wait util the framework is removed.
-  AWAIT_READY(unregisterFrameworkMessage);
+  // Wait until the framework is removed.
+  AWAIT_READY(teardownCall);
 
   Future<ShutdownFrameworkMessage> shutdownFrameworkMessage =
     FUTURE_PROTOBUF(ShutdownFrameworkMessage(), _, _);