You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/08/13 21:08:00 UTC

[1/5] git commit: Expose pending tasks during reconciliation.

Repository: mesos
Updated Branches:
  refs/heads/master 30fdabe1a -> 4700fadf0


Expose pending tasks during reconciliation.

Review: https://reviews.apache.org/r/24516


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4700fadf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4700fadf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4700fadf

Branch: refs/heads/master
Commit: 4700fadf0cb618489a91f48b6e57719b69ea0389
Parents: 85303d1
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Aug 8 16:19:27 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp                    | 35 +++++++++---
 src/tests/master_authorization_tests.cpp | 78 ---------------------------
 src/tests/reconciliation_tests.cpp       | 44 ++++++---------
 3 files changed, 44 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f40a1cd..e948803 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3302,7 +3302,26 @@ void Master::reconcileTasks(
     LOG(INFO) << "Performing implicit task state reconciliation for framework "
               << frameworkId;
 
-    // TODO(bmahler): Consider sending completed tasks?
+    foreachvalue (const TaskInfo& task, framework->pendingTasks) {
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          frameworkId,
+          task.slave_id(),
+          task.task_id(),
+          TASK_STAGING,
+          "Reconciliation: Latest task state");
+
+      VLOG(1) << "Sending implicit reconciliation state "
+              << update.status().state()
+              << " for task " << update.status().task_id()
+              << " of framework " << frameworkId;
+
+      // TODO(bmahler): Consider using forward(); might lead to too
+      // much logging.
+      StatusUpdateMessage message;
+      message.mutable_update()->CopyFrom(update);
+      send(framework->pid, message);
+    }
+
     foreachvalue (Task* task, framework->tasks) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           frameworkId,
@@ -3331,7 +3350,7 @@ void Master::reconcileTasks(
             << statuses.size() << " tasks of framework " << frameworkId;
 
   // Explicit reconciliation occurs for the following cases:
-  //   (1) Task is known, but pending: no-op.
+  //   (1) Task is known, but pending: TASK_STAGING.
   //   (2) Task is known: send the latest state.
   //   (3) Task is unknown, slave is registered: TASK_LOST.
   //   (4) Task is unknown, slave is transitioning: no-op.
@@ -3353,10 +3372,14 @@ void Master::reconcileTasks(
     Task* task = framework->getTask(status.task_id());
 
     if (framework->pendingTasks.contains(status.task_id())) {
-      // (1) Task is known, but pending: no-op.
-      LOG(INFO) << "Ignoring reconciliation request of task "
-                << status.task_id() << " from framework " << frameworkId
-                << " because the task is pending";
+      // (1) Task is known, but pending: TASK_STAGING.
+      const TaskInfo& task_ = framework->pendingTasks[status.task_id()];
+      update = protobuf::createStatusUpdate(
+          frameworkId,
+          task_.slave_id(),
+          task_.task_id(),
+          TASK_STAGING,
+          "Reconciliation: Latest task state");
     } else if (task != NULL) {
       // (2) Task is known: send the latest state.
       update = protobuf::createStatusUpdate(

http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index f0f0648..b9aa7bf 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -523,84 +523,6 @@ TEST_F(MasterAuthorizationTest, FrameworkRemoved)
 }
 
 
-// This test verifies that a reconciliation request that comes before
-// '_launchTasks()' is ignored.
-TEST_F(MasterAuthorizationTest, ReconcileTask)
-{
-  MockAuthorizer authorizer;
-  Try<PID<Master> > master = StartMaster(&authorizer);
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-
-  Try<PID<Slave> > slave = StartSlave(&exec);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .Times(1);
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  driver.start();
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-
-  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-  vector<TaskInfo> tasks;
-  tasks.push_back(task);
-
-  // Return a pending future from authorizer.
-  Future<Nothing> future;
-  Promise<bool> promise;
-  EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>()))
-    .WillOnce(DoAll(FutureSatisfy(&future),
-                    Return(promise.future())));
-
-  driver.launchTasks(offers.get()[0].id(), tasks);
-
-  // Wait until authorization is in progress.
-  AWAIT_READY(future);
-
-  // Scheduler shouldn't get an update from reconciliation.
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .Times(0);
-
-  Future<ReconcileTasksMessage> reconcileTasksMessage =
-    FUTURE_PROTOBUF(ReconcileTasksMessage(), _, _);
-
-  vector<TaskStatus> statuses;
-
-  TaskStatus status;
-  status.mutable_task_id()->CopyFrom(task.task_id());
-  status.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
-  status.set_state(TASK_STAGING);
-
-  statuses.push_back(status);
-
-  driver.reconcileTasks(statuses);
-
-  AWAIT_READY(reconcileTasksMessage);
-
-  // Make sure the framework doesn't receive any update.
-  Clock::pause();
-  Clock::settle();
-
-  // Now stop the framework.
-  driver.stop();
-  driver.join();
-
-  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
-}
-
-
 // This test verifies that two tasks each launched on a different
 // slave with same executor id but different executor info are
 // allowed even when the first task is pending due to authorization.

http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 3c4d7ed..8c66659 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -578,8 +578,7 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask)
 
 
 // This test ensures that reconciliation requests for tasks that are
-// pending (due to validation/authorization) do not result in status
-// updates.
+// pending result in TASK_STAGING status updates.
 TEST_F(ReconciliationTest, PendingTask)
 {
   MockAuthorizer authorizer;
@@ -615,10 +614,6 @@ TEST_F(ReconciliationTest, PendingTask)
   AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
 
-  // Framework should not receive any update.
-  EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .Times(0);
-
   // Return a pending future from authorizer.
   Future<Nothing> future;
   Promise<bool> promise;
@@ -635,43 +630,34 @@ TEST_F(ReconciliationTest, PendingTask)
   // Wait until authorization is in progress.
   AWAIT_READY(future);
 
-  // First send an implicit reconciliation request for this task,
-  // there should be no updates.
-  Future<ReconcileTasksMessage> reconcileTasksMessage =
-    FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
-
-  Clock::pause();
+  // First send an implicit reconciliation request for this task.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update));
 
   vector<TaskStatus> statuses;
   driver.reconcileTasks(statuses);
 
-  // Make sure the master received the reconcile tasks message.
-  AWAIT_READY(reconcileTasksMessage);
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_STAGING, update.get().state());
+  EXPECT_TRUE(update.get().has_slave_id());
 
-  // The Clock::settle() will ensure that framework would receive
-  // a status update if it is sent by the master. In this test it
-  // shouldn't receive any.
-  Clock::settle();
+  // Now send an explicit reconciliation request for this task.
+  Future<TaskStatus> update2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update2));
 
-  // Now send an explicit reconciliation request for this task;
-  // there should be no updates.
   TaskStatus status;
   status.mutable_task_id()->CopyFrom(task.task_id());
   status.mutable_slave_id()->CopyFrom(slaveId);
   status.set_state(TASK_STAGING);
   statuses.push_back(status);
 
-  reconcileTasksMessage = FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
-
   driver.reconcileTasks(statuses);
 
-  // Make sure the master received the reconcile tasks message.
-  AWAIT_READY(reconcileTasksMessage);
-
-  // The Clock::settle() will ensure that framework would receive
-  // a status update if it is sent by the master. In this test it
-  // shouldn't receive any.
-  Clock::settle();
+  AWAIT_READY(update2);
+  EXPECT_EQ(TASK_STAGING, update2.get().state());
+  EXPECT_TRUE(update2.get().has_slave_id());
 
   driver.stop();
   driver.join();


[4/5] git commit: Model pending tasks in the Master's metrics and JSON.

Posted by bm...@apache.org.
Model pending tasks in the Master's metrics and JSON.

Review: https://reviews.apache.org/r/24515


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/85303d17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/85303d17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/85303d17

Branch: refs/heads/master
Commit: 85303d17e264fdceb9dd814018384e8759aa93b7
Parents: 3ff1180
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Aug 8 15:46:08 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                 |   3 +-
 src/common/http.cpp             |  54 +++++++++++++----
 src/common/http.hpp             |  15 +++++
 src/master/http.cpp             |   7 +++
 src/master/master.cpp           |  21 ++++++-
 src/tests/common/http_tests.cpp | 113 +++++++++++++++++++++++++++++++++++
 6 files changed, 200 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 60f89ed..0ac95b4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1099,7 +1099,8 @@ mesos_tests_SOURCES =				\
   tests/status_update_manager_tests.cpp		\
   tests/utils.cpp				\
   tests/values_tests.cpp			\
-  tests/zookeeper_url_tests.cpp
+  tests/zookeeper_url_tests.cpp			\
+  tests/common/http_tests.cpp
 
 mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_tests_CPPFLAGS += -DSOURCE_DIR=\"$(abs_top_srcdir)\"

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/common/http.cpp
----------------------------------------------------------------------
diff --git a/src/common/http.cpp b/src/common/http.cpp
index d27fe21..58050e9 100644
--- a/src/common/http.cpp
+++ b/src/common/http.cpp
@@ -16,12 +16,8 @@
  * limitations under the License.
  */
 
-#include <map>
-#include <string>
+#include <vector>
 
-#include <glog/logging.h>
-
-#include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
 #include <stout/foreach.hpp>
@@ -32,14 +28,13 @@
 
 #include "messages/messages.hpp"
 
-using std::map;
-using std::string;
+using std::vector;
 
 namespace mesos {
 namespace internal {
 
-// TODO(bmahler): Kill these in favor of automatic Proto->JSON Conversion (when
-// it becomes available).
+// TODO(bmahler): Kill these in favor of automatic Proto->JSON
+// Conversion (when it becomes available).
 
 JSON::Object model(const Resources& resources)
 {
@@ -118,7 +113,13 @@ JSON::Object model(const Task& task)
   object.values["id"] = task.task_id().value();
   object.values["name"] = task.name();
   object.values["framework_id"] = task.framework_id().value();
-  object.values["executor_id"] = task.executor_id().value();
+
+  if (task.has_executor_id()) {
+    object.values["executor_id"] = task.executor_id().value();
+  } else {
+    object.values["executor_id"] = "";
+  }
+
   object.values["slave_id"] = task.slave_id().value();
   object.values["state"] = TaskState_Name(task.state());
   object.values["resources"] = model(task.resources());
@@ -132,5 +133,38 @@ JSON::Object model(const Task& task)
   return object;
 }
 
+
+// TODO(bmahler): Expose the executor name / source.
+JSON::Object model(
+    const TaskInfo& task,
+    const FrameworkID& frameworkId,
+    const TaskState& state,
+    const vector<TaskStatus>& statuses)
+{
+  JSON::Object object;
+  object.values["id"] = task.task_id().value();
+  object.values["name"] = task.name();
+  object.values["framework_id"] = frameworkId.value();
+
+  if (task.has_executor()) {
+    object.values["executor_id"] = task.executor().executor_id().value();
+  } else {
+    object.values["executor_id"] = "";
+  }
+
+  object.values["slave_id"] = task.slave_id().value();
+  object.values["state"] = TaskState_Name(state);
+  object.values["resources"] = model(task.resources());
+
+  JSON::Array array;
+  foreach (const TaskStatus& status, statuses) {
+    array.values.push_back(model(status));
+  }
+  object.values["statuses"] = array;
+
+  return object;
+}
+
+
 }  // namespace internal {
 }  // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/common/http.hpp
----------------------------------------------------------------------
diff --git a/src/common/http.hpp b/src/common/http.hpp
index 8216401..afce7fe 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -19,18 +19,33 @@
 #ifndef __COMMON_HTTP_HPP__
 #define __COMMON_HTTP_HPP__
 
+#include <vector>
+
+#include <mesos/mesos.hpp>
+
 #include <stout/json.hpp>
 
 namespace mesos {
+
 class Resources;
 
 namespace internal {
+
 class Attributes;
 class Task;
 
+
 JSON::Object model(const Resources& resources);
 JSON::Object model(const Attributes& attributes);
+
+// These are two equivalent ways to model a task (both yield the same
+// JSON), depending on whether you have a 'Task' or a 'TaskInfo'.
 JSON::Object model(const Task& task);
+JSON::Object model(
+    const TaskInfo& task,
+    const FrameworkID& frameworkId,
+    const TaskState& state,
+    const std::vector<TaskStatus>& statuses);
 
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 9317a95..6dd11fe 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -127,6 +127,12 @@ JSON::Object model(const Framework& framework)
   // Model all of the tasks associated with a framework.
   {
     JSON::Array array;
+
+    foreachvalue (const TaskInfo& task, framework.pendingTasks) {
+      vector<TaskStatus> statuses;
+      array.values.push_back(model(task, framework.id, TASK_STAGING, statuses));
+    }
+
     foreachvalue (Task* task, framework.tasks) {
       array.values.push_back(model(*task));
     }
@@ -176,6 +182,7 @@ JSON::Object model(const Slave& slave)
   return object;
 }
 
+
 // Returns a JSON object modeled after a Role.
 JSON::Object model(const Role& role)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 72494b5..f40a1cd 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2155,6 +2155,9 @@ void Master::launchTasks(
           TASK_LOST,
           "Task launched with invalid offers: " + error.get().message);
 
+      metrics.tasks_lost++;
+      stats.tasks[TASK_LOST]++;
+
       forward(update, UPID(), framework);
     }
     return;
@@ -2177,6 +2180,8 @@ void Master::launchTasks(
     // NOTE: We need to do this here after validation because of the
     // way task validators work.
     framework->pendingTasks[task.task_id()] = task;
+
+    stats.tasks[TASK_STAGING]++;
   }
 
   // Wait for all the tasks to be validated.
@@ -2322,8 +2327,6 @@ void Master::launchTask(
   message.mutable_task()->MergeFrom(task);
   send(slave->pid, message);
 
-  stats.tasks[TASK_STAGING]++;
-
   return;
 }
 
@@ -2361,6 +2364,9 @@ void Master::_launchTasks(
           TASK_LOST,
           (slave == NULL ? "Slave removed" : "Slave disconnected"));
 
+      metrics.tasks_lost++;
+      stats.tasks[TASK_LOST]++;
+
       forward(update, UPID(), framework);
     }
 
@@ -2397,6 +2403,9 @@ void Master::_launchTasks(
           TASK_LOST,
           error);
 
+      metrics.tasks_lost++;
+      stats.tasks[TASK_LOST]++;
+
       forward(update, UPID(), framework);
 
       continue;
@@ -2423,6 +2432,9 @@ void Master::_launchTasks(
           TASK_LOST,
           error);
 
+      metrics.tasks_lost++;
+      stats.tasks[TASK_LOST]++;
+
       forward(update, UPID(), framework);
 
       continue;
@@ -4471,6 +4483,11 @@ double Master::_tasks_staging()
 {
   double count = 0.0;
 
+  // Add the tasks pending validation / authorization.
+  foreachvalue (Framework* framework, frameworks.registered) {
+    count += framework->pendingTasks.size();
+  }
+
   foreachvalue (Slave* slave, slaves.registered) {
     typedef hashmap<TaskID, Task*> TaskMap;
     foreachvalue (const TaskMap& tasks, slave->tasks) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/tests/common/http_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/common/http_tests.cpp b/src/tests/common/http_tests.cpp
new file mode 100644
index 0000000..5fa51bf
--- /dev/null
+++ b/src/tests/common/http_tests.cpp
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include <mesos/mesos.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/json.hpp>
+#include <stout/stringify.hpp>
+
+#include "common/http.hpp"
+#include "common/protobuf_utils.hpp"
+
+#include "messages/messages.hpp"
+
+using std::vector;
+
+using namespace mesos;
+using namespace mesos::internal;
+
+// TODO(bmahler): Add tests for other JSON models.
+
+// This test ensures we don't break the API when it comes to JSON
+// representation of tasks. Also, we want to ensure that tasks are
+// modeled the same way when using 'Task' vs. 'TaskInfo'.
+TEST(HTTP, ModelTask)
+{
+  TaskID taskId;
+  taskId.set_value("t");
+
+  SlaveID slaveId;
+  slaveId.set_value("s");
+
+  ExecutorID executorId;
+  executorId.set_value("e");
+
+  FrameworkID frameworkId;
+  frameworkId.set_value("f");
+
+  TaskState state = TASK_RUNNING;
+
+  vector<TaskStatus> statuses;
+
+  TaskStatus status;
+  status.mutable_task_id()->CopyFrom(taskId);
+  status.set_state(state);
+  status.mutable_slave_id()->CopyFrom(slaveId);
+  status.mutable_executor_id()->CopyFrom(executorId);
+  status.set_timestamp(0.0);
+
+  statuses.push_back(status);
+
+  TaskInfo task;
+  task.set_name("task");
+  task.mutable_task_id()->CopyFrom(taskId);
+  task.mutable_slave_id()->CopyFrom(slaveId);
+  task.mutable_command()->set_value("echo hello");
+
+  Task task_ = protobuf::createTask(task, state, executorId, frameworkId);
+  task_.add_statuses()->CopyFrom(statuses[0]);
+
+  JSON::Value object = model(task, frameworkId, state, statuses);
+  JSON::Value object_ = model(task_);
+
+  Try<JSON::Value> expected = JSON::parse(
+      "{"
+      "  \"executor_id\":\"\","
+      "  \"framework_id\":\"f\","
+      "  \"id\":\"t\","
+      "  \"name\":\"task\","
+      "  \"resources\":"
+      "  {"
+      "    \"cpus\":0,"
+      "    \"disk\":0,"
+      "    \"mem\":0"
+      "  },"
+      "  \"slave_id\":\"s\","
+      "  \"state\":\"TASK_RUNNING\","
+      "  \"statuses\":"
+      "  ["
+      "    {"
+      "      \"state\":\"TASK_RUNNING\","
+      "      \"timestamp\":0"
+      "    }"
+      "  ]"
+      "}");
+
+  ASSERT_SOME(expected);
+
+  EXPECT_EQ(expected.get(), object);
+  EXPECT_EQ(expected.get(), object_);
+
+  // Ensure both are modeled the same.
+  EXPECT_EQ(object, object_);
+}


[2/5] git commit: Some Master cleanups.

Posted by bm...@apache.org.
Some Master cleanups.

Review: https://reviews.apache.org/r/24576


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3ff1180d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3ff1180d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3ff1180d

Branch: refs/heads/master
Commit: 3ff1180d2ee0e1d219335dbad38b7537ecaf9311
Parents: 0e8fa7b
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Aug 11 13:44:13 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 142 ++++++++++++++++-----------------------------
 1 file changed, 51 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3ff1180d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a8cf9ba..72494b5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2098,87 +2098,54 @@ void Master::launchTasks(
     return;
   }
 
-  // TODO(bmahler): This case can be caught during offer validation.
-  if (offerIds.empty()) {
-    LOG(WARNING) << "No offers to launch tasks on";
-
-    foreach (const TaskInfo& task, tasks) {
-      const StatusUpdate& update = protobuf::createStatusUpdate(
-          framework->id,
-          task.slave_id(),
-          task.task_id(),
-          TASK_LOST,
-          "Task launched without offers");
-
-      forward(update, UPID(), framework);
-    }
-    return;
-  }
-
-  // Common slave id for task validation.
-  Option<SlaveID> slaveId;
-
-  // Create offer visitors.
-  list<OfferVisitor*> offerVisitors;
-  offerVisitors.push_back(new ValidOfferChecker());
-  offerVisitors.push_back(new FrameworkChecker());
-  offerVisitors.push_back(new SlaveChecker());
-  offerVisitors.push_back(new UniqueOfferIDChecker());
-
-  // Verify and aggregate all offers.
-  // Abort offer and task processing if any offer validation failed.
-  Resources totalResources;
+  // TODO(bmahler): We currently only support using multiple offers
+  // for a single slave.
+  Resources used;
+  Option<SlaveID> slaveId = None();
   Option<Error> error = None();
-  foreach (const OfferID& offerId, offerIds) {
-    foreach (OfferVisitor* visitor, offerVisitors) {
-      error = (*visitor)(offerId, *framework, this);
-      if (error.isSome()) {
-        break;
-      }
-    }
-    // Offer validation error needs to be propagated from visitor
-    // loop above.
-    if (error.isSome()) {
-      break;
-    }
 
-    // If offer validation succeeds, we need to pass along the common
-    // slave. So optimistically, we store the first slave id we see.
-    // In case of invalid offers (different slaves for example), we
-    // report error and return from launchTask before slaveId is used.
-    if (slaveId.isNone()) {
-      slaveId = getOffer(offerId)->slave_id();
+  if (offerIds.empty()) {
+    error = Error("No offers specified");
+  } else {
+    list<Owned<OfferVisitor> > offerVisitors;
+    offerVisitors.push_back(Owned<OfferVisitor>(new ValidOfferChecker()));
+    offerVisitors.push_back(Owned<OfferVisitor>(new FrameworkChecker()));
+    offerVisitors.push_back(Owned<OfferVisitor>(new SlaveChecker()));
+    offerVisitors.push_back(Owned<OfferVisitor>(new UniqueOfferIDChecker()));
+
+    // Validate the offers.
+    foreach (const OfferID& offerId, offerIds) {
+      foreach (const Owned<OfferVisitor>& visitor, offerVisitors) {
+        if (error.isNone()) {
+          error = (*visitor)(offerId, *framework, this);
+        }
+      }
     }
 
-    totalResources += getOffer(offerId)->resources();
-  }
-
-  // Cleanup visitors.
-  while (!offerVisitors.empty()) {
-    OfferVisitor* visitor = offerVisitors.front();
-    offerVisitors.pop_front();
-    delete visitor;
-  };
-
-  // Remove offers and recover resources if any of the offers are
-  // invalid.
-  foreach (const OfferID& offerId, offerIds) {
-    Offer* offer = getOffer(offerId);
-    if (offer != NULL) {
-      if (error.isSome()) {
-        allocator->resourcesRecovered(
-            offer->framework_id(),
-            offer->slave_id(),
-            offer->resources(),
-            None());
+    // Compute used resources and remove the offers. If the
+    // validation failed, return resources to the allocator.
+    foreach (const OfferID& offerId, offerIds) {
+      Offer* offer = getOffer(offerId);
+      if (offer != NULL) {
+        slaveId = offer->slave_id();
+        used += offer->resources();
+
+        if (error.isSome()) {
+          allocator->resourcesRecovered(
+              offer->framework_id(),
+              offer->slave_id(),
+              offer->resources(),
+              None());
+        }
+        removeOffer(offer);
       }
-      removeOffer(offer);
     }
   }
 
+  // If invalid, send TASK_LOST for the launch attempts.
   if (error.isSome()) {
-    LOG(WARNING) << "Failed to validate offer " << stringify(offerIds)
-                   << ": " << error.get().message;
+    LOG(WARNING) << "Launch tasks message used invalid offers '"
+                 << stringify(offerIds) << "': " << error.get().message;
 
     foreach (const TaskInfo& task, tasks) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
@@ -2193,7 +2160,7 @@ void Master::launchTasks(
     return;
   }
 
-  CHECK(slaveId.isSome()) << "Slave id not found";
+  CHECK_SOME(slaveId);
   Slave* slave = CHECK_NOTNULL(getSlave(slaveId.get()));
 
   LOG(INFO) << "Processing reply for offers: "
@@ -2204,7 +2171,7 @@ void Master::launchTasks(
   // Validate each task and launch if valid.
   list<Future<Option<Error> > > futures;
   foreach (const TaskInfo& task, tasks) {
-    futures.push_back(validateTask(task, framework, slave, totalResources));
+    futures.push_back(validateTask(task, framework, slave, used));
 
     // Add to pending tasks.
     // NOTE: We need to do this here after validation because of the
@@ -2221,7 +2188,7 @@ void Master::launchTasks(
                  framework->id,
                  slaveId.get(),
                  tasks,
-                 totalResources,
+                 used,
                  filters,
                  lambda::_1));
 }
@@ -2237,15 +2204,15 @@ Future<Option<Error> > Master::validateTask(
   CHECK_NOTNULL(slave);
 
   // Create task visitors.
-  // TODO(vinod): Create the visitors on the heap and make the visit
+  // TODO(vinod): Create the visitors on the stack and make the visit
   // operation const.
-  list<TaskInfoVisitor*> taskVisitors;
-  taskVisitors.push_back(new TaskIDChecker());
-  taskVisitors.push_back(new SlaveIDChecker());
-  taskVisitors.push_back(new UniqueTaskIDChecker());
-  taskVisitors.push_back(new ResourceUsageChecker());
-  taskVisitors.push_back(new ExecutorInfoChecker());
-  taskVisitors.push_back(new CheckpointChecker());
+  list<Owned<TaskInfoVisitor> > taskVisitors;
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new TaskIDChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new SlaveIDChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new UniqueTaskIDChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker()));
+  taskVisitors.push_back(Owned<TaskInfoVisitor>(new CheckpointChecker()));
 
   // TODO(benh): Add a HealthCheckChecker visitor.
 
@@ -2253,20 +2220,13 @@ Future<Option<Error> > Master::validateTask(
 
   // Invoke each visitor.
   Option<Error> error = None();
-  foreach (TaskInfoVisitor* visitor, taskVisitors) {
+  foreach (const Owned<TaskInfoVisitor>& visitor, taskVisitors) {
     error = (*visitor)(task, totalResources, *framework, *slave);
     if (error.isSome()) {
       break;
     }
   }
 
-  // Cleanup visitors.
-  while (!taskVisitors.empty()) {
-    TaskInfoVisitor* visitor = taskVisitors.front();
-    taskVisitors.pop_front();
-    delete visitor;
-  };
-
   if (error.isSome()) {
     return Error(error.get().message);
   }


[3/5] git commit: Removed unused test file 'process_spawn.cpp'.

Posted by bm...@apache.org.
Removed unused test file 'process_spawn.cpp'.

Review: https://reviews.apache.org/r/24582


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/30bc5477
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/30bc5477
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/30bc5477

Branch: refs/heads/master
Commit: 30bc547783741654a56d66b32040da521cfa6f25
Parents: 30fdabe
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Aug 11 16:23:34 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700

----------------------------------------------------------------------
 src/tests/process_spawn.cpp | 82 ----------------------------------------
 1 file changed, 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/30bc5477/src/tests/process_spawn.cpp
----------------------------------------------------------------------
diff --git a/src/tests/process_spawn.cpp b/src/tests/process_spawn.cpp
deleted file mode 100644
index db07ba0..0000000
--- a/src/tests/process_spawn.cpp
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <iostream>
-#include <climits>
-#include <cstdio>
-
-#include <stout/duration.hpp>
-#include <stout/os.hpp>
-
-using std::cout;
-using std::cin;
-using std::endl;
-
-
-void processinfo()
-{
-  cout << "pid: " << getpid()
-       << " pgid: " << getpgrp()
-       << " ppid: " << getppid()
-       << endl;
-}
-
-
-void dummywait()
-{
-  while(getchar() == EOF) {
-    os::sleep(Seconds(INT_MAX));
-  }
-
-  cout << "Error: Shouldn't come here" << endl;
-}
-
-
-// This program forks 2 processes in a chain. The child process exits after
-// forking its own child (the grandchild of the main). The grandchild is
-// parented by Init but is in the same group as the main process.
-int main()
-{
-  // Become session leader.
-  setsid();
-
-  pid_t pid = fork();
-
-  if (pid > 0) {
-    // Inside parent process.
-    processinfo();
-    dummywait();
-  } else {
-    // Inside child process.
-    processinfo();
-
-    pid_t pid2 = fork();
-
-    if (pid2 > 0) {
-      // Kill the child process so that the tree link is broken.
-      _exit(0);
-    } else {
-      // Inside grandchild process.
-      processinfo();
-      dummywait();
-    }
-  }
-
-  cout << "Error: Shouldn't come here!!" << endl;
-  return -1;
-}


[5/5] git commit: Added a missing test target in Makefile.am.

Posted by bm...@apache.org.
Added a missing test target in Makefile.am.

Review: https://reviews.apache.org/r/24583


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e8fa7bf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e8fa7bf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e8fa7bf

Branch: refs/heads/master
Commit: 0e8fa7bf5213d235bdab524253c84a14d9e54fec
Parents: 30bc547
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Aug 11 16:23:57 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0e8fa7bf/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 39af036..60f89ed 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1059,11 +1059,12 @@ mesos_tests_SOURCES =				\
   tests/containerizer.cpp			\
   tests/containerizer_tests.cpp			\
   tests/credentials_tests.cpp			\
-  tests/docker_tests.cpp			\
   tests/docker_containerizer_tests.cpp          \
+  tests/docker_tests.cpp			\
   tests/environment.cpp				\
   tests/examples_tests.cpp			\
   tests/exception_tests.cpp			\
+  tests/external_containerizer_test.cpp		\
   tests/health_check_tests.cpp                  \
   tests/fault_tolerance_tests.cpp		\
   tests/fetcher_tests.cpp                       \
@@ -1071,7 +1072,6 @@ mesos_tests_SOURCES =				\
   tests/flags.cpp				\
   tests/gc_tests.cpp				\
   tests/isolator_tests.cpp			\
-  tests/external_containerizer_test.cpp		\
   tests/log_tests.cpp				\
   tests/logging_tests.cpp			\
   tests/main.cpp				\
@@ -1098,6 +1098,7 @@ mesos_tests_SOURCES =				\
   tests/state_tests.cpp				\
   tests/status_update_manager_tests.cpp		\
   tests/utils.cpp				\
+  tests/values_tests.cpp			\
   tests/zookeeper_url_tests.cpp
 
 mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)