You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/12/05 00:27:31 UTC

[3/3] mesos git commit: Added test for the Subscribe->Subscribed workflow for the Executor HTTP API.

Added test for the Subscribe->Subscribed workflow for the Executor
HTTP API.

This change adds a basic test to validate the implementation of the
Subscribe->Subscribed workflow on the Agent's `api/v1/executor`
endpoint.

Review: https://reviews.apache.org/r/38878


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/517e8466
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/517e8466
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/517e8466

Branch: refs/heads/master
Commit: 517e84669790f833f6cf0e7911968a3591c93419
Parents: a2c3faa
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Fri Dec 4 15:26:10 2015 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Dec 4 15:26:11 2015 -0800

----------------------------------------------------------------------
 src/tests/executor_http_api_tests.cpp | 108 +++++++++++++++++++++++++++++
 1 file changed, 108 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/517e8466/src/tests/executor_http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/executor_http_api_tests.cpp b/src/tests/executor_http_api_tests.cpp
index fe9df1f..1be657c 100644
--- a/src/tests/executor_http_api_tests.cpp
+++ b/src/tests/executor_http_api_tests.cpp
@@ -25,34 +25,45 @@
 #include <process/future.hpp>
 #include <process/gtest.hpp>
 #include <process/http.hpp>
+#include <process/message.hpp>
 #include <process/pid.hpp>
 
 #include "common/http.hpp"
+#include "common/recordio.hpp"
 
 #include "master/master.hpp"
 
+#include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 
 using mesos::internal::master::Master;
 
+using mesos::internal::recordio::Reader;
+
 using mesos::internal::slave::Slave;
 
 using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
 
 using process::Clock;
 using process::Future;
+using process::Message;
 using process::PID;
 
 using process::http::BadRequest;
 using process::http::MethodNotAllowed;
 using process::http::NotAcceptable;
 using process::http::OK;
+using process::http::Pipe;
 using process::http::Response;
 using process::http::UnsupportedMediaType;
 
+using recordio::Decoder;
+
 using std::string;
 using std::vector;
 
+using testing::Eq;
 using testing::WithParamInterface;
 
 namespace mesos {
@@ -694,6 +705,103 @@ TEST_P(ExecutorHttpApiTest, StatusUpdateCallFailedValidation)
 }
 
 
+// This test verifies that an executor receives a `Subscribed` event on the
+// response stream after sending a `Subscribe` call to `api/v1/executor`.
+TEST_P(ExecutorHttpApiTest, Subscribe)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  MockExecutor exec(executorId);
+
+  TestContainerizer containerizer(&exec);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+
+  Future<Message> registerExecutorMessage =
+    DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  TaskInfo taskInfo = createTask(offers.get()[0], "", executorId);
+  driver.launchTasks(offers.get()[0].id(), {taskInfo});
+
+  // Wait until the `RegisterExecutorMessage` has been dropped, then send a
+  // `Subscribe` request the way an HTTP based executor would.
+  AWAIT_READY(registerExecutorMessage);
+
+  Call call;
+  call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
+  call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+  call.set_type(Call::SUBSCRIBE);
+
+  call.mutable_subscribe();
+
+  // The content type exercised here is supplied as the test parameter.
+  const ContentType contentType = GetParam();
+
+  process::http::Headers headers;
+  headers["Accept"] = stringify(contentType);
+
+  Future<Response> response = process::http::streaming::post(
+      slave.get(),
+      "api/v1/executor",
+      headers,
+      serialize(contentType, call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  EXPECT_SOME_EQ(stringify(contentType),
+                 response.get().headers.get("Content-Type"));
+
+  EXPECT_SOME_EQ("chunked",
+                 response.get().headers.get("Transfer-Encoding"));
+
+  ASSERT_EQ(Response::PIPE, response.get().type);
+
+  Option<Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  auto deserializer =
+    lambda::bind(deserialize<Event>, contentType, lambda::_1);
+
+  Reader<Event> responseDecoder(
+      Decoder<Event>(deserializer),
+      reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check that the event type is `SUBSCRIBED` and that the ExecutorID matches.
+  ASSERT_EQ(Event::SUBSCRIBED, event.get().get().type());
+  ASSERT_EQ(event.get().get().subscribed().executor_info().executor_id(),
+            call.executor_id());
+
+  reader.get().close();
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {