You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/02/04 23:42:34 UTC

[1/5] mesos git commit: Added a process ID argument to Slave constructor.

Repository: mesos
Updated Branches:
  refs/heads/master f2fda2d71 -> 2dd8dc85a


Added a process ID argument to Slave constructor.

This change modifies the existing `Slave` constructor to take the process ID
string as an argument. This is necessary for testing recovery of HTTP based
executors: previously, every invocation of `StartSlave` generated a new ID via
`ID::generate`, and no process delegate was set via `process::initialize`. As a
result, an existing HTTP based executor could not connect back to the restarted
slave.

Also added a new `StartSlave` overload to the tests that takes this process ID
as an argument.

Review: https://reviews.apache.org/r/42181/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d1068c17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d1068c17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d1068c17

Branch: refs/heads/master
Commit: d1068c17981d04aa621ec557c73f2d060c26e2fe
Parents: f2fda2d
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:39:47 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:39:47 2016 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp   |  5 +++--
 src/slave/slave.hpp   |  3 ++-
 src/tests/cluster.cpp |  3 +++
 src/tests/cluster.hpp |  1 +
 src/tests/mesos.cpp   | 22 ++++++++++++++++++++++
 src/tests/mesos.hpp   |  5 +++++
 6 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1f4c836..9dda3a2 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -116,7 +116,8 @@ namespace slave {
 using namespace state;
 
 
-Slave::Slave(const slave::Flags& _flags,
+Slave::Slave(const std::string& id,
+             const slave::Flags& _flags,
              MasterDetector* _detector,
              Containerizer* _containerizer,
              Files* _files,
@@ -124,7 +125,7 @@ Slave::Slave(const slave::Flags& _flags,
              StatusUpdateManager* _statusUpdateManager,
              ResourceEstimator* _resourceEstimator,
              QoSController* _qosController)
-  : ProcessBase(process::ID::generate("slave")),
+  : ProcessBase(id),
     state(RECOVERING),
     flags(_flags),
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS),

http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 6326402..a3830ff 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -99,7 +99,8 @@ struct HttpConnection;
 class Slave : public ProtobufProcess<Slave>
 {
 public:
-  Slave(const Flags& flags,
+  Slave(const std::string& id,
+        const Flags& flags,
         MasterDetector* detector,
         Containerizer* containerizer,
         Files* files,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/tests/cluster.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp
index 1a3a038..084fb1c 100644
--- a/src/tests/cluster.cpp
+++ b/src/tests/cluster.cpp
@@ -20,6 +20,7 @@
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
+#include <process/id.hpp>
 #include <process/limiter.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
@@ -346,6 +347,7 @@ void Cluster::Slaves::shutdown()
 
 Try<process::PID<slave::Slave>> Cluster::Slaves::start(
     const slave::Flags& flags,
+    const Option<std::string>& id,
     const Option<slave::Containerizer*>& containerizer,
     const Option<MasterDetector*>& detector,
     const Option<slave::GarbageCollector*>& gc,
@@ -406,6 +408,7 @@ Try<process::PID<slave::Slave>> Cluster::Slaves::start(
   slave.flags = flags;
 
   slave.slave = new slave::Slave(
+      id.isSome() ? id.get() : process::ID::generate("slave"),
       flags,
       detector.getOrElse(slave.detector.get()),
       slave.containerizer,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index 576dcb8..99a785a 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -171,6 +171,7 @@ public:
     // Start a new slave with the provided flags and injections.
     Try<process::PID<slave::Slave>> start(
         const slave::Flags& flags = slave::Flags(),
+        const Option<std::string>& id = None(),
         const Option<slave::Containerizer*>& containerizer = None(),
         const Option<MasterDetector*>& detector = None(),
         const Option<slave::GarbageCollector*>& gc = None(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 8f1f8d6..06e774b 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -265,6 +265,19 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
 {
   return cluster.slaves.start(
       flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
+      containerizer);
+}
+
+
+Try<PID<slave::Slave>> MesosTest::StartSlave( // Like StartSlave(containerizer, flags), but with a caller-chosen process ID.
+    slave::Containerizer* containerizer,
+    const std::string& id, // Process ID for the slave; reusing it across restarts lets executors reconnect.
+    const Option<slave::Flags>& flags)
+{
+  return cluster.slaves.start(
+      flags.isNone() ? CreateSlaveFlags() : flags.get(), // Default test flags if none given.
+      id,
       containerizer);
 }
 
@@ -276,6 +289,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
 {
   return cluster.slaves.start(
       flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
       containerizer,
       detector);
 }
@@ -288,6 +302,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
   return cluster.slaves.start(
       flags.isNone() ? CreateSlaveFlags() : flags.get(),
       None(),
+      None(),
       detector);
 }
 
@@ -300,6 +315,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
   return cluster.slaves.start(
       flags.isNone() ? CreateSlaveFlags() : flags.get(),
       None(),
+      None(),
       detector,
       gc);
 }
@@ -314,6 +330,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
 
   Try<PID<slave::Slave>> pid = cluster.slaves.start(
       flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
       containerizer,
       detector);
 
@@ -338,6 +355,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
       None(),
       None(),
       None(),
+      None(),
       resourceEstimator);
 }
 
@@ -351,6 +369,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
 
   Try<PID<slave::Slave>> pid = cluster.slaves.start(
       flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
       containerizer,
       None(),
       None(),
@@ -375,6 +394,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
 {
   return cluster.slaves.start(
       flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
       containerizer,
       None(),
       None(),
@@ -394,6 +414,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
       None(),
       None(),
       None(),
+      None(),
       qoSController);
 }
 
@@ -405,6 +426,7 @@ Try<PID<slave::Slave>> MesosTest::StartSlave(
 {
   return cluster.slaves.start(
       flags.isNone() ? CreateSlaveFlags() : flags.get(),
+      None(),
       containerizer,
       None(),
       None(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/d1068c17/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index c2bae47..e07d8aa 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -164,6 +164,11 @@ protected:
       slave::Containerizer* containerizer,
       const Option<slave::Flags>& flags = None());
 
+  virtual Try<process::PID<slave::Slave> > StartSlave( // Starts a slave with the given containerizer, process ID and flags.
+      slave::Containerizer* containerizer,
+      const std::string& id, // Reuse the same ID across restarts so executors can reconnect.
+      const Option<slave::Flags>& flags = None());
+
   // Starts a slave with the specified containerizer, detector and flags.
   virtual Try<process::PID<slave::Slave> > StartSlave(
       slave::Containerizer* containerizer,


[2/5] mesos git commit: Modified existing usage of Slave constructor.

Posted by vi...@apache.org.
Modified existing usage of Slave constructor.

This change passes the new process ID argument wherever a `Slave` object is
constructed.

Review: https://reviews.apache.org/r/43131/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4a211e5f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4a211e5f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4a211e5f

Branch: refs/heads/master
Commit: 4a211e5f9f95dbffb819d5802ffcb8617be202b8
Parents: d1068c1
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:40:56 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:40:56 2016 -0800

----------------------------------------------------------------------
 src/local/local.cpp | 1 +
 src/slave/main.cpp  | 5 ++++-
 src/tests/mesos.cpp | 1 +
 3 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4a211e5f/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 2688f9d..359fc54 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -384,6 +384,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
     // NOTE: At this point detector is already initialized by the
     // Master.
     Slave* slave = new Slave(
+        process::ID::generate("slave"),
         flags,
         detector,
         containerizer.get(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/4a211e5f/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index a412ceb..22b8330 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -208,7 +208,9 @@ int main(int argc, char** argv)
     os::setenv("LIBPROCESS_ADVERTISE_PORT", advertise_port.get());
   }
 
-  process::initialize("slave(1)");
+  const string id = process::ID::generate("slave"); // Process ID.
+
+  process::initialize(id);
 
   logging::initialize(argv[0], flags, true); // Catch signals.
 
@@ -304,6 +306,7 @@ int main(int argc, char** argv)
   LOG(INFO) << "Starting Mesos slave";
 
   Slave* slave = new Slave(
+      id,
       flags,
       detector.get(),
       containerizer.get(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/4a211e5f/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 06e774b..8fe28ae 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -555,6 +555,7 @@ MockSlave::MockSlave(const slave::Flags& flags,
                      slave::Containerizer* containerizer,
                      const Option<mesos::slave::QoSController*>& _qosController)
   : slave::Slave(
+      process::ID::generate("slave"),
       flags,
       detector,
       containerizer,


[3/5] mesos git commit: Drop `404 NotFound` responses in the executor library.

Posted by vi...@apache.org.
Drop `404 NotFound` responses in the executor library.

Previously, the library did not drop `404 NotFound` responses; instead, it sent
an `Event::Error` to the executor. However, a `404 NotFound` can legitimately be
triggered upon an agent restart, before the agent has set up its HTTP routes.

Review: https://reviews.apache.org/r/42844/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e3c3cf88
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e3c3cf88
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e3c3cf88

Branch: refs/heads/master
Commit: e3c3cf88688d08db5699d5c70616702d92c361e5
Parents: 4a211e5
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:41:15 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:41:15 2016 -0800

----------------------------------------------------------------------
 src/executor/executor.cpp | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e3c3cf88/src/executor/executor.cpp
----------------------------------------------------------------------
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index 92334ff..c5f22e7 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -503,6 +503,14 @@ protected:
       return;
     }
 
+    if (response->code == process::http::Status::NOT_FOUND) {
+      // This could happen if the agent libprocess process has not yet set up
+      // HTTP routes.
+      LOG(WARNING) << "Received '" << response->status << "' ("
+                   << response->body << ") for " << call.type();
+      return;
+    }
+
     // We should not be able to get here since we already do validation
     // of calls before sending them to the agent.
     error("Received unexpected '" + response->status + "' (" +


[4/5] mesos git commit: Added an example executor based on the new V1 API.

Posted by vi...@apache.org.
Added an example executor based on the new V1 API.

This change adds a custom executor based on the new executor library.

Review: https://reviews.apache.org/r/42185/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8bb86676
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8bb86676
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8bb86676

Branch: refs/heads/master
Commit: 8bb8667602b78da74374e669a6ebfdb8e5d56972
Parents: e3c3cf8
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:41:33 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:41:33 2016 -0800

----------------------------------------------------------------------
 src/Makefile.am                     |   5 +
 src/examples/test_http_executor.cpp | 241 +++++++++++++++++++++++++++++++
 2 files changed, 246 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8bb86676/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 420aa40..22f5131 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1601,6 +1601,11 @@ test_executor_SOURCES = examples/test_executor.cpp
 test_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
 test_executor_LDADD = libmesos.la $(LDADD)
 
+check_PROGRAMS += test-http-executor
+test_http_executor_SOURCES = examples/test_http_executor.cpp
+test_http_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+test_http_executor_LDADD = libmesos.la $(LDADD)
+
 check_PROGRAMS += long-lived-framework
 long_lived_framework_SOURCES = examples/long_lived_framework.cpp
 long_lived_framework_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/8bb86676/src/examples/test_http_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_http_executor.cpp b/src/examples/test_http_executor.cpp
new file mode 100644
index 0000000..4916e0e
--- /dev/null
+++ b/src/examples/test_http_executor.cpp
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <queue>
+#include <string>
+
+#include <mesos/http.hpp>
+
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/exit.hpp>
+#include <stout/linkedhashmap.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/uuid.hpp>
+
+using std::cout;
+using std::endl;
+using std::queue;
+using std::string;
+
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+using mesos::v1::TaskID;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskState;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
+using process::spawn;
+using process::wait;
+
+
+class TestExecutor: public process::Process<TestExecutor> // Example executor built on the V1 executor library (`Mesos`).
+{
+public:
+  TestExecutor(const FrameworkID& _frameworkId, const ExecutorID& _executorId)
+    : frameworkId(_frameworkId),
+      executorId(_executorId),
+      mesos(mesos::ContentType::PROTOBUF,
+            process::defer(self(), &Self::connected),
+            process::defer(self(), &Self::disconnected),
+            process::defer(self(), &Self::received, lambda::_1)),
+      state(DISCONNECTED) {} // Stays DISCONNECTED until the `connected` callback runs.
+
+  void connected() // Callback passed to `mesos`; starts the SUBSCRIBE retry loop.
+  {
+    state = CONNECTED;
+
+    doReliableRegistration();
+  }
+
+  void doReliableRegistration() // Re-sends SUBSCRIBE every second while in the CONNECTED state.
+  {
+    if (state == SUBSCRIBED || state == DISCONNECTED) {
+      return;
+    }
+
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
+
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    // Send all unacknowledged updates.
+    foreach (const Call::Update& update, updates.values()) {
+      subscribe->add_unacknowledged_updates()->MergeFrom(update);
+    }
+
+    // Send all unacknowledged tasks.
+    foreach (const TaskInfo& task, tasks.values()) {
+      subscribe->add_unacknowledged_tasks()->MergeFrom(task);
+    }
+
+    mesos.send(call);
+
+    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  void disconnected() // Callback passed to `mesos`; halts the SUBSCRIBE retry loop.
+  {
+    state = DISCONNECTED;
+  }
+
+  void sendStatusUpdate(const TaskInfo& task, const TaskState& state) // NOTE(review): `state` shadows the member.
+  {
+    UUID uuid = UUID::random();
+
+    TaskStatus status;
+    status.mutable_task_id()->CopyFrom(task.task_id());
+    status.mutable_executor_id()->CopyFrom(executorId);
+    status.set_state(state);
+    status.set_source(TaskStatus::SOURCE_EXECUTOR);
+    status.set_uuid(uuid.toBytes());
+
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
+
+    call.set_type(Call::UPDATE);
+
+    call.mutable_update()->mutable_status()->CopyFrom(status);
+
+    // Track the update until acknowledged so it is included in SUBSCRIBE.
+    updates[uuid] = call.update();
+
+    mesos.send(call);
+  }
+
+  void received(queue<Event> events) // Callback passed to `mesos`; handles a batch of events.
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          cout << "Received a SUBSCRIBED event" << endl;
+
+          state = SUBSCRIBED;
+          break;
+        }
+
+        case Event::LAUNCH: {
+          const TaskInfo& task = event.launch().task();
+          tasks[task.task_id()] = task;
+
+          cout << "Starting task " << task.task_id().value() << endl;
+
+          sendStatusUpdate(task, TaskState::TASK_RUNNING);
+
+          // This is where one would perform the requested task.
+
+          cout << "Finishing task " << task.task_id().value() << endl;
+
+          sendStatusUpdate(task, TaskState::TASK_FINISHED);
+          break;
+        }
+
+        case Event::KILL: { // Only logged; tasks already finish during LAUNCH.
+          cout << "Received a KILL event" << endl;
+          break;
+        }
+
+        case Event::ACKNOWLEDGED: {
+          cout << "Received an ACKNOWLEDGED event" << endl;
+
+          // Remove the corresponding update.
+          updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+          // Remove the corresponding task (on its first acknowledgement).
+          tasks.erase(event.acknowledged().task_id());
+          break;
+        }
+
+        case Event::MESSAGE: {
+          cout << "Received a MESSAGE event" << endl;
+          break;
+        }
+
+        case Event::SHUTDOWN: { // NOTE(review): only logged; nothing here terminates the executor.
+          cout << "Received a SHUTDOWN event" << endl;
+          break;
+        }
+
+        case Event::ERROR: {
+          cout << "Received an ERROR event" << endl;
+          break;
+        }
+      }
+    }
+  }
+
+private:
+  const FrameworkID frameworkId;
+  const ExecutorID executorId;
+  Mesos mesos;
+  enum State // Connection/subscription state of the executor.
+  {
+    CONNECTED,
+    DISCONNECTED,
+    SUBSCRIBED
+  } state;
+
+  LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+  LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
+};
+
+
+int main() // Reads MESOS_FRAMEWORK_ID / MESOS_EXECUTOR_ID from the environment and runs TestExecutor.
+{
+  FrameworkID frameworkId;
+  ExecutorID executorId;
+
+  Option<string> value;
+
+  value = os::getenv("MESOS_FRAMEWORK_ID");
+  if (value.isNone()) {
+    EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+  }
+  frameworkId.set_value(value.get());
+
+  value = os::getenv("MESOS_EXECUTOR_ID");
+  if (value.isNone()) {
+    EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+  }
+  executorId.set_value(value.get());
+
+  process::Owned<TestExecutor> executor(
+      new TestExecutor(frameworkId, executorId));
+
+  process::spawn(executor.get());
+  process::wait(executor.get()); // Blocks until the executor process terminates.
+
+  return EXIT_SUCCESS;
+}


[5/5] mesos git commit: Added tests for recovery for HTTP based executors.

Posted by vi...@apache.org.
Added tests for recovery for HTTP based executors.

Review: https://reviews.apache.org/r/42186/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2dd8dc85
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2dd8dc85
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2dd8dc85

Branch: refs/heads/master
Commit: 2dd8dc85a1443b2192c548cabeab41aad6da3e17
Parents: 8bb8667
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:41:42 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:41:42 2016 -0800

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 228 ++++++++++++++++++++++++++++++++
 1 file changed, 228 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2dd8dc85/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 6683a08..bccdf37 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -21,7 +21,10 @@
 
 #include <gtest/gtest.h>
 
+#include <mesos/v1/executor/executor.hpp>
+
 #include <mesos/executor.hpp>
+#include <mesos/http.hpp>
 #include <mesos/resources.hpp>
 #include <mesos/scheduler.hpp>
 
@@ -58,6 +61,7 @@
 
 #include "tests/allocator.hpp"
 #include "tests/containerizer.hpp"
+#include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
@@ -69,6 +73,8 @@ using google::protobuf::RepeatedPtrField;
 
 using mesos::internal::master::Master;
 
+using mesos::v1::executor::Call;
+
 using std::map;
 using std::string;
 using std::vector;
@@ -411,6 +417,111 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
 
 
 // The slave is stopped before the first update for a task is received
+// from the HTTP based executor. When it comes back up with recovery=reconnect,
+// make sure the executor subscribes and the slave properly sends the update.
+TYPED_TEST(SlaveRecoveryTest, ReconnectHTTPExecutor)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer1);
+
+  // Start the slave with a static process ID. This allows the executor to
+  // reconnect with the slave upon a process restart.
+  const std::string id("slave");
+
+  Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // TODO(anand): Use the HTTP based command executor once MESOS-3558 is
+  // resolved.
+  ExecutorInfo executorInfo =
+    CREATE_EXECUTOR_INFO(
+        "http",
+        path::join(tests::flags.build_dir, "src", "test-http-executor"));
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->CopyFrom(offers.get()[0].resources());
+  task.mutable_executor()->CopyFrom(executorInfo);
+
+  Future<v1::executor::Call> updateCall1 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  Future<v1::executor::Call> updateCall2 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  // Stop the slave once both updates are dropped, so they stay unacknowledged.
+  AWAIT_READY(updateCall1);
+  AWAIT_READY(updateCall2);
+
+  this->Stop(slave.get());
+  delete containerizer1.get();
+
+  Future<v1::executor::Call> subscribeCall =
+    FUTURE_HTTP_CALL(Call(), Call::SUBSCRIBE, _, ContentType::PROTOBUF);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  // Restart the slave (use same flags) with a new containerizer.
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer2);
+
+  slave = this->StartSlave(containerizer2.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  // Ensure that the executor subscribes again.
+  AWAIT_READY(subscribeCall);
+
+  ASSERT_EQ(2, subscribeCall.get().subscribe().unacknowledged_updates().size());
+  ASSERT_EQ(1, subscribeCall.get().subscribe().unacknowledged_tasks().size());
+
+  // Scheduler should receive the recovered update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown();
+  delete containerizer2.get();
+}
+
+
+// The slave is stopped before the first update for a task is received
 // from the executor. When it comes back up with recovery=reconnect, make
 // sure the executor re-registers and the slave properly sends the update.
 TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
@@ -925,6 +1036,123 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
 }
 
 
+// The slave is stopped before a terminal update is received from the HTTP
+// based executor. The slave is then restarted in recovery=cleanup mode.
+// It kills the executor and terminates. The master should then send TASK_LOST.
+TYPED_TEST(SlaveRecoveryTest, CleanupHTTPExecutor)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  Try<TypeParam*> containerizer1 = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer1);
+
+  // Start the slave with a static process ID. This allows the executor to
+  // reconnect with the slave upon a process restart.
+  const std::string id("slave");
+
+  Try<PID<Slave> > slave = this->StartSlave(containerizer1.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // TODO(anand): Use the HTTP based command executor once MESOS-3558 is
+  // resolved.
+  ExecutorInfo executorInfo =
+    CREATE_EXECUTOR_INFO(
+        "http",
+        path::join(tests::flags.build_dir, "src", "test-http-executor"));
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->CopyFrom(offers.get()[0].resources());
+  task.mutable_executor()->CopyFrom(executorInfo);
+
+  Future<v1::executor::Call> updateCall1 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  Future<v1::executor::Call> updateCall2 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  // Stop the slave once both updates are dropped, so they stay unacknowledged.
+  AWAIT_READY(updateCall1);
+  AWAIT_READY(updateCall2);
+
+  this->Stop(slave.get());
+  delete containerizer1.get();
+
+  // Slave in cleanup mode shouldn't re-register with the master and
+  // hence no offers should be made by the master.
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .Times(0);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
+
+  EXPECT_CALL(sched, slaveLost(_, _))
+    .Times(AtMost(1));
+
+  // Restart the slave in 'cleanup' recovery mode with a new containerizer.
+  Try<TypeParam*> containerizer2 = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer2);
+
+  flags.recover = "cleanup";
+  slave = this->StartSlave(containerizer2.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  Clock::pause();
+
+  // Now advance time until the reaper reaps the executor.
+  while (status.isPending()) {
+    Clock::advance(process::MAX_REAP_INTERVAL());
+    Clock::settle();
+  }
+
+  // Wait for recovery to complete.
+  AWAIT_READY(__recover);
+
+  // Scheduler should receive the TASK_LOST update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_LOST, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown();
+  delete containerizer2.get();
+}
+
+
 // The slave is stopped after a non-terminal update is received.
 // Slave is restarted in recovery=cleanup mode. It kills the command
 // executor, and terminates. Master should then send TASK_LOST.