You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/07/17 19:44:35 UTC
[1/9] mesos git commit: Added FUTURE_CALL, DROP_CALL,
DROP_CALLS and EXPECT_NO_FUTURE_CALLS.
Repository: mesos
Updated Branches:
refs/heads/master 6b1b66df7 -> 51c2b523d
Added FUTURE_CALL, DROP_CALL, DROP_CALLS and EXPECT_NO_FUTURE_CALLS.
Review: https://reviews.apache.org/r/36462
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/415f7b15
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/415f7b15
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/415f7b15
Branch: refs/heads/master
Commit: 415f7b15028a1d367836599fac7df73690d5c499
Parents: 5329851
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 15:50:38 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:58 2015 -0700
----------------------------------------------------------------------
src/tests/mesos.hpp | 66 ++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 66 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/415f7b15/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 9157ac0..23d9841 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1324,6 +1324,39 @@ ACTION_P(SendStatusUpdateFromTaskID, state)
#define EXPECT_NO_FUTURE_PROTOBUFS(message, from, to) \
ExpectNoFutureProtobufs(message, from, to)
+
+// These are specialized versions of {FUTURE,DROP}_PROTOBUF that
+// capture a scheduler/executor Call protobuf of the given 'type'.
+// Note that we name methods as '*UnionProtobuf*()' because these could
+// be reused for macros that capture any protobufs that are described
+// using the standard protocol buffer "union" trick (e.g.,
+// FUTURE_EVENT to capture scheduler::Event), see
+// https://developers.google.com/protocol-buffers/docs/techniques#union.
+
+#define FUTURE_CALL(message, unionType, from, to) \
+ FutureUnionProtobuf(message, unionType, from, to)
+
+
+#define DROP_CALL(message, unionType, from, to) \
+ FutureUnionProtobuf(message, unionType, from, to, true)
+
+
+#define DROP_CALLS(message, unionType, from, to) \
+ DropUnionProtobufs(message, unionType, from, to)
+
+
+#define EXPECT_NO_FUTURE_CALLS(message, unionType, from, to) \
+ ExpectNoFutureUnionProtobufs(message, unionType, from, to)
+
+
+#define FUTURE_CALL_MESSAGE(message, unionType, from, to) \
+ process::FutureUnionMessage(message, unionType, from, to)
+
+
+#define DROP_CALL_MESSAGE(message, unionType, from, to) \
+ process::FutureUnionMessage(message, unionType, from, to, true)
+
+
// Forward declaration.
template <typename T>
T _FutureProtobuf(const process::Message& message);
@@ -1340,6 +1373,18 @@ process::Future<T> FutureProtobuf(T t, From from, To to, bool drop = false)
}
+template <typename Message, typename UnionType, typename From, typename To>
+process::Future<Message> FutureUnionProtobuf(
+ Message message, UnionType unionType, From from, To to, bool drop = false)
+{
+ // Help debugging by adding some "type constraints".
+ { google::protobuf::Message* m = &message; (void) m; }
+
+ return process::FutureUnionMessage(message, unionType, from, to, drop)
+ .then(lambda::bind(&_FutureProtobuf<Message>, lambda::_1));
+}
+
+
template <typename T>
T _FutureProtobuf(const process::Message& message)
{
@@ -1359,6 +1404,16 @@ void DropProtobufs(T t, From from, To to)
}
+template <typename Message, typename UnionType, typename From, typename To>
+void DropUnionProtobufs(Message message, UnionType unionType, From from, To to)
+{
+ // Help debugging by adding some "type constraints".
+ { google::protobuf::Message* m = &message; (void) m; }
+
+ process::DropUnionMessages(message, unionType, from, to);
+}
+
+
template <typename T, typename From, typename To>
void ExpectNoFutureProtobufs(T t, From from, To to)
{
@@ -1369,6 +1424,17 @@ void ExpectNoFutureProtobufs(T t, From from, To to)
}
+template <typename Message, typename UnionType, typename From, typename To>
+void ExpectNoFutureUnionProtobufs(
+ Message message, UnionType unionType, From from, To to)
+{
+ // Help debugging by adding some "type constraints".
+ { google::protobuf::Message* m = &message; (void) m; }
+
+ process::ExpectNoFutureUnionMessages(message, unionType, from, to);
+}
+
+
// This matcher is used to match the task ids of TaskStatus messages.
// Suppose we set up N futures for LaunchTasks and N futures for StatusUpdates.
// (This is a common pattern). We get into a situation where all StatusUpdates
[8/9] mesos git commit: Updated scheduler driver to send MESSAGE call.
Posted by vi...@apache.org.
Updated scheduler driver to send MESSAGE call.
Review: https://reviews.apache.org/r/36469
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cf485e29
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cf485e29
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cf485e29
Branch: refs/heads/master
Commit: cf485e29413a4d24567f82f9a47c30e59bd2c522
Parents: 5717eab
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Jul 13 13:03:13 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:44:00 2015 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 20 +++++--
src/tests/fault_tolerance_tests.cpp | 95 +++++++++++++++++++++++++++++++-
src/tests/slave_recovery_tests.cpp | 4 +-
3 files changed, 110 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 613e40b..c563c44 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1259,6 +1259,8 @@ protected:
UPID slave = savedSlavePids[slaveId];
CHECK(slave != UPID());
+ // TODO(vinod): Send a Call directly to the slave once that
+ // support is added.
FrameworkToExecutorMessage message;
message.mutable_slave_id()->MergeFrom(slaveId);
message.mutable_framework_id()->MergeFrom(framework.id());
@@ -1269,13 +1271,19 @@ protected:
VLOG(1) << "Cannot send directly to slave " << slaveId
<< "; sending through master";
- FrameworkToExecutorMessage message;
- message.mutable_slave_id()->MergeFrom(slaveId);
- message.mutable_framework_id()->MergeFrom(framework.id());
- message.mutable_executor_id()->MergeFrom(executorId);
- message.set_data(data);
+ Call call;
+
+ CHECK(framework.has_id());
+ call.mutable_framework_id()->CopyFrom(framework.id());
+ call.set_type(Call::MESSAGE);
+
+ Call::Message* message = call.mutable_message();
+ message->mutable_slave_id()->CopyFrom(slaveId);
+ message->mutable_executor_id()->CopyFrom(executorId);
+ message->set_data(data);
+
CHECK_SOME(master);
- send(master.get(), message);
+ send(master.get(), call);
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index f64c797..60ca523 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -18,6 +18,7 @@
#include <gmock/gmock.h>
+#include <string>
#include <vector>
#include <mesos/executor.hpp>
@@ -70,6 +71,7 @@ using process::UPID;
using process::http::OK;
using process::http::Response;
+using std::string;
using std::vector;
using testing::_;
@@ -1156,7 +1158,7 @@ TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor)
}
-TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
+TEST_F(FaultToleranceTest, SchedulerFailoverExecutorToFrameworkMessage)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
@@ -1260,6 +1262,97 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkMessage)
}
+TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(
+ &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched1, registered(&driver1, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver1.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+ driver1.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ MockScheduler sched2;
+
+ FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
+ framework2 = DEFAULT_FRAMEWORK_INFO;
+ framework2.mutable_id()->MergeFrom(frameworkId);
+
+ MesosSchedulerDriver driver2(
+ &sched2, framework2, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ Future<Nothing> error;
+ EXPECT_CALL(sched1, error(&driver1, "Framework failed over"))
+ .WillOnce(FutureSatisfy(&error));
+
+ driver2.start();
+
+ AWAIT_READY(error);
+
+ AWAIT_READY(registered);
+
+ Future<string> frameworkMessage;
+ EXPECT_CALL(exec, frameworkMessage(_, _))
+ .WillOnce(FutureArg<1>(&frameworkMessage));
+
+ driver2.sendFrameworkMessage(
+ DEFAULT_EXECUTOR_ID, offers.get()[0].slave_id(), "hello world");
+
+ AWAIT_EQ("hello world", frameworkMessage);
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver1.stop();
+ driver2.stop();
+
+ driver1.join();
+ driver2.join();
+
+ Shutdown();
+}
+
+
// This test verifies that a partitioned framework that still
// thinks it is registered with the master cannot kill a task because
// the master has re-registered another instance of the framework.
http://git-wip-us.apache.org/repos/asf/mesos/blob/cf485e29/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 7708cf6..ff7aaf9 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -310,13 +310,13 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
.tasks[task.task_id()]
.updates.front().uuid());
+ const UUID& uuid = UUID::fromBytes(ack.get().acknowledge().uuid());
ASSERT_TRUE(state
.frameworks[frameworkId]
.executors[executorId]
.runs[containerId.get()]
.tasks[task.task_id()]
- .acks.contains(
- UUID::fromBytes(ack.get().acknowledge().uuid())));
+ .acks.contains(uuid));
// Shut down the executor manually so that it doesn't hang around
// after the test finishes.
[6/9] mesos git commit: Updated scheduler driver to send ACKNOWLEDGE
call.
Posted by vi...@apache.org.
Updated scheduler driver to send ACKNOWLEDGE call.
Review: https://reviews.apache.org/r/36467
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5717eabf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5717eabf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5717eabf
Branch: refs/heads/master
Commit: 5717eabf93dc47430492858d4f781f5ac7bb5373
Parents: e3bbdf8
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 15:48:48 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:44:00 2015 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 41 +++++++++++++++++---------
src/tests/fault_tolerance_tests.cpp | 6 ++--
src/tests/reconciliation_tests.cpp | 5 +++-
src/tests/scheduler_event_call_tests.cpp | 8 +++--
src/tests/scheduler_tests.cpp | 37 ++++++++++++++++-------
src/tests/slave_recovery_tests.cpp | 9 ++++--
src/tests/slave_tests.cpp | 2 ++
src/tests/status_update_manager_tests.cpp | 5 +++-
8 files changed, 78 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 2dfa5c5..613e40b 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -883,12 +883,19 @@ protected:
VLOG(2) << "Sending ACK for status update " << update
<< " to " << master.get();
- StatusUpdateAcknowledgementMessage message;
- message.mutable_framework_id()->MergeFrom(framework.id());
- message.mutable_slave_id()->MergeFrom(update.slave_id());
- message.mutable_task_id()->MergeFrom(update.status().task_id());
- message.set_uuid(update.uuid());
- send(master.get(), message);
+ Call call;
+
+ CHECK(framework.has_id());
+ call.mutable_framework_id()->CopyFrom(framework.id());
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_slave_id()->CopyFrom(update.slave_id());
+ acknowledge->mutable_task_id()->CopyFrom(update.status().task_id());
+ acknowledge->set_uuid(update.uuid());
+
+ CHECK_SOME(master);
+ send(master.get(), call);
}
}
}
@@ -1191,8 +1198,6 @@ protected:
return;
}
- CHECK_SOME(master);
-
// NOTE: By ignoring the volatile 'running' here, we ensure that
// all acknowledgements requested before the driver was stopped
// or aborted are processed. Any acknowledgement that is requested
@@ -1204,17 +1209,25 @@ protected:
// ensures that master-generated and driver-generated updates
// will not have a 'uuid' set.
if (status.has_uuid() && status.has_slave_id()) {
+ CHECK_SOME(master);
+
VLOG(2) << "Sending ACK for status update " << status.uuid()
<< " of task " << status.task_id()
<< " on slave " << status.slave_id()
<< " to " << master.get();
- StatusUpdateAcknowledgementMessage message;
- message.mutable_framework_id()->CopyFrom(framework.id());
- message.mutable_slave_id()->CopyFrom(status.slave_id());
- message.mutable_task_id()->CopyFrom(status.task_id());
- message.set_uuid(status.uuid());
- send(master.get(), message);
+ Call call;
+
+ CHECK(framework.has_id());
+ call.mutable_framework_id()->CopyFrom(framework.id());
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_slave_id()->CopyFrom(status.slave_id());
+ acknowledge->mutable_task_id()->CopyFrom(status.task_id());
+ acknowledge->set_uuid(status.uuid());
+
+ send(master.get(), call);
} else {
VLOG(2) << "Received ACK for status update"
<< (status.has_uuid() ? " " + status.uuid() : "")
http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index f218dc6..f64c797 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1296,8 +1296,8 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
.WillOnce(FutureArg<1>(&status));
- Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
- = FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+ Future<mesos::scheduler::Call> acknowledgeCall = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _);
ExecutorDriver* execDriver;
EXPECT_CALL(exec, registered(_, _, _, _))
@@ -1314,7 +1314,7 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
// Wait for the status update acknowledgement to be sent. This
// ensures the slave doesn't resend the TASK_RUNNING update to the
// failed over scheduler (below).
- AWAIT_READY(statusUpdateAcknowledgementMessage);
+ AWAIT_READY(acknowledgeCall);
// Now start the second failed over scheduler.
MockScheduler sched2;
http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index aef245a..6940b6a 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -745,7 +745,10 @@ TEST_F(ReconciliationTest, UnacknowledgedTerminalTask)
// Drop the status update acknowledgements to ensure that the
// task remains terminal and unacknowledged in the master.
- DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
+ DROP_CALLS(mesos::scheduler::Call(),
+ mesos::scheduler::Call::ACKNOWLEDGE,
+ _,
+ master.get());
driver.start();
http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/scheduler_event_call_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp
index cf6aa19..fe15f05 100644
--- a/src/tests/scheduler_event_call_tests.cpp
+++ b/src/tests/scheduler_event_call_tests.cpp
@@ -20,6 +20,8 @@
#include <mesos/scheduler.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
@@ -152,13 +154,13 @@ TEST_F(SchedulerDriverEventTest, Update)
// Generate an update that requires acknowledgement.
event.mutable_update()->mutable_status()->set_uuid(UUID::random().toBytes());
- Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgement =
- DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+ Future<mesos::scheduler::Call> acknowledgement = DROP_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _);
process::post(master.get(), frameworkPid, event);
AWAIT_READY(statusUpdate2);
- AWAIT_READY(statusUpdateAcknowledgement);
+ AWAIT_READY(acknowledgement);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 2ce280a..13fecb2 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -27,6 +27,8 @@
#include <mesos/scheduler.hpp>
#include <mesos/type_utils.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
@@ -1087,8 +1089,11 @@ TEST_F(MesosSchedulerDriverTest, DropAckIfStopCalledBeforeAbort)
// Ensure no status update acknowledgements are sent from the driver
// to the master.
- EXPECT_NO_FUTURE_PROTOBUFS(
- StatusUpdateAcknowledgementMessage(), _ , master.get());
+ EXPECT_NO_FUTURE_CALLS(
+ mesos::scheduler::Call(),
+ mesos::scheduler::Call::ACKNOWLEDGE,
+ _ ,
+ master.get());
EXPECT_CALL(exec, registered(_, _, _, _));
@@ -1144,8 +1149,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
// Ensure no status update acknowledgements are sent from the driver
// to the master until the explicit acknowledgement is sent.
- EXPECT_NO_FUTURE_PROTOBUFS(
- StatusUpdateAcknowledgementMessage(), _ , master.get());
+ EXPECT_NO_FUTURE_CALLS(
+ mesos::scheduler::Call(),
+ mesos::scheduler::Call::ACKNOWLEDGE,
+ _ ,
+ master.get());
EXPECT_CALL(exec, registered(_, _, _, _));
@@ -1166,8 +1174,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgements)
Clock::settle();
// Now send the acknowledgement.
- Future<StatusUpdateAcknowledgementMessage> acknowledgement =
- FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _ , master.get());
+ Future<mesos::scheduler::Call> acknowledgement = FUTURE_CALL(
+ mesos::scheduler::Call(),
+ mesos::scheduler::Call::ACKNOWLEDGE,
+ _,
+ master.get());
driver.acknowledgeStatusUpdate(status.get());
@@ -1204,8 +1215,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsMasterGeneratedUpdate)
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Ensure no status update acknowledgements are sent to the master.
- EXPECT_NO_FUTURE_PROTOBUFS(
- StatusUpdateAcknowledgementMessage(), _ , master.get());
+ EXPECT_NO_FUTURE_CALLS(
+ mesos::scheduler::Call(),
+ mesos::scheduler::Call::ACKNOWLEDGE,
+ _ ,
+ master.get());
driver.start();
@@ -1266,8 +1280,11 @@ TEST_F(MesosSchedulerDriverTest, ExplicitAcknowledgementsUnsetSlaveID)
.WillOnce(FutureSatisfy(&registered));
// Ensure no status update acknowledgements are sent to the master.
- EXPECT_NO_FUTURE_PROTOBUFS(
- StatusUpdateAcknowledgementMessage(), _ , master.get());
+ EXPECT_NO_FUTURE_CALLS(
+ mesos::scheduler::Call(),
+ mesos::scheduler::Call::ACKNOWLEDGE,
+ _ ,
+ master.get());
driver.start();
http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 2f882cf..7708cf6 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -27,6 +27,8 @@
#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <process/dispatch.hpp>
#include <process/gmock.hpp>
#include <process/owned.hpp>
@@ -211,8 +213,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
Future<StatusUpdateMessage> update =
FUTURE_PROTOBUF(StatusUpdateMessage(), Eq(master.get()), _);
- Future<StatusUpdateAcknowledgementMessage> ack =
- FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+ Future<mesos::scheduler::Call> ack = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::ACKNOWLEDGE, _, _);
Future<Nothing> _ack =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
@@ -313,7 +315,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
.executors[executorId]
.runs[containerId.get()]
.tasks[task.task_id()]
- .acks.contains(UUID::fromBytes(ack.get().uuid())));
+ .acks.contains(
+ UUID::fromBytes(ack.get().acknowledge().uuid())));
// Shut down the executor manually so that it doesn't hang around
// after the test finishes.
http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 89cc7f6..a16e4f4 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -29,6 +29,8 @@
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
http://git-wip-us.apache.org/repos/asf/mesos/blob/5717eabf/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 440b074..0224e50 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -420,7 +420,10 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
// Drop the ACKs, so that status update manager
// retries the update.
- DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
+ DROP_CALLS(mesos::scheduler::Call(),
+ mesos::scheduler::Call::ACKNOWLEDGE,
+ _,
+ master.get());
driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
[3/9] mesos git commit: Updated scheduler driver to send ACCEPT call.
Posted by vi...@apache.org.
Updated scheduler driver to send ACCEPT call.
Review: https://reviews.apache.org/r/36464
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0981c8d0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0981c8d0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0981c8d0
Branch: refs/heads/master
Commit: 0981c8d06f76f8856dc9e6365a476620d376f9eb
Parents: be0659b
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 12:43:40 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:59 2015 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 60 ++++-------------------------------------
src/tests/master_tests.cpp | 6 ++---
2 files changed, 8 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0981c8d0/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 2fe1836..9da0782 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1060,65 +1060,15 @@ protected:
const vector<TaskInfo>& tasks,
const Filters& filters)
{
- if (!connected) {
- VLOG(1) << "Ignoring launch tasks message as master is disconnected";
- // NOTE: Reply to the framework with TASK_LOST messages for each
- // task. This is a hack for now, to not let the scheduler
- // believe the tasks are launched, when actually the master
- // never received the launchTasks message. Also, realize that
- // this hack doesn't capture the case when the scheduler process
- // sends it but the master never receives it (message lost,
- // master failover etc). The correct way for schedulers to deal
- // with this situation is to use 'reconcileTasks()'.
- foreach (const TaskInfo& task, tasks) {
- StatusUpdate update = protobuf::createStatusUpdate(
- framework.id(),
- None(),
- task.task_id(),
- TASK_LOST,
- TaskStatus::SOURCE_MASTER,
- None(),
- "Master disconnected",
- TaskStatus::REASON_MASTER_DISCONNECTED);
-
- statusUpdate(UPID(), update, UPID());
- }
- return;
- }
-
- LaunchTasksMessage message;
- message.mutable_framework_id()->MergeFrom(framework.id());
- message.mutable_filters()->MergeFrom(filters);
-
- foreach (const OfferID& offerId, offerIds) {
- message.add_offer_ids()->MergeFrom(offerId);
-
- foreach (const TaskInfo& task, tasks) {
- // Keep only the slave PIDs where we run tasks so we can send
- // framework messages directly.
- if (savedOffers.contains(offerId)) {
- if (savedOffers[offerId].count(task.slave_id()) > 0) {
- savedSlavePids[task.slave_id()] =
- savedOffers[offerId][task.slave_id()];
- } else {
- LOG(WARNING) << "Attempting to launch task " << task.task_id()
- << " with the wrong slave id " << task.slave_id();
- }
- } else {
- LOG(WARNING) << "Attempting to launch task " << task.task_id()
- << " with an unknown offer " << offerId;
- }
- }
- // Remove the offer since we saved all the PIDs we might use.
- savedOffers.erase(offerId);
- }
+ Offer::Operation operation;
+ operation.set_type(Offer::Operation::LAUNCH);
+ Offer::Operation::Launch* launch = operation.mutable_launch();
foreach (const TaskInfo& task, tasks) {
- message.add_tasks()->MergeFrom(task);
+ launch->add_task_infos()->CopyFrom(task);
}
- CHECK_SOME(master);
- send(master.get(), message);
+ acceptOffers(offerIds, {operation}, filters);
}
void acceptOffers(
http://git-wip-us.apache.org/repos/asf/mesos/blob/0981c8d0/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 13babee..fdee267 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2562,8 +2562,8 @@ TEST_F(MasterTest, OfferNotRescindedOnceDeclined)
EXPECT_CALL(sched, resourceOffers(_, _))
.WillRepeatedly(DeclineOffers()); // Decline all offers.
- Future<LaunchTasksMessage> launchTasksMessage =
- FUTURE_PROTOBUF(LaunchTasksMessage(), _, _);
+ Future<mesos::scheduler::Call> acceptCall = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::ACCEPT, _, _);
EXPECT_CALL(sched, offerRescinded(&driver, _))
.Times(0);
@@ -2572,7 +2572,7 @@ TEST_F(MasterTest, OfferNotRescindedOnceDeclined)
AWAIT_READY(registered);
// Wait for the framework to decline the offers.
- AWAIT_READY(launchTasksMessage);
+ AWAIT_READY(acceptCall);
// Now advance to the offer timeout, we need to settle the clock to
// ensure that the offer rescind timeout would be processed
[4/9] mesos git commit: Updated scheduler driver to send Kill Call.
Posted by vi...@apache.org.
Updated scheduler driver to send Kill Call.
Review: https://reviews.apache.org/r/36463
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/be0659b8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/be0659b8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/be0659b8
Branch: refs/heads/master
Commit: be0659b85da24b8e0d312ef7cc784288e1962a62
Parents: 415f7b1
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Jul 9 18:06:19 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:59 2015 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 14 ++++++++++----
src/tests/fault_tolerance_tests.cpp | 8 +++++---
src/tests/master_tests.cpp | 9 +++++----
3 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/be0659b8/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index de76803..2fe1836 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1027,11 +1027,17 @@ protected:
return;
}
- KillTaskMessage message;
- message.mutable_framework_id()->MergeFrom(framework.id());
- message.mutable_task_id()->MergeFrom(taskId);
+ Call call;
+
+ CHECK(framework.has_id());
+ call.mutable_framework_id()->CopyFrom(framework.id());
+ call.set_type(Call::KILL);
+
+ Call::Kill* kill = call.mutable_kill();
+ kill->mutable_task_id()->CopyFrom(taskId);
+
CHECK_SOME(master);
- send(master.get(), message);
+ send(master.get(), call);
}
void requestResources(const vector<Request>& requests)
http://git-wip-us.apache.org/repos/asf/mesos/blob/be0659b8/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 1070ccf..f218dc6 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -28,6 +28,8 @@
#include <mesos/master/allocator.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/http.hpp>
@@ -1354,12 +1356,12 @@ TEST_F(FaultToleranceTest, IgnoreKillTaskFromUnregisteredFramework)
EXPECT_CALL(sched2, statusUpdate(&driver2, _))
.WillOnce(FutureArg<1>(&status2));
- Future<KillTaskMessage> killTaskMessage =
- FUTURE_PROTOBUF(KillTaskMessage(), _, _);
+ Future<mesos::scheduler::Call> killCall = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::KILL, _, _);
driver1.killTask(status.get().task_id());
- AWAIT_READY(killTaskMessage);
+ AWAIT_READY(killCall);
// By this point the master must have processed and ignored the
// 'killTask' message from the first framework. To verify this,
http://git-wip-us.apache.org/repos/asf/mesos/blob/be0659b8/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 767c86c..13babee 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -29,6 +29,8 @@
#include <mesos/master/allocator.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
@@ -540,9 +542,8 @@ TEST_F(MasterTest, KillUnknownTaskSlaveInTransition)
EXPECT_CALL(sched, statusUpdate(&driver, _))
.Times(0);
- // Set expectation that Master receives killTask message.
- Future<KillTaskMessage> killTaskMessage =
- FUTURE_PROTOBUF(KillTaskMessage(), _, master.get());
+ Future<mesos::scheduler::Call> killCall = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::KILL, _, _);
// Attempt to kill unknown task while slave is transitioning.
TaskID unknownTaskId;
@@ -554,7 +555,7 @@ TEST_F(MasterTest, KillUnknownTaskSlaveInTransition)
driver.killTask(unknownTaskId);
- AWAIT_READY(killTaskMessage);
+ AWAIT_READY(killCall);
// Wait for all messages to be dispatched and processed completely to satisfy
// the expectation that we didn't receive a status update.
[5/9] mesos git commit: Updated scheduler driver to send REVIVE call.
Posted by vi...@apache.org.
Updated scheduler driver to send REVIVE call.
Review: https://reviews.apache.org/r/36465
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90ed30d4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90ed30d4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90ed30d4
Branch: refs/heads/master
Commit: 90ed30d4d9f99ad9a469e6aaf9fd0fc2ea781772
Parents: 0981c8d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 12:59:51 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:59 2015 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/90ed30d4/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 9da0782..4099bce 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1167,10 +1167,14 @@ protected:
return;
}
- ReviveOffersMessage message;
- message.mutable_framework_id()->MergeFrom(framework.id());
+ Call call;
+
+ CHECK(framework.has_id());
+ call.mutable_framework_id()->CopyFrom(framework.id());
+ call.set_type(Call::REVIVE);
+
CHECK_SOME(master);
- send(master.get(), message);
+ send(master.get(), call);
}
void acknowledgeStatusUpdate(
[2/9] mesos git commit: Added FutureUnionMessage,
DropUnionMessages and ExpectNoFutureUnionMessages.
Posted by vi...@apache.org.
Added FutureUnionMessage, DropUnionMessages and
ExpectNoFutureUnionMessages.
Review: https://reviews.apache.org/r/36461
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/53298516
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/53298516
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/53298516
Branch: refs/heads/master
Commit: 53298516d88fa2060990de5254f4c5ad3ca1ada0
Parents: 6b1b66d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 16:10:25 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:43:58 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/gmock.hpp | 64 ++++++++++++++++++++++
1 file changed, 64 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/53298516/3rdparty/libprocess/include/process/gmock.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/gmock.hpp b/3rdparty/libprocess/include/process/gmock.hpp
index e878161..d449952 100644
--- a/3rdparty/libprocess/include/process/gmock.hpp
+++ b/3rdparty/libprocess/include/process/gmock.hpp
@@ -330,6 +330,23 @@ MATCHER_P3(MessageMatcher, name, from, to, "")
}
+// This matches protobuf messages that are described using the
+// standard protocol buffer "union" trick, see:
+// https://developers.google.com/protocol-buffers/docs/techniques#union.
+MATCHER_P4(UnionMessageMatcher, message, unionType, from, to, "")
+{
+ const process::MessageEvent& event = ::std::tr1::get<0>(arg);
+ message_type message;
+
+ return (testing::Matcher<std::string>(message.GetTypeName()).Matches(
+ event.message->name) &&
+ message.ParseFromString(event.message->body) &&
+ testing::Matcher<unionType_type>(unionType).Matches(message.type()) &&
+ testing::Matcher<process::UPID>(from).Matches(event.message->from) &&
+ testing::Matcher<process::UPID>(to).Matches(event.message->to));
+}
+
+
MATCHER_P2(DispatchMatcher, pid, method, "")
{
const DispatchEvent& event = ::std::tr1::get<0>(arg);
@@ -358,6 +375,28 @@ Future<Message> FutureMessage(Name name, From from, To to, bool drop = false)
}
+template <typename Message, typename UnionType, typename From, typename To>
+Future<process::Message> FutureUnionMessage(
+ Message message, UnionType unionType, From from, To to, bool drop = false)
+{
+ TestsFilter* filter =
+ FilterTestEventListener::instance()->install();
+
+ Future<process::Message> future;
+ synchronized (filter->mutex) {
+ EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+ .With(UnionMessageMatcher(message, unionType, from, to))
+ .WillOnce(testing::DoAll(FutureArgField<0>(
+ &MessageEvent::message,
+ &future),
+ testing::Return(drop)))
+ .RetiresOnSaturation(); // Don't impose any subsequent expectations.
+ }
+
+ return future;
+}
+
+
template <typename PID, typename Method>
Future<Nothing> FutureDispatch(PID pid, Method method, bool drop = false)
{
@@ -387,6 +426,18 @@ void DropMessages(Name name, From from, To to)
}
+template <typename Message, typename UnionType, typename From, typename To>
+void DropUnionMessages(Message message, UnionType unionType, From from, To to)
+{
+ TestsFilter* filter = FilterTestEventListener::instance()->install();
+ synchronized (filter->mutex) {
+ EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+ .With(UnionMessageMatcher(message, unionType, from, to))
+ .WillRepeatedly(testing::Return(true));
+ }
+}
+
+
template <typename Name, typename From, typename To>
void ExpectNoFutureMessages(Name name, From from, To to)
{
@@ -399,6 +450,19 @@ void ExpectNoFutureMessages(Name name, From from, To to)
}
+template <typename Message, typename UnionType, typename From, typename To>
+void ExpectNoFutureUnionMessages(
+ Message message, UnionType unionType, From from, To to)
+{
+ TestsFilter* filter = FilterTestEventListener::instance()->install();
+ synchronized (filter->mutex) {
+ EXPECT_CALL(filter->mock, filter(testing::A<const MessageEvent&>()))
+ .With(UnionMessageMatcher(message, unionType, from, to))
+ .Times(0);
+ }
+}
+
+
template <typename PID, typename Method>
void DropDispatches(PID pid, Method method)
{
[9/9] mesos git commit: Updated scheduler driver to send RECONCILE
call.
Posted by vi...@apache.org.
Updated scheduler driver to send RECONCILE call.
Review: https://reviews.apache.org/r/36466
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e3bbdf88
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e3bbdf88
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e3bbdf88
Branch: refs/heads/master
Commit: e3bbdf882209178ee9a945fb851728b424d51e25
Parents: 90ed30d
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Jul 10 15:36:13 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:44:00 2015 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 17 +++++++++++++----
src/tests/reconciliation_tests.cpp | 18 ++++++++++--------
2 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e3bbdf88/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 4099bce..2dfa5c5 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1273,15 +1273,24 @@ protected:
return;
}
- ReconcileTasksMessage message;
- message.mutable_framework_id()->MergeFrom(framework.id());
+ Call call;
+
+ CHECK(framework.has_id());
+ call.mutable_framework_id()->CopyFrom(framework.id());
+ call.set_type(Call::RECONCILE);
+
+ Call::Reconcile* reconcile = call.mutable_reconcile();
foreach (const TaskStatus& status, statuses) {
- message.add_statuses()->MergeFrom(status);
+ Call::Reconcile::Task* task = reconcile->add_tasks();
+ task->mutable_task_id()->CopyFrom(status.task_id());
+ if (status.has_slave_id()) {
+ task->mutable_slave_id()->CopyFrom(status.slave_id());
+ }
}
CHECK_SOME(master);
- send(master.get(), message);
+ send(master.get(), call);
}
private:
http://git-wip-us.apache.org/repos/asf/mesos/blob/e3bbdf88/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 6042d8c..aef245a 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -26,6 +26,8 @@
#include <mesos/mesos.hpp>
#include <mesos/scheduler.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/pid.hpp>
@@ -446,15 +448,15 @@ TEST_F(ReconciliationTest, SlaveInTransition)
statuses.push_back(status);
- Future<ReconcileTasksMessage> reconcileTasksMessage =
- FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
+ Future<mesos::scheduler::Call> reconcileCall = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::RECONCILE, _ , _);
Clock::pause();
driver.reconcileTasks(statuses);
- // Make sure the master received the reconcile tasks message.
- AWAIT_READY(reconcileTasksMessage);
+ // Make sure the master received the reconcile call.
+ AWAIT_READY(reconcileCall);
// The Clock::settle() will ensure that framework would receive
// a status update if it is sent by the master. In this test it
@@ -587,8 +589,8 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask)
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
- Future<ReconcileTasksMessage> reconcileTasksMessage =
- FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
+ Future<mesos::scheduler::Call> reconcileCall = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::RECONCILE, _ , _);
Clock::pause();
@@ -597,8 +599,8 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask)
vector<TaskStatus> statuses;
driver.reconcileTasks(statuses);
- // Make sure the master received the reconcile tasks message.
- AWAIT_READY(reconcileTasksMessage);
+ // Make sure the master received the reconcile call.
+ AWAIT_READY(reconcileCall);
// The Clock::settle() will ensure that framework would receive
// a status update if it is sent by the master. In this test it
[7/9] mesos git commit: Updated scheduler driver to send TEARDOWN
call.
Posted by vi...@apache.org.
Updated scheduler driver to send TEARDOWN call.
Review: https://reviews.apache.org/r/36470
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51c2b523
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51c2b523
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51c2b523
Branch: refs/heads/master
Commit: 51c2b523d0b548b5882371d3a87dab13f9aec4dc
Parents: cf485e2
Author: Vinod Kone <vi...@gmail.com>
Authored: Mon Jul 13 15:51:17 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Jul 17 10:44:00 2015 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 10 +++++++---
src/tests/exception_tests.cpp | 10 ++++++----
src/tests/fault_tolerance_tests.cpp | 13 +++++++++++--
src/tests/master_allocator_tests.cpp | 4 +++-
src/tests/master_tests.cpp | 23 ++++++++++-------------
src/tests/rate_limiting_tests.cpp | 10 +++++-----
src/tests/slave_recovery_tests.cpp | 8 ++++----
7 files changed, 46 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index c563c44..8163796 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -990,10 +990,14 @@ protected:
terminate(self());
if (connected && !failover) {
- UnregisterFrameworkMessage message;
- message.mutable_framework_id()->MergeFrom(framework.id());
+ Call call;
+
+ CHECK(framework.has_id());
+ call.mutable_framework_id()->CopyFrom(framework.id());
+ call.set_type(Call::TEARDOWN);
+
CHECK_SOME(master);
- send(master.get(), message);
+ send(master.get(), call);
}
synchronized (mutex) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/exception_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/exception_tests.cpp b/src/tests/exception_tests.cpp
index 9af1674..658e485 100644
--- a/src/tests/exception_tests.cpp
+++ b/src/tests/exception_tests.cpp
@@ -21,6 +21,8 @@
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <process/gmock.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
@@ -190,13 +192,13 @@ TEST_F(ExceptionTest, DisallowSchedulerCallbacksOnAbort)
AWAIT_READY(rescindMsg);
- Future<UnregisterFrameworkMessage> unregisterMsg =
- FUTURE_PROTOBUF(UnregisterFrameworkMessage(), _, _);
+ Future<mesos::scheduler::Call> teardownCall = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::TEARDOWN, _, _);
driver.stop();
- //Ensures reception of RescindResourceOfferMessage.
- AWAIT_READY(unregisterMsg);
+ // Ensures reception of RescindResourceOfferMessage.
+ AWAIT_READY(teardownCall);
Shutdown();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 60ca523..72f4cab 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1310,8 +1310,7 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
MockScheduler sched2;
- FrameworkInfo framework2; // Bug in gcc 4.1.*, must assign on next line.
- framework2 = DEFAULT_FRAMEWORK_INFO;
+ FrameworkInfo framework2 = DEFAULT_FRAMEWORK_INFO;
framework2.mutable_id()->MergeFrom(frameworkId);
MesosSchedulerDriver driver2(
@@ -1335,9 +1334,19 @@ TEST_F(FaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
EXPECT_CALL(exec, frameworkMessage(_, _))
.WillOnce(FutureArg<1>(&frameworkMessage));
+ // Since 'sched2' doesn't receive any offers the framework message
+ // should go through the master.
+ Future<mesos::scheduler::Call> messageCall = FUTURE_CALL(
+ mesos::scheduler::Call(),
+ mesos::scheduler::Call::MESSAGE,
+ _,
+ master.get());
+
driver2.sendFrameworkMessage(
DEFAULT_EXECUTOR_ID, offers.get()[0].slave_id(), "hello world");
+ AWAIT_READY(messageCall);
+
AWAIT_EQ("hello world", frameworkMessage);
EXPECT_CALL(exec, shutdown(_))
http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index 534b248..147f510 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -29,6 +29,8 @@
#include <mesos/module/allocator.hpp>
+#include <mesos/scheduler/scheduler.hpp>
+
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
@@ -440,7 +442,7 @@ TYPED_TEST(MasterAllocatorTest, SchedulerFailover)
// When we shut down the first framework, we don't want it to tell
// the master it's shutting down so that the master will wait to see
// if it fails over.
- DROP_PROTOBUFS(UnregisterFrameworkMessage(), _, _);
+ DROP_CALLS(mesos::scheduler::Call(), mesos::scheduler::Call::TEARDOWN, _, _);
Future<Nothing> deactivateFramework;
EXPECT_CALL(allocator, deactivateFramework(_))
http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index fdee267..9205ec4 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -241,13 +241,10 @@ TEST_F(MasterTest, ShutdownFrameworkWhileTaskRunning)
AWAIT_READY(update);
- // Set expectation that Master receives UnregisterFrameworkMessage,
- // which triggers marking running tasks as killed.
- UnregisterFrameworkMessage message;
- message.mutable_framework_id()->MergeFrom(offer.framework_id());
-
- Future<UnregisterFrameworkMessage> unregisterFrameworkMessage =
- FUTURE_PROTOBUF(message, _, master.get());
+ // Set expectation that Master receives teardown call, which
+ // triggers marking running tasks as killed.
+ Future<mesos::scheduler::Call> teardownCall = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::TEARDOWN, _, _);
// Set expectation that Executor's shutdown callback is invoked.
Future<Nothing> shutdown;
@@ -258,14 +255,14 @@ TEST_F(MasterTest, ShutdownFrameworkWhileTaskRunning)
driver.stop();
driver.join();
- // Wait for UnregisterFrameworkMessage message to be dispatched and
- // executor's shutdown callback to be called.
- AWAIT_READY(unregisterFrameworkMessage);
+ // Wait for teardown call to be dispatched and executor's shutdown
+ // callback to be called.
+ AWAIT_READY(teardownCall);
AWAIT_READY(shutdown);
- // We have to be sure the UnregisterFrameworkMessage is processed
- // completely and running tasks enter a terminal state before we
- // request the master state.
+ // We have to be sure the teardown call is processed completely and
+ // running tasks enter a terminal state before we request the master
+ // state.
Clock::pause();
Clock::settle();
Clock::resume();
http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/rate_limiting_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/rate_limiting_tests.cpp b/src/tests/rate_limiting_tests.cpp
index 49d907b..6a93df0 100644
--- a/src/tests/rate_limiting_tests.cpp
+++ b/src/tests/rate_limiting_tests.cpp
@@ -178,10 +178,10 @@ TEST_F(RateLimitingTest, NoRateLimiting)
driver->join();
delete driver;
- // The fact that UnregisterFrameworkMessage (the 2nd message from
- // 'sched' that reaches Master after its registration) gets
- // processed without Clock advances proves that the framework is
- // given unlimited rate.
+ // The fact that the teardown call (the 2nd call from the scheduler
+ // that reaches Master after its registration) gets processed
+ // without Clock advances proves that the framework is given
+ // unlimited rate.
AWAIT_READY(removeFramework);
// For metrics endpoint.
@@ -724,7 +724,7 @@ TEST_F(RateLimitingTest, SamePrincipalFrameworks)
driver1->join();
delete driver1;
- // Advance to let UnregisterFrameworkMessage come through.
+ // Advance to let the teardown call come through.
Clock::settle();
Clock::advance(Seconds(1));
http://git-wip-us.apache.org/repos/asf/mesos/blob/51c2b523/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index ff7aaf9..de2fc28 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2151,15 +2151,15 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
this->Stop(slave.get());
delete containerizer1.get();
- Future<UnregisterFrameworkMessage> unregisterFrameworkMessage =
- FUTURE_PROTOBUF(UnregisterFrameworkMessage(), _, _);
+ Future<mesos::scheduler::Call> teardownCall = FUTURE_CALL(
+ mesos::scheduler::Call(), mesos::scheduler::Call::TEARDOWN, _, _);
// Now stop the framework.
driver.stop();
driver.join();
- // Wait util the framework is removed.
- AWAIT_READY(unregisterFrameworkMessage);
+ // Wait until the framework is removed.
+ AWAIT_READY(teardownCall);
Future<ShutdownFrameworkMessage> shutdownFrameworkMessage =
FUTURE_PROTOBUF(ShutdownFrameworkMessage(), _, _);