You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/08/13 21:08:00 UTC
[1/5] git commit: Expose pending tasks during reconciliation.
Repository: mesos
Updated Branches:
refs/heads/master 30fdabe1a -> 4700fadf0
Expose pending tasks during reconciliation.
Review: https://reviews.apache.org/r/24516
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4700fadf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4700fadf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4700fadf
Branch: refs/heads/master
Commit: 4700fadf0cb618489a91f48b6e57719b69ea0389
Parents: 85303d1
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Aug 8 16:19:27 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 35 +++++++++---
src/tests/master_authorization_tests.cpp | 78 ---------------------------
src/tests/reconciliation_tests.cpp | 44 ++++++---------
3 files changed, 44 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f40a1cd..e948803 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3302,7 +3302,26 @@ void Master::reconcileTasks(
LOG(INFO) << "Performing implicit task state reconciliation for framework "
<< frameworkId;
- // TODO(bmahler): Consider sending completed tasks?
+ foreachvalue (const TaskInfo& task, framework->pendingTasks) {
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId,
+ task.slave_id(),
+ task.task_id(),
+ TASK_STAGING,
+ "Reconciliation: Latest task state");
+
+ VLOG(1) << "Sending implicit reconciliation state "
+ << update.status().state()
+ << " for task " << update.status().task_id()
+ << " of framework " << frameworkId;
+
+ // TODO(bmahler): Consider using forward(); might lead to too
+ // much logging.
+ StatusUpdateMessage message;
+ message.mutable_update()->CopyFrom(update);
+ send(framework->pid, message);
+ }
+
foreachvalue (Task* task, framework->tasks) {
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
@@ -3331,7 +3350,7 @@ void Master::reconcileTasks(
<< statuses.size() << " tasks of framework " << frameworkId;
// Explicit reconciliation occurs for the following cases:
- // (1) Task is known, but pending: no-op.
+ // (1) Task is known, but pending: TASK_STAGING.
// (2) Task is known: send the latest state.
// (3) Task is unknown, slave is registered: TASK_LOST.
// (4) Task is unknown, slave is transitioning: no-op.
@@ -3353,10 +3372,14 @@ void Master::reconcileTasks(
Task* task = framework->getTask(status.task_id());
if (framework->pendingTasks.contains(status.task_id())) {
- // (1) Task is known, but pending: no-op.
- LOG(INFO) << "Ignoring reconciliation request of task "
- << status.task_id() << " from framework " << frameworkId
- << " because the task is pending";
+ // (1) Task is known, but pending: TASK_STAGING.
+ const TaskInfo& task_ = framework->pendingTasks[status.task_id()];
+ update = protobuf::createStatusUpdate(
+ frameworkId,
+ task_.slave_id(),
+ task_.task_id(),
+ TASK_STAGING,
+ "Reconciliation: Latest task state");
} else if (task != NULL) {
// (2) Task is known: send the latest state.
update = protobuf::createStatusUpdate(
http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index f0f0648..b9aa7bf 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -523,84 +523,6 @@ TEST_F(MasterAuthorizationTest, FrameworkRemoved)
}
-// This test verifies that a reconciliation request that comes before
-// '_launchTasks()' is ignored.
-TEST_F(MasterAuthorizationTest, ReconcileTask)
-{
- MockAuthorizer authorizer;
- Try<PID<Master> > master = StartMaster(&authorizer);
- ASSERT_SOME(master);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
-
- Try<PID<Slave> > slave = StartSlave(&exec);
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _))
- .Times(1);
-
- Future<vector<Offer> > offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- EXPECT_NE(0u, offers.get().size());
-
- TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
- vector<TaskInfo> tasks;
- tasks.push_back(task);
-
- // Return a pending future from authorizer.
- Future<Nothing> future;
- Promise<bool> promise;
- EXPECT_CALL(authorizer, authorize(An<const mesos::ACL::RunTask&>()))
- .WillOnce(DoAll(FutureSatisfy(&future),
- Return(promise.future())));
-
- driver.launchTasks(offers.get()[0].id(), tasks);
-
- // Wait until authorization is in progress.
- AWAIT_READY(future);
-
- // Scheduler shouldn't get an update from reconciliation.
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .Times(0);
-
- Future<ReconcileTasksMessage> reconcileTasksMessage =
- FUTURE_PROTOBUF(ReconcileTasksMessage(), _, _);
-
- vector<TaskStatus> statuses;
-
- TaskStatus status;
- status.mutable_task_id()->CopyFrom(task.task_id());
- status.mutable_slave_id()->CopyFrom(offers.get()[0].slave_id());
- status.set_state(TASK_STAGING);
-
- statuses.push_back(status);
-
- driver.reconcileTasks(statuses);
-
- AWAIT_READY(reconcileTasksMessage);
-
- // Make sure the framework doesn't receive any update.
- Clock::pause();
- Clock::settle();
-
- // Now stop the framework.
- driver.stop();
- driver.join();
-
- Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
-}
-
-
// This test verifies that two tasks each launched on a different
// slave with same executor id but different executor info are
// allowed even when the first task is pending due to authorization.
http://git-wip-us.apache.org/repos/asf/mesos/blob/4700fadf/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 3c4d7ed..8c66659 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -578,8 +578,7 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask)
// This test ensures that reconciliation requests for tasks that are
-// pending (due to validation/authorization) do not result in status
-// updates.
+// pending result in TASK_STAGING status updates.
TEST_F(ReconciliationTest, PendingTask)
{
MockAuthorizer authorizer;
@@ -615,10 +614,6 @@ TEST_F(ReconciliationTest, PendingTask)
AWAIT_READY(offers);
EXPECT_NE(0u, offers.get().size());
- // Framework should not receive any update.
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .Times(0);
-
// Return a pending future from authorizer.
Future<Nothing> future;
Promise<bool> promise;
@@ -635,43 +630,34 @@ TEST_F(ReconciliationTest, PendingTask)
// Wait until authorization is in progress.
AWAIT_READY(future);
- // First send an implicit reconciliation request for this task,
- // there should be no updates.
- Future<ReconcileTasksMessage> reconcileTasksMessage =
- FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
-
- Clock::pause();
+ // First send an implicit reconciliation request for this task.
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&update));
vector<TaskStatus> statuses;
driver.reconcileTasks(statuses);
- // Make sure the master received the reconcile tasks message.
- AWAIT_READY(reconcileTasksMessage);
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_STAGING, update.get().state());
+ EXPECT_TRUE(update.get().has_slave_id());
- // The Clock::settle() will ensure that framework would receive
- // a status update if it is sent by the master. In this test it
- // shouldn't receive any.
- Clock::settle();
+ // Now send an explicit reconciliation request for this task.
+ Future<TaskStatus> update2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&update2));
- // Now send an explicit reconciliation request for this task;
- // there should be no updates.
TaskStatus status;
status.mutable_task_id()->CopyFrom(task.task_id());
status.mutable_slave_id()->CopyFrom(slaveId);
status.set_state(TASK_STAGING);
statuses.push_back(status);
- reconcileTasksMessage = FUTURE_PROTOBUF(ReconcileTasksMessage(), _ , _);
-
driver.reconcileTasks(statuses);
- // Make sure the master received the reconcile tasks message.
- AWAIT_READY(reconcileTasksMessage);
-
- // The Clock::settle() will ensure that framework would receive
- // a status update if it is sent by the master. In this test it
- // shouldn't receive any.
- Clock::settle();
+ AWAIT_READY(update2);
+ EXPECT_EQ(TASK_STAGING, update2.get().state());
+ EXPECT_TRUE(update2.get().has_slave_id());
driver.stop();
driver.join();
[4/5] git commit: Model pending tasks in the Master's metrics and JSON.
Posted by bm...@apache.org.
Model pending tasks in the Master's metrics and JSON.
Review: https://reviews.apache.org/r/24515
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/85303d17
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/85303d17
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/85303d17
Branch: refs/heads/master
Commit: 85303d17e264fdceb9dd814018384e8759aa93b7
Parents: 3ff1180
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Aug 8 15:46:08 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 3 +-
src/common/http.cpp | 54 +++++++++++++----
src/common/http.hpp | 15 +++++
src/master/http.cpp | 7 +++
src/master/master.cpp | 21 ++++++-
src/tests/common/http_tests.cpp | 113 +++++++++++++++++++++++++++++++++++
6 files changed, 200 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 60f89ed..0ac95b4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1099,7 +1099,8 @@ mesos_tests_SOURCES = \
tests/status_update_manager_tests.cpp \
tests/utils.cpp \
tests/values_tests.cpp \
- tests/zookeeper_url_tests.cpp
+ tests/zookeeper_url_tests.cpp \
+ tests/common/http_tests.cpp
mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)
mesos_tests_CPPFLAGS += -DSOURCE_DIR=\"$(abs_top_srcdir)\"
http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/common/http.cpp
----------------------------------------------------------------------
diff --git a/src/common/http.cpp b/src/common/http.cpp
index d27fe21..58050e9 100644
--- a/src/common/http.cpp
+++ b/src/common/http.cpp
@@ -16,12 +16,8 @@
* limitations under the License.
*/
-#include <map>
-#include <string>
+#include <vector>
-#include <glog/logging.h>
-
-#include <mesos/mesos.hpp>
#include <mesos/resources.hpp>
#include <stout/foreach.hpp>
@@ -32,14 +28,13 @@
#include "messages/messages.hpp"
-using std::map;
-using std::string;
+using std::vector;
namespace mesos {
namespace internal {
-// TODO(bmahler): Kill these in favor of automatic Proto->JSON Conversion (when
-// it becomes available).
+// TODO(bmahler): Kill these in favor of automatic Proto->JSON
+// Conversion (when it becomes available).
JSON::Object model(const Resources& resources)
{
@@ -118,7 +113,13 @@ JSON::Object model(const Task& task)
object.values["id"] = task.task_id().value();
object.values["name"] = task.name();
object.values["framework_id"] = task.framework_id().value();
- object.values["executor_id"] = task.executor_id().value();
+
+ if (task.has_executor_id()) {
+ object.values["executor_id"] = task.executor_id().value();
+ } else {
+ object.values["executor_id"] = "";
+ }
+
object.values["slave_id"] = task.slave_id().value();
object.values["state"] = TaskState_Name(task.state());
object.values["resources"] = model(task.resources());
@@ -132,5 +133,38 @@ JSON::Object model(const Task& task)
return object;
}
+
+// TODO(bmahler): Expose the executor name / source.
+JSON::Object model(
+ const TaskInfo& task,
+ const FrameworkID& frameworkId,
+ const TaskState& state,
+ const vector<TaskStatus>& statuses)
+{
+ JSON::Object object;
+ object.values["id"] = task.task_id().value();
+ object.values["name"] = task.name();
+ object.values["framework_id"] = frameworkId.value();
+
+ if (task.has_executor()) {
+ object.values["executor_id"] = task.executor().executor_id().value();
+ } else {
+ object.values["executor_id"] = "";
+ }
+
+ object.values["slave_id"] = task.slave_id().value();
+ object.values["state"] = TaskState_Name(state);
+ object.values["resources"] = model(task.resources());
+
+ JSON::Array array;
+ foreach (const TaskStatus& status, statuses) {
+ array.values.push_back(model(status));
+ }
+ object.values["statuses"] = array;
+
+ return object;
+}
+
+
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/common/http.hpp
----------------------------------------------------------------------
diff --git a/src/common/http.hpp b/src/common/http.hpp
index 8216401..afce7fe 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -19,18 +19,33 @@
#ifndef __COMMON_HTTP_HPP__
#define __COMMON_HTTP_HPP__
+#include <vector>
+
+#include <mesos/mesos.hpp>
+
#include <stout/json.hpp>
namespace mesos {
+
class Resources;
namespace internal {
+
class Attributes;
class Task;
+
JSON::Object model(const Resources& resources);
JSON::Object model(const Attributes& attributes);
+
+// These are two equivalent ways to model a task, depending on
+// whether you have a 'Task' or a 'TaskInfo' available.
JSON::Object model(const Task& task);
+JSON::Object model(
+ const TaskInfo& task,
+ const FrameworkID& frameworkId,
+ const TaskState& state,
+ const std::vector<TaskStatus>& statuses);
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 9317a95..6dd11fe 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -127,6 +127,12 @@ JSON::Object model(const Framework& framework)
// Model all of the tasks associated with a framework.
{
JSON::Array array;
+
+ foreachvalue (const TaskInfo& task, framework.pendingTasks) {
+ vector<TaskStatus> statuses;
+ array.values.push_back(model(task, framework.id, TASK_STAGING, statuses));
+ }
+
foreachvalue (Task* task, framework.tasks) {
array.values.push_back(model(*task));
}
@@ -176,6 +182,7 @@ JSON::Object model(const Slave& slave)
return object;
}
+
// Returns a JSON object modeled after a Role.
JSON::Object model(const Role& role)
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 72494b5..f40a1cd 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2155,6 +2155,9 @@ void Master::launchTasks(
TASK_LOST,
"Task launched with invalid offers: " + error.get().message);
+ metrics.tasks_lost++;
+ stats.tasks[TASK_LOST]++;
+
forward(update, UPID(), framework);
}
return;
@@ -2177,6 +2180,8 @@ void Master::launchTasks(
// NOTE: We need to do this here after validation because of the
// way task validators work.
framework->pendingTasks[task.task_id()] = task;
+
+ stats.tasks[TASK_STAGING]++;
}
// Wait for all the tasks to be validated.
@@ -2322,8 +2327,6 @@ void Master::launchTask(
message.mutable_task()->MergeFrom(task);
send(slave->pid, message);
- stats.tasks[TASK_STAGING]++;
-
return;
}
@@ -2361,6 +2364,9 @@ void Master::_launchTasks(
TASK_LOST,
(slave == NULL ? "Slave removed" : "Slave disconnected"));
+ metrics.tasks_lost++;
+ stats.tasks[TASK_LOST]++;
+
forward(update, UPID(), framework);
}
@@ -2397,6 +2403,9 @@ void Master::_launchTasks(
TASK_LOST,
error);
+ metrics.tasks_lost++;
+ stats.tasks[TASK_LOST]++;
+
forward(update, UPID(), framework);
continue;
@@ -2423,6 +2432,9 @@ void Master::_launchTasks(
TASK_LOST,
error);
+ metrics.tasks_lost++;
+ stats.tasks[TASK_LOST]++;
+
forward(update, UPID(), framework);
continue;
@@ -4471,6 +4483,11 @@ double Master::_tasks_staging()
{
double count = 0.0;
+ // Add the tasks pending validation / authorization.
+ foreachvalue (Framework* framework, frameworks.registered) {
+ count += framework->pendingTasks.size();
+ }
+
foreachvalue (Slave* slave, slaves.registered) {
typedef hashmap<TaskID, Task*> TaskMap;
foreachvalue (const TaskMap& tasks, slave->tasks) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/85303d17/src/tests/common/http_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/common/http_tests.cpp b/src/tests/common/http_tests.cpp
new file mode 100644
index 0000000..5fa51bf
--- /dev/null
+++ b/src/tests/common/http_tests.cpp
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include <mesos/mesos.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/json.hpp>
+#include <stout/stringify.hpp>
+
+#include "common/http.hpp"
+#include "common/protobuf_utils.hpp"
+
+#include "messages/messages.hpp"
+
+using std::vector;
+
+using namespace mesos;
+using namespace mesos::internal;
+
+// TODO(bmahler): Add tests for other JSON models.
+
+// This test ensures we don't break the API when it comes to JSON
+// representation of tasks. Also, we want to ensure that tasks are
+// modeled the same way when using 'Task' vs. 'TaskInfo'.
+TEST(HTTP, ModelTask)
+{
+ TaskID taskId;
+ taskId.set_value("t");
+
+ SlaveID slaveId;
+ slaveId.set_value("s");
+
+ ExecutorID executorId;
+ executorId.set_value("e");
+
+ FrameworkID frameworkId;
+ frameworkId.set_value("f");
+
+ TaskState state = TASK_RUNNING;
+
+ vector<TaskStatus> statuses;
+
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(taskId);
+ status.set_state(state);
+ status.mutable_slave_id()->CopyFrom(slaveId);
+ status.mutable_executor_id()->CopyFrom(executorId);
+ status.set_timestamp(0.0);
+
+ statuses.push_back(status);
+
+ TaskInfo task;
+ task.set_name("task");
+ task.mutable_task_id()->CopyFrom(taskId);
+ task.mutable_slave_id()->CopyFrom(slaveId);
+ task.mutable_command()->set_value("echo hello");
+
+ Task task_ = protobuf::createTask(task, state, executorId, frameworkId);
+ task_.add_statuses()->CopyFrom(statuses[0]);
+
+ JSON::Value object = model(task, frameworkId, state, statuses);
+ JSON::Value object_ = model(task_);
+
+ Try<JSON::Value> expected = JSON::parse(
+ "{"
+ " \"executor_id\":\"\","
+ " \"framework_id\":\"f\","
+ " \"id\":\"t\","
+ " \"name\":\"task\","
+ " \"resources\":"
+ " {"
+ " \"cpus\":0,"
+ " \"disk\":0,"
+ " \"mem\":0"
+ " },"
+ " \"slave_id\":\"s\","
+ " \"state\":\"TASK_RUNNING\","
+ " \"statuses\":"
+ " ["
+ " {"
+ " \"state\":\"TASK_RUNNING\","
+ " \"timestamp\":0"
+ " }"
+ " ]"
+ "}");
+
+ ASSERT_SOME(expected);
+
+ EXPECT_EQ(expected.get(), object);
+ EXPECT_EQ(expected.get(), object_);
+
+ // Ensure both are modeled the same.
+ EXPECT_EQ(object, object_);
+}
[2/5] git commit: Some Master cleanups.
Posted by bm...@apache.org.
Some Master cleanups.
Review: https://reviews.apache.org/r/24576
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3ff1180d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3ff1180d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3ff1180d
Branch: refs/heads/master
Commit: 3ff1180d2ee0e1d219335dbad38b7537ecaf9311
Parents: 0e8fa7b
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Aug 11 13:44:13 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 142 ++++++++++++++++-----------------------------
1 file changed, 51 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3ff1180d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a8cf9ba..72494b5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2098,87 +2098,54 @@ void Master::launchTasks(
return;
}
- // TODO(bmahler): This case can be caught during offer validation.
- if (offerIds.empty()) {
- LOG(WARNING) << "No offers to launch tasks on";
-
- foreach (const TaskInfo& task, tasks) {
- const StatusUpdate& update = protobuf::createStatusUpdate(
- framework->id,
- task.slave_id(),
- task.task_id(),
- TASK_LOST,
- "Task launched without offers");
-
- forward(update, UPID(), framework);
- }
- return;
- }
-
- // Common slave id for task validation.
- Option<SlaveID> slaveId;
-
- // Create offer visitors.
- list<OfferVisitor*> offerVisitors;
- offerVisitors.push_back(new ValidOfferChecker());
- offerVisitors.push_back(new FrameworkChecker());
- offerVisitors.push_back(new SlaveChecker());
- offerVisitors.push_back(new UniqueOfferIDChecker());
-
- // Verify and aggregate all offers.
- // Abort offer and task processing if any offer validation failed.
- Resources totalResources;
+ // TODO(bmahler): We currently only support using multiple offers
+ // for a single slave.
+ Resources used;
+ Option<SlaveID> slaveId = None();
Option<Error> error = None();
- foreach (const OfferID& offerId, offerIds) {
- foreach (OfferVisitor* visitor, offerVisitors) {
- error = (*visitor)(offerId, *framework, this);
- if (error.isSome()) {
- break;
- }
- }
- // Offer validation error needs to be propagated from visitor
- // loop above.
- if (error.isSome()) {
- break;
- }
- // If offer validation succeeds, we need to pass along the common
- // slave. So optimistically, we store the first slave id we see.
- // In case of invalid offers (different slaves for example), we
- // report error and return from launchTask before slaveId is used.
- if (slaveId.isNone()) {
- slaveId = getOffer(offerId)->slave_id();
+ if (offerIds.empty()) {
+ error = Error("No offers specified");
+ } else {
+ list<Owned<OfferVisitor> > offerVisitors;
+ offerVisitors.push_back(Owned<OfferVisitor>(new ValidOfferChecker()));
+ offerVisitors.push_back(Owned<OfferVisitor>(new FrameworkChecker()));
+ offerVisitors.push_back(Owned<OfferVisitor>(new SlaveChecker()));
+ offerVisitors.push_back(Owned<OfferVisitor>(new UniqueOfferIDChecker()));
+
+ // Validate the offers.
+ foreach (const OfferID& offerId, offerIds) {
+ foreach (const Owned<OfferVisitor>& visitor, offerVisitors) {
+ if (error.isNone()) {
+ error = (*visitor)(offerId, *framework, this);
+ }
+ }
}
- totalResources += getOffer(offerId)->resources();
- }
-
- // Cleanup visitors.
- while (!offerVisitors.empty()) {
- OfferVisitor* visitor = offerVisitors.front();
- offerVisitors.pop_front();
- delete visitor;
- };
-
- // Remove offers and recover resources if any of the offers are
- // invalid.
- foreach (const OfferID& offerId, offerIds) {
- Offer* offer = getOffer(offerId);
- if (offer != NULL) {
- if (error.isSome()) {
- allocator->resourcesRecovered(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- None());
+ // Compute used resources and remove the offers. If the
+ // validation failed, return resources to the allocator.
+ foreach (const OfferID& offerId, offerIds) {
+ Offer* offer = getOffer(offerId);
+ if (offer != NULL) {
+ slaveId = offer->slave_id();
+ used += offer->resources();
+
+ if (error.isSome()) {
+ allocator->resourcesRecovered(
+ offer->framework_id(),
+ offer->slave_id(),
+ offer->resources(),
+ None());
+ }
+ removeOffer(offer);
}
- removeOffer(offer);
}
}
+ // If invalid, send TASK_LOST for the launch attempts.
if (error.isSome()) {
- LOG(WARNING) << "Failed to validate offer " << stringify(offerIds)
- << ": " << error.get().message;
+ LOG(WARNING) << "Launch tasks message used invalid offers '"
+ << stringify(offerIds) << "': " << error.get().message;
foreach (const TaskInfo& task, tasks) {
const StatusUpdate& update = protobuf::createStatusUpdate(
@@ -2193,7 +2160,7 @@ void Master::launchTasks(
return;
}
- CHECK(slaveId.isSome()) << "Slave id not found";
+ CHECK_SOME(slaveId);
Slave* slave = CHECK_NOTNULL(getSlave(slaveId.get()));
LOG(INFO) << "Processing reply for offers: "
@@ -2204,7 +2171,7 @@ void Master::launchTasks(
// Validate each task and launch if valid.
list<Future<Option<Error> > > futures;
foreach (const TaskInfo& task, tasks) {
- futures.push_back(validateTask(task, framework, slave, totalResources));
+ futures.push_back(validateTask(task, framework, slave, used));
// Add to pending tasks.
// NOTE: We need to do this here after validation because of the
@@ -2221,7 +2188,7 @@ void Master::launchTasks(
framework->id,
slaveId.get(),
tasks,
- totalResources,
+ used,
filters,
lambda::_1));
}
@@ -2237,15 +2204,15 @@ Future<Option<Error> > Master::validateTask(
CHECK_NOTNULL(slave);
// Create task visitors.
- // TODO(vinod): Create the visitors on the heap and make the visit
+ // TODO(vinod): Create the visitors on the stack and make the visit
// operation const.
- list<TaskInfoVisitor*> taskVisitors;
- taskVisitors.push_back(new TaskIDChecker());
- taskVisitors.push_back(new SlaveIDChecker());
- taskVisitors.push_back(new UniqueTaskIDChecker());
- taskVisitors.push_back(new ResourceUsageChecker());
- taskVisitors.push_back(new ExecutorInfoChecker());
- taskVisitors.push_back(new CheckpointChecker());
+ list<Owned<TaskInfoVisitor> > taskVisitors;
+ taskVisitors.push_back(Owned<TaskInfoVisitor>(new TaskIDChecker()));
+ taskVisitors.push_back(Owned<TaskInfoVisitor>(new SlaveIDChecker()));
+ taskVisitors.push_back(Owned<TaskInfoVisitor>(new UniqueTaskIDChecker()));
+ taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker()));
+ taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker()));
+ taskVisitors.push_back(Owned<TaskInfoVisitor>(new CheckpointChecker()));
// TODO(benh): Add a HealthCheckChecker visitor.
@@ -2253,20 +2220,13 @@ Future<Option<Error> > Master::validateTask(
// Invoke each visitor.
Option<Error> error = None();
- foreach (TaskInfoVisitor* visitor, taskVisitors) {
+ foreach (const Owned<TaskInfoVisitor>& visitor, taskVisitors) {
error = (*visitor)(task, totalResources, *framework, *slave);
if (error.isSome()) {
break;
}
}
- // Cleanup visitors.
- while (!taskVisitors.empty()) {
- TaskInfoVisitor* visitor = taskVisitors.front();
- taskVisitors.pop_front();
- delete visitor;
- };
-
if (error.isSome()) {
return Error(error.get().message);
}
[3/5] git commit: Removed unused test file 'process_spawn.cpp'.
Posted by bm...@apache.org.
Removed unused test file 'process_spawn.cpp'.
Review: https://reviews.apache.org/r/24582
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/30bc5477
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/30bc5477
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/30bc5477
Branch: refs/heads/master
Commit: 30bc547783741654a56d66b32040da521cfa6f25
Parents: 30fdabe
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Aug 11 16:23:34 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700
----------------------------------------------------------------------
src/tests/process_spawn.cpp | 82 ----------------------------------------
1 file changed, 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/30bc5477/src/tests/process_spawn.cpp
----------------------------------------------------------------------
diff --git a/src/tests/process_spawn.cpp b/src/tests/process_spawn.cpp
deleted file mode 100644
index db07ba0..0000000
--- a/src/tests/process_spawn.cpp
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <iostream>
-#include <climits>
-#include <cstdio>
-
-#include <stout/duration.hpp>
-#include <stout/os.hpp>
-
-using std::cout;
-using std::cin;
-using std::endl;
-
-
-void processinfo()
-{
- cout << "pid: " << getpid()
- << " pgid: " << getpgrp()
- << " ppid: " << getppid()
- << endl;
-}
-
-
-void dummywait()
-{
- while(getchar() == EOF) {
- os::sleep(Seconds(INT_MAX));
- }
-
- cout << "Error: Shouldn't come here" << endl;
-}
-
-
-// This program forks 2 processes in a chain. The child process exits after
-// forking its own child (the grandchild of the main). The grandchild is
-// parented by Init but is in the same group as the main process.
-int main()
-{
- // Become session leader.
- setsid();
-
- pid_t pid = fork();
-
- if (pid > 0) {
- // Inside parent process.
- processinfo();
- dummywait();
- } else {
- // Inside child process.
- processinfo();
-
- pid_t pid2 = fork();
-
- if (pid2 > 0) {
- // Kill the child process so that the tree link is broken.
- _exit(0);
- } else {
- // Inside grandchild process.
- processinfo();
- dummywait();
- }
- }
-
- cout << "Error: Shouldn't come here!!" << endl;
- return -1;
-}
[5/5] git commit: Added a missing test target in Makefile.am.
Posted by bm...@apache.org.
Added a missing test target in Makefile.am.
Review: https://reviews.apache.org/r/24583
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e8fa7bf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e8fa7bf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e8fa7bf
Branch: refs/heads/master
Commit: 0e8fa7bf5213d235bdab524253c84a14d9e54fec
Parents: 30bc547
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Aug 11 16:23:57 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Aug 13 11:54:23 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e8fa7bf/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 39af036..60f89ed 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1059,11 +1059,12 @@ mesos_tests_SOURCES = \
tests/containerizer.cpp \
tests/containerizer_tests.cpp \
tests/credentials_tests.cpp \
- tests/docker_tests.cpp \
tests/docker_containerizer_tests.cpp \
+ tests/docker_tests.cpp \
tests/environment.cpp \
tests/examples_tests.cpp \
tests/exception_tests.cpp \
+ tests/external_containerizer_test.cpp \
tests/health_check_tests.cpp \
tests/fault_tolerance_tests.cpp \
tests/fetcher_tests.cpp \
@@ -1071,7 +1072,6 @@ mesos_tests_SOURCES = \
tests/flags.cpp \
tests/gc_tests.cpp \
tests/isolator_tests.cpp \
- tests/external_containerizer_test.cpp \
tests/log_tests.cpp \
tests/logging_tests.cpp \
tests/main.cpp \
@@ -1098,6 +1098,7 @@ mesos_tests_SOURCES = \
tests/state_tests.cpp \
tests/status_update_manager_tests.cpp \
tests/utils.cpp \
+ tests/values_tests.cpp \
tests/zookeeper_url_tests.cpp
mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)