You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/10/25 16:07:52 UTC

mesos git commit: Moved the framework/executor lookup in the Agent's executor endpoint.

Repository: mesos
Updated Branches:
  refs/heads/master ed29afc15 -> c1106b36b


Moved the framework/executor lookup in the Agent's executor endpoint.

Refactored the `api/v1/executor` endpoint to look up the framework and
executor before allowing subscription. This is expected since the slave
stores the framework and executor during the launch process. Updated the
tests to first spawn a dummy `cat` process and then make a `Subscribe`
call to ensure that the Agent knows about the framework / executor and
not reject it with a `BadRequest`.

Review: https://reviews.apache.org/r/38875


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c1106b36
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c1106b36
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c1106b36

Branch: refs/heads/master
Commit: c1106b36b2f7c24fabc6ebb76fd79637aded93d9
Parents: ed29afc
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Sun Oct 25 07:58:15 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sun Oct 25 08:07:48 2015 -0700

----------------------------------------------------------------------
 src/slave/http.cpp                    |  34 +++++----
 src/tests/executor_http_api_tests.cpp | 116 ++++++++++++++++++++++-------
 2 files changed, 109 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c1106b36/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 80bda34..3f7f71b 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -263,11 +263,11 @@ Future<Response> Slave::Http::executor(const Request& request) const
   // TODO(anand): Validate the protobuf (MESOS-2906) before proceeding
   // further.
 
+  ContentType responseContentType;
+
   if (call.type() == executor::Call::SUBSCRIBE) {
     // We default to JSON since an empty 'Accept' header
     // results in all media types considered acceptable.
-    ContentType responseContentType;
-
     if (request.acceptsMediaType(APPLICATION_JSON)) {
       responseContentType = ContentType::JSON;
     } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
@@ -277,18 +277,8 @@ Future<Response> Slave::Http::executor(const Request& request) const
           string("Expecting 'Accept' to allow ") +
           "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
     }
-
-    Pipe pipe;
-    OK ok;
-    ok.headers["Content-Type"] = stringify(responseContentType);
-
-    ok.type = Response::PIPE;
-    ok.reader = pipe.reader();
-
-    return ok;
   }
 
-
   // We consolidate the framework/executor lookup logic here because
   // it is common for all the call handlers.
   Framework* framework = slave->getFramework(call.framework_id());
@@ -301,16 +291,30 @@ Future<Response> Slave::Http::executor(const Request& request) const
     return BadRequest("Executor cannot be found");
   }
 
-  if (executor->state == Executor::REGISTERING) {
+  if (executor->state == Executor::REGISTERING &&
+      call.type() != executor::Call::SUBSCRIBE) {
     return Forbidden("Executor is not subscribed");
   }
 
   switch (call.type()) {
-    case executor::Call::UPDATE:
+    case executor::Call::SUBSCRIBE: {
+      Pipe pipe;
+      OK ok;
+      ok.headers["Content-Type"] = stringify(responseContentType);
+
+      ok.type = Response::PIPE;
+      ok.reader = pipe.reader();
+
+      return ok;
+    }
+
+    case executor::Call::UPDATE: {
       return Accepted();
+    }
 
-    case executor::Call::MESSAGE:
+    case executor::Call::MESSAGE: {
       return Accepted();
+    }
 
     default:
       // Should be caught during call validation above.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c1106b36/src/tests/executor_http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/executor_http_api_tests.cpp b/src/tests/executor_http_api_tests.cpp
index 31938c2..e429d84 100644
--- a/src/tests/executor_http_api_tests.cpp
+++ b/src/tests/executor_http_api_tests.cpp
@@ -17,6 +17,7 @@
  */
 
 #include <string>
+#include <vector>
 
 #include <mesos/v1/executor/executor.hpp>
 
@@ -52,6 +53,7 @@ using process::http::Response;
 using process::http::UnsupportedMediaType;
 
 using std::string;
+using std::vector;
 
 using testing::WithParamInterface;
 
@@ -306,38 +308,69 @@ TEST_F(ExecutorHttpApiTest, GetRequest)
 }
 
 
-// This test sends in a Accept:*/* header meaning it would Accept any
-// media type as response. We return the default "application/json"
-// media type as response.
+// This test sends in a Accept:*/* header meaning it would
+// accept any media type in the response. We expect the
+// default "application/json" media type.
 TEST_P(ExecutorHttpApiTest, DefaultAccept)
 {
   Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  MockExecutor exec(executorId);
 
-  Try<PID<Slave>> slave = StartSlave();
+  Try<PID<Slave>> slave = StartSlave(&exec);
   ASSERT_SOME(slave);
 
-  AWAIT_READY(__recover);
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  // Wait for recovery to be complete.
-  Clock::pause();
-  Clock::settle();
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
 
-  process::http::Headers headers;
-  headers["Accept"] = "*/*";
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  Future<Nothing> statusUpdate;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureSatisfy(&statusUpdate));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1, offers.get().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  TaskInfo taskInfo = createTask(offers.get()[0], "", executorId);
+  driver.launchTasks(offers.get()[0].id(), {taskInfo});
+
+  // Wait until status update is received on the scheduler
+  // before sending an executor subscribe request.
+  AWAIT_READY(statusUpdate);
 
   // Only subscribe needs to 'Accept' JSON or protobuf.
   Call call;
   call.set_type(Call::SUBSCRIBE);
 
-  call.mutable_framework_id()->set_value("dummy_framework_id");
-  call.mutable_executor_id()->set_value("dummy_executor_id");
+  call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
+  call.mutable_executor_id()->CopyFrom(evolve(executorId));
 
   // Retrieve the parameter passed as content type to this test.
   const ContentType contentType = GetParam();
 
+  process::http::Headers headers;
+  headers["Accept"] = "*/*";
+
   Future<Response> response = process::http::streaming::post(
       slave.get(),
       "api/v1/executor",
@@ -360,30 +393,61 @@ TEST_P(ExecutorHttpApiTest, NoAcceptHeader)
   Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  MockExecutor exec(executorId);
 
-  Try<PID<Slave>> slave = StartSlave();
+  Try<PID<Slave>> slave = StartSlave(&exec);
   ASSERT_SOME(slave);
 
-  AWAIT_READY(__recover);
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
 
-  // Wait for recovery to be complete.
-  Clock::pause();
-  Clock::settle();
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
 
-  // Retrieve the parameter passed as content type to this test.
-  const ContentType contentType = GetParam();
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
 
-  // No 'Accept' header leads to all media types considered
-  // acceptable. JSON will be chosen by default.
-  process::http::Headers headers;
+  Future<Nothing> statusUpdate;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureSatisfy(&statusUpdate));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1, offers.get().size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  TaskInfo taskInfo = createTask(offers.get()[0], "", executorId);
+  driver.launchTasks(offers.get()[0].id(), {taskInfo});
+
+  // Wait until status update is received on the scheduler before sending
+  // an executor subscribe request.
+  AWAIT_READY(statusUpdate);
 
   // Only subscribe needs to 'Accept' JSON or protobuf.
   Call call;
   call.set_type(Call::SUBSCRIBE);
 
-  call.mutable_framework_id()->set_value("dummy_framework_id");
-  call.mutable_executor_id()->set_value("dummy_executor_id");
+  call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
+  call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+  // Retrieve the parameter passed as content type to this test.
+  const ContentType contentType = GetParam();
+
+  // No 'Accept' header leads to all media types considered
+  // acceptable. JSON will be chosen by default.
+  process::http::Headers headers;
 
   Future<Response> response = process::http::streaming::post(
       slave.get(),