You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/02/04 23:42:34 UTC
[1/5] mesos git commit: Added a process ID argument to Slave
constructor.
Repository: mesos
Updated Branches:
refs/heads/master f2fda2d71 -> 2dd8dc85a
Added a process ID argument to Slave constructor.
This change modifies the existing `Slave` constructor to take the process ID
string as an argument. This is necessary for testing recovery of HTTP based
executors. Previously, every invocation of `StartSlave` generated a new ID via
`ID::generate`, and no process delegate was set via `process::initialize`.
This prevented an existing HTTP based executor from connecting back to the
slave.
Also modified the tests to introduce a new `StartSlave` overload that takes
this process ID as an argument.
Review: https://reviews.apache.org/r/42181/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d1068c17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d1068c17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d1068c17
Branch: refs/heads/master
Commit: d1068c17981d04aa621ec557c73f2d060c26e2fe
Parents: f2fda2d
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:39:47 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:39:47 2016 -0800
----------------------------------------------------------------------
src/slave/slave.cpp | 5 +++--
src/slave/slave.hpp | 3 ++-
src/tests/cluster.cpp | 3 +++
src/tests/cluster.hpp | 1 +
src/tests/mesos.cpp | 22 ++++++++++++++++++++++
src/tests/mesos.hpp | 5 +++++
6 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1f4c836..9dda3a2 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -116,7 +116,8 @@ namespace slave {
using namespace state;
-Slave::Slave(const slave::Flags& _flags,
+Slave::Slave(const std::string& id,
+ const slave::Flags& _flags,
MasterDetector* _detector,
Containerizer* _containerizer,
Files* _files,
@@ -124,7 +125,7 @@ Slave::Slave(const slave::Flags& _flags,
StatusUpdateManager* _statusUpdateManager,
ResourceEstimator* _resourceEstimator,
QoSController* _qosController)
- : ProcessBase(process::ID::generate("slave")),
+ : ProcessBase(id),
state(RECOVERING),
flags(_flags),
completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 6326402..a3830ff 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -99,7 +99,8 @@ struct HttpConnection;
class Slave : public ProtobufProcess<Slave>
{
public:
- Slave(const Flags& flags,
+ Slave(const std::string& id,
+ const Flags& flags,
MasterDetector* detector,
Containerizer* containerizer,
Files* files,
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/tests/cluster.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp
index 1a3a038..084fb1c 100644
--- a/src/tests/cluster.cpp
+++ b/src/tests/cluster.cpp
@@ -20,6 +20,7 @@
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
+#include <process/id.hpp>
#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
@@ -346,6 +347,7 @@ void Cluster::Slaves::shutdown()
Try<process::PID<slave::Slave>> Cluster::Slaves::start(
const slave::Flags& flags,
+ const Option<std::string>& id,
const Option<slave::Containerizer*>& containerizer,
const Option<MasterDetector*>& detector,
const Option<slave::GarbageCollector*>& gc,
@@ -406,6 +408,7 @@ Try<process::PID<slave::Slave>> Cluster::Slaves::start(
slave.flags = flags;
slave.slave = new slave::Slave(
+ id.isSome() ? id.get() : process::ID::generate("slave"),
flags,
detector.getOrElse(slave.detector.get()),
slave.containerizer,
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 576dcb8..99a785a 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -171,6 +171,7 @@ public:
// Start a new slave with the provided flags and injections.
Try<process::PID<slave::Slave>> start(
const slave::Flags& flags = slave::Flags(),
+ const Option<std::string>& id = None(),
const Option<slave::Containerizer*>& containerizer = None(),
const Option<MasterDetector*>& detector = None(),
const Option<slave::GarbageCollector*>& gc = None(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 8f1f8d6..06e774b 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -265,6 +265,19 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
{
return cluster.slaves.start(
flags.isNone() ? CreateSlaveFlags() : flags.get(),
+ None(),
+ containerizer);
+}
+
+
+Try<PID<slave::Slave>> MesosTest::StartSlave(
+ slave::Containerizer* containerizer,
+ const std::string& id,
+ const Option<slave::Flags>& flags)
+{
+ return cluster.slaves.start(
+ flags.isNone() ? CreateSlaveFlags() : flags.get(),
+ id,
containerizer);
}
@@ -276,6 +289,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
{
return cluster.slaves.start(
flags.isNone() ? CreateSlaveFlags() : flags.get(),
+ None(),
containerizer,
detector);
}
@@ -288,6 +302,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
return cluster.slaves.start(
flags.isNone() ? CreateSlaveFlags() : flags.get(),
None(),
+ None(),
detector);
}
@@ -300,6 +315,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
return cluster.slaves.start(
flags.isNone() ? CreateSlaveFlags() : flags.get(),
None(),
+ None(),
detector,
gc);
}
@@ -314,6 +330,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
Try<PID<slave::Slave>> pid = cluster.slaves.start(
flags.isNone() ? CreateSlaveFlags() : flags.get(),
+ None(),
containerizer,
detector);
@@ -338,6 +355,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
None(),
None(),
None(),
+ None(),
resourceEstimator);
}
@@ -351,6 +369,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
Try<PID<slave::Slave>> pid = cluster.slaves.start(
flags.isNone() ? CreateSlaveFlags() : flags.get(),
+ None(),
containerizer,
None(),
None(),
@@ -375,6 +394,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
{
return cluster.slaves.start(
flags.isNone() ? CreateSlaveFlags() : flags.get(),
+ None(),
containerizer,
None(),
None(),
@@ -394,6 +414,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
None(),
None(),
None(),
+ None(),
qoSController);
}
@@ -405,6 +426,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
{
return cluster.slaves.start(
flags.isNone() ? CreateSlaveFlags() : flags.get(),
+ None(),
containerizer,
None(),
None(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index c2bae47..e07d8aa 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -164,6 +164,11 @@ protected:
slave::Containerizer* containerizer,
const Option<slave::Flags>& flags = None());
+ virtual Try<process::PID<slave::Slave> > StartSlave(
+ slave::Containerizer* containerizer,
+ const std::string& id,
+ const Option<slave::Flags>& flags = None());
+
// Starts a slave with the specified containerizer, detector and flags.
virtual Try<process::PID<slave::Slave> > StartSlave(
slave::Containerizer* containerizer,
[2/5] mesos git commit: Modified existing usage of Slave constructor.
Posted by vi...@apache.org.
Modified existing usage of Slave constructor.
This change passes the process ID argument wherever a `Slave` object is
constructed.
Review: https://reviews.apache.org/r/43131/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a211e5f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a211e5f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a211e5f
Branch: refs/heads/master
Commit: 4a211e5f9f95dbffb819d5802ffcb8617be202b8
Parents: d1068c1
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:40:56 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:40:56 2016 -0800
----------------------------------------------------------------------
src/local/local.cpp | 1 +
src/slave/main.cpp | 5 ++++-
src/tests/mesos.cpp | 1 +
3 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a211e5f/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 2688f9d..359fc54 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -384,6 +384,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
// NOTE: At this point detector is already initialized by the
// Master.
Slave* slave = new Slave(
+ process::ID::generate("slave"),
flags,
detector,
containerizer.get(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a211e5f/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index a412ceb..22b8330 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -208,7 +208,9 @@ int main(int argc, char** argv)
os::setenv("LIBPROCESS_ADVERTISE_PORT", advertise_port.get());
}
- process::initialize("slave(1)");
+ const string id = process::ID::generate("slave"); // Process ID.
+
+ process::initialize(id);
logging::initialize(argv[0], flags, true); // Catch signals.
@@ -304,6 +306,7 @@ int main(int argc, char** argv)
LOG(INFO) << "Starting Mesos slave";
Slave* slave = new Slave(
+ id,
flags,
detector.get(),
containerizer.get(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/4a211e5f/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 06e774b..8fe28ae 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -555,6 +555,7 @@ MockSlave::MockSlave(const slave::Flags& flags,
slave::Containerizer* containerizer,
const Option<mesos::slave::QoSController*>& _qosController)
: slave::Slave(
+ process::ID::generate("slave"),
flags,
detector,
containerizer,
[3/5] mesos git commit: Drop `404 NotFound` responses in the executor
library.
Posted by vi...@apache.org.
Drop `404 NotFound` responses in the executor library.
Previously, the library did not drop `404 NotFound` responses; instead, it
sent an `Event::Error` to the executor. However, a `404 NotFound` can be
triggered upon an agent restart when the agent has not yet set up its HTTP
routes, so such responses are now dropped.
Review: https://reviews.apache.org/r/42844/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e3c3cf88
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e3c3cf88
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e3c3cf88
Branch: refs/heads/master
Commit: e3c3cf88688d08db5699d5c70616702d92c361e5
Parents: 4a211e5
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:41:15 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:41:15 2016 -0800
----------------------------------------------------------------------
src/executor/executor.cpp | 8 ++++++++
1 file changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e3c3cf88/src/executor/executor.cpp
----------------------------------------------------------------------
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index 92334ff..c5f22e7 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -503,6 +503,14 @@ protected:
return;
}
+ if (response->code == process::http::Status::NOT_FOUND) {
+ // This could happen if the agent libprocess process has not yet set up
+ // HTTP routes.
+ LOG(WARNING) << "Received '" << response->status << "' ("
+ << response->body << ") for " << call.type();
+ return;
+ }
+
// We should not be able to get here since we already do validation
// of calls before sending them to the agent.
error("Received unexpected '" + response->status + "' (" +
[4/5] mesos git commit: Added an example executor based on the new V1
API.
Posted by vi...@apache.org.
Added an example executor based on the new V1 API.
This change adds a custom executor based on the new executor library.
Review: https://reviews.apache.org/r/42185/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8bb86676
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8bb86676
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8bb86676
Branch: refs/heads/master
Commit: 8bb8667602b78da74374e669a6ebfdb8e5d56972
Parents: e3c3cf8
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:41:33 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:41:33 2016 -0800
----------------------------------------------------------------------
src/Makefile.am | 5 +
src/examples/test_http_executor.cpp | 241 +++++++++++++++++++++++++++++++
2 files changed, 246 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8bb86676/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 420aa40..22f5131 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1601,6 +1601,11 @@ test_executor_SOURCES = examples/test_executor.cpp
test_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
test_executor_LDADD = libmesos.la $(LDADD)
+check_PROGRAMS += test-http-executor
+test_http_executor_SOURCES = examples/test_http_executor.cpp
+test_http_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+test_http_executor_LDADD = libmesos.la $(LDADD)
+
check_PROGRAMS += long-lived-framework
long_lived_framework_SOURCES = examples/long_lived_framework.cpp
long_lived_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
http://git-wip-us.apache.org/repos/asf/mesos/blob/8bb86676/src/examples/test_http_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_http_executor.cpp b/src/examples/test_http_executor.cpp
new file mode 100644
index 0000000..4916e0e
--- /dev/null
+++ b/src/examples/test_http_executor.cpp
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <queue>
+#include <string>
+
+#include <mesos/http.hpp>
+
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/exit.hpp>
+#include <stout/linkedhashmap.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/uuid.hpp>
+
+using std::cout;
+using std::endl;
+using std::queue;
+using std::string;
+
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+using mesos::v1::TaskID;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskState;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
+using process::spawn;
+using process::wait;
+
+
+class TestExecutor: public process::Process<TestExecutor>
+{
+public:
+ TestExecutor(const FrameworkID& _frameworkId, const ExecutorID& _executorId)
+ : frameworkId(_frameworkId),
+ executorId(_executorId),
+ mesos(mesos::ContentType::PROTOBUF,
+ process::defer(self(), &Self::connected),
+ process::defer(self(), &Self::disconnected),
+ process::defer(self(), &Self::received, lambda::_1)),
+ state(DISCONNECTED) {}
+
+ void connected()
+ {
+ state = CONNECTED;
+
+ doReliableRegistration();
+ }
+
+ void doReliableRegistration()
+ {
+ if (state == SUBSCRIBED || state == DISCONNECTED) {
+ return;
+ }
+
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
+
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ // Send all unacknowledged updates.
+ foreach (const Call::Update& update, updates.values()) {
+ subscribe->add_unacknowledged_updates()->MergeFrom(update);
+ }
+
+ // Send all unacknowledged tasks.
+ foreach (const TaskInfo& task, tasks.values()) {
+ subscribe->add_unacknowledged_tasks()->MergeFrom(task);
+ }
+
+ mesos.send(call);
+
+ process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+ }
+
+ void disconnected()
+ {
+ state = DISCONNECTED;
+ }
+
+ void sendStatusUpdate(const TaskInfo& task, const TaskState& state)
+ {
+ UUID uuid = UUID::random();
+
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(task.task_id());
+ status.mutable_executor_id()->CopyFrom(executorId);
+ status.set_state(state);
+ status.set_source(TaskStatus::SOURCE_EXECUTOR);
+ status.set_uuid(uuid.toBytes());
+
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
+
+ call.set_type(Call::UPDATE);
+
+ call.mutable_update()->mutable_status()->CopyFrom(status);
+
+ // Capture the status update.
+ updates[uuid] = call.update();
+
+ mesos.send(call);
+ }
+
+ void received(queue<Event> events)
+ {
+ while (!events.empty()) {
+ Event event = events.front();
+ events.pop();
+
+ switch (event.type()) {
+ case Event::SUBSCRIBED: {
+ cout << "Received a SUBSCRIBED event" << endl;
+
+ state = SUBSCRIBED;
+ break;
+ }
+
+ case Event::LAUNCH: {
+ const TaskInfo& task = event.launch().task();
+ tasks[task.task_id()] = task;
+
+ cout << "Starting task " << task.task_id().value() << endl;
+
+ sendStatusUpdate(task, TaskState::TASK_RUNNING);
+
+ // This is where one would perform the requested task.
+
+ cout << "Finishing task " << task.task_id().value() << endl;
+
+ sendStatusUpdate(task, TaskState::TASK_FINISHED);
+ break;
+ }
+
+ case Event::KILL: {
+ cout << "Received a KILL event" << endl;
+ break;
+ }
+
+ case Event::ACKNOWLEDGED: {
+ cout << "Received an ACKNOWLEDGED event" << endl;
+
+ // Remove the corresponding update.
+ updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+ // Remove the corresponding task.
+ tasks.erase(event.acknowledged().task_id());
+ break;
+ }
+
+ case Event::MESSAGE: {
+ cout << "Received a MESSAGE event" << endl;
+ break;
+ }
+
+ case Event::SHUTDOWN: {
+ cout << "Received a SHUTDOWN event" << endl;
+ break;
+ }
+
+ case Event::ERROR: {
+ cout << "Received an ERROR event" << endl;
+ break;
+ }
+ }
+ }
+ }
+
+private:
+ const FrameworkID frameworkId;
+ const ExecutorID executorId;
+ Mesos mesos;
+ enum State
+ {
+ CONNECTED,
+ DISCONNECTED,
+ SUBSCRIBED
+ } state;
+
+ LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+ LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
+};
+
+
+int main()
+{
+ FrameworkID frameworkId;
+ ExecutorID executorId;
+
+ Option<string> value;
+
+ value = os::getenv("MESOS_FRAMEWORK_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+ }
+ frameworkId.set_value(value.get());
+
+ value = os::getenv("MESOS_EXECUTOR_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+ }
+ executorId.set_value(value.get());
+
+ process::Owned<TestExecutor> executor(
+ new TestExecutor(frameworkId, executorId));
+
+ process::spawn(executor.get());
+ process::wait(executor.get());
+
+ return EXIT_SUCCESS;
+}
[5/5] mesos git commit: Added tests for recovery for HTTP based
executors.
Posted by vi...@apache.org.
Added tests for recovery for HTTP based executors.
Review: https://reviews.apache.org/r/42186/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2dd8dc85
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2dd8dc85
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2dd8dc85
Branch: refs/heads/master
Commit: 2dd8dc85a1443b2192c548cabeab41aad6da3e17
Parents: 8bb8667
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:41:42 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:41:42 2016 -0800
----------------------------------------------------------------------
src/tests/slave_recovery_tests.cpp | 228 ++++++++++++++++++++++++++++++++
1 file changed, 228 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2dd8dc85/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 6683a08..bccdf37 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -21,7 +21,10 @@
#include <gtest/gtest.h>
+#include <mesos/v1/executor/executor.hpp>
+
#include <mesos/executor.hpp>
+#include <mesos/http.hpp>
#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>
@@ -58,6 +61,7 @@
#include "tests/allocator.hpp"
#include "tests/containerizer.hpp"
+#include "tests/flags.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"
@@ -69,6 +73,8 @@ using google::protobuf::RepeatedPtrField;
using mesos::internal::master::Master;
+using mesos::v1::executor::Call;
+
using std::map;
using std::string;
using std::vector;
@@ -411,6 +417,111 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
// The slave is stopped before the first update for a task is received
+// from the HTTP based executor. When it comes back up with recovery=reconnect,
+// make sure the executor subscribes and the slave properly sends the update.
+TYPED_TEST(SlaveRecoveryTest, ReconnectHTTPExecutor)
+{
+ Try<PID<Master> > master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = this->CreateSlaveFlags();
+
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
+ ASSERT_SOME(containerizer1);
+
+ // Start the slave with a static process ID. This allows the executor to
+ // reconnect with the slave upon a process restart.
+ const std::string id("slave");
+
+ Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), id, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ // TODO(anand): Use the HTTP based command executor once MESOS-3558 is
+ // resolved.
+ ExecutorInfo executorInfo =
+ CREATE_EXECUTOR_INFO(
+ "http",
+ path::join(tests::flags.build_dir, "src", "test-http-executor"));
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->CopyFrom(offers.get()[0].resources());
+ task.mutable_executor()->CopyFrom(executorInfo);
+
+ Future<v1::executor::Call> updateCall1 =
+ DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+ Future<v1::executor::Call> updateCall2 =
+ DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ // Stop the slave before the status updates are received.
+ AWAIT_READY(updateCall1);
+ AWAIT_READY(updateCall2);
+
+ this->Stop(slave.get());
+ delete containerizer1.get();
+
+ Future<v1::executor::Call> subscribeCall =
+ FUTURE_HTTP_CALL(Call(), Call::SUBSCRIBE, _, ContentType::PROTOBUF);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ // Restart the slave (use same flags) with a new containerizer.
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
+ ASSERT_SOME(containerizer2);
+
+ slave = this->StartSlave(containerizer2.get(), id, flags);
+ ASSERT_SOME(slave);
+
+ // Ensure that the executor subscribes again.
+ AWAIT_READY(subscribeCall);
+
+ ASSERT_EQ(2, subscribeCall.get().subscribe().unacknowledged_updates().size());
+ ASSERT_EQ(1, subscribeCall.get().subscribe().unacknowledged_tasks().size());
+
+ // Scheduler should receive the recovered update.
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+ driver.stop();
+ driver.join();
+
+ this->Shutdown();
+ delete containerizer2.get();
+}
+
+
+// The slave is stopped before the first update for a task is received
// from the executor. When it comes back up with recovery=reconnect, make
// sure the executor re-registers and the slave properly sends the update.
TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
@@ -925,6 +1036,123 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
}
+// The slave is stopped before a terminal update is received from the HTTP
+// based executor. The slave is then restarted in recovery=cleanup mode.
+// It kills the executor, and terminates. Master should then send TASK_LOST.
+TYPED_TEST(SlaveRecoveryTest, CleanupHTTPExecutor)
+{
+ Try<PID<Master> > master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = this->CreateSlaveFlags();
+
+ Fetcher fetcher;
+
+ Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
+ ASSERT_SOME(containerizer1);
+
+ // Start the slave with a static process ID. This allows the executor to
+ // reconnect with the slave upon a process restart.
+ const std::string id("slave");
+
+ Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), id, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ // TODO(anand): Use the HTTP based command executor once MESOS-3558 is
+ // resolved.
+ ExecutorInfo executorInfo =
+ CREATE_EXECUTOR_INFO(
+ "http",
+ path::join(tests::flags.build_dir, "src", "test-http-executor"));
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->CopyFrom(offers.get()[0].resources());
+ task.mutable_executor()->CopyFrom(executorInfo);
+
+ Future<v1::executor::Call> updateCall1 =
+ DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+ Future<v1::executor::Call> updateCall2 =
+ DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ // Stop the slave before the status updates are received.
+ AWAIT_READY(updateCall1);
+ AWAIT_READY(updateCall2);
+
+ this->Stop(slave.get());
+ delete containerizer1.get();
+
+ // Slave in cleanup mode shouldn't re-register with the master and
+ // hence no offers should be made by the master.
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .Times(0);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
+
+ EXPECT_CALL(sched, slaveLost(_, _))
+ .Times(AtMost(1));
+
+ // Restart the slave in 'cleanup' recovery mode with a new isolator.
+ Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
+ ASSERT_SOME(containerizer2);
+
+ flags.recover = "cleanup";
+ slave = this->StartSlave(containerizer2.get(), id, flags);
+ ASSERT_SOME(slave);
+
+ Clock::pause();
+
+ // Now advance time until the reaper reaps the executor.
+ while (status.isPending()) {
+ Clock::advance(process::MAX_REAP_INTERVAL());
+ Clock::settle();
+ }
+
+ // Wait for recovery to complete.
+ AWAIT_READY(__recover);
+
+ // Scheduler should receive the TASK_LOST update.
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_LOST, status.get().state());
+
+ driver.stop();
+ driver.join();
+
+ this->Shutdown();
+ delete containerizer2.get();
+}
+
+
// The slave is stopped after a non-terminal update is received.
// Slave is restarted in recovery=cleanup mode. It kills the command
// executor, and terminates. Master should then send TASK_LOST.