You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text conversion.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/06/13 00:53:16 UTC

[01/12] git commit: Fixed master to properly do task reconciliation when a slave re-registers.

Updated Branches:
  refs/heads/0.13.x cc5e1e7b7 -> 09f4c11b8
  refs/heads/master 46644b668 -> 90bcfe575


Fixed master to properly do task reconciliation when a slave re-registers.

Review: https://reviews.apache.org/r/11229


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/25f136a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/25f136a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/25f136a3

Branch: refs/heads/0.13.x
Commit: 25f136a38c77a0cef6cd3b567fa731a68d4a242e
Parents: 22e6450
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri May 17 17:09:36 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:19:28 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp               | 91 ++++++++++++++++---------------
 src/master/master.hpp               | 17 ++++--
 src/slave/slave.cpp                 |  5 ++
 src/slave/slave.hpp                 |  2 -
 src/tests/fault_tolerance_tests.cpp | 93 +++++++++++++++++++++++++++++++-
 5 files changed, 156 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/25f136a3/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d031b95..6ac86cd 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -943,50 +943,8 @@ void Master::reregisterSlave(const SlaveID& slaveId,
                    << ") is being allowed to re-register with an already"
                    << " in use id (" << slaveId << ")";
 
-      // Consolidate tasks between master and the slave.
-      // Fist, look for the tasks present in the slave but not present
-      // in the master.
-      multihashmap<FrameworkID, TaskID> slaveTasks;
-      foreach (const Task& task, tasks) {
-        if (!slave->tasks.contains(
-            std::make_pair(task.framework_id(), task.task_id()))) {
-          // This might happen if a terminal status update for this task
-          // came before the slave re-registered message.
-          // TODO(vinod): Consider sending a KillTaskMessage.
-          // TODO(vinod): Export a statistic for these tasks.
-          LOG(WARNING) << "Slave " << slaveId << " attempted to re-register"
-                       << " with unknown task " << task.task_id()
-                       << " of framework " << task.framework_id();
-        }
-        slaveTasks.put(task.framework_id(), task.task_id());
-      }
-
-      // Send TASK_LOST updates for tasks present in the master but
-      // missing from the slave. This could happen if the task was
-      // dropped by the slave (e.g., slave exited before getting the
-      // task or the task was launched while slave was in recovery).
-      foreachvalue (Task* task, utils::copy(slave->tasks)) {
-        if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
-          LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
-                       << " of framework " << task->framework_id()
-                       << " unknown to the slave " << slaveId;
-
-          Framework* framework = getFramework(task->framework_id());
-          if (framework != NULL) {
-            const StatusUpdate& update = protobuf::createStatusUpdate(
-                task->framework_id(),
-                slaveId,
-                task->task_id(),
-                TASK_LOST,
-                "Task was not received by the slave");
-
-            StatusUpdateMessage message;
-            message.mutable_update()->CopyFrom(update);
-            send(framework->pid, message);
-          }
-          removeTask(task);
-        }
-      }
+      // Reconcile tasks between master and the slave.
+      reconcileTasks(slave, tasks);
 
       SlaveReregisteredMessage message;
       message.mutable_slave_id()->MergeFrom(slave->id);
@@ -1646,6 +1604,46 @@ Resources Master::launchTask(const TaskInfo& task,
 }
 
 
+void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
+{
+  CHECK_NOTNULL(slave);
+
+  // We convert the 'tasks' into a map for easier lookup below.
+  // TODO(vinod): Check if the tasks are known to the master.
+  multihashmap<FrameworkID, TaskID> slaveTasks;
+  foreach (const Task& task, tasks) {
+    slaveTasks.put(task.framework_id(), task.task_id());
+  }
+
+  // Send TASK_LOST updates for tasks present in the master but
+  // missing from the slave. This could happen if the task was
+  // dropped by the slave (e.g., slave exited before getting the
+  // task or the task was launched while slave was in recovery).
+  foreachvalue (Task* task, utils::copy(slave->tasks)) {
+    if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
+      LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
+                   << " of framework " << task->framework_id()
+                   << " unknown to the slave " << slave->id;
+
+      Framework* framework = getFramework(task->framework_id());
+      if (framework != NULL) {
+        const StatusUpdate& update = protobuf::createStatusUpdate(
+            task->framework_id(),
+            slave->id,
+            task->task_id(),
+            TASK_LOST,
+            "Task was not received by the slave");
+
+        StatusUpdateMessage message;
+        message.mutable_update()->CopyFrom(update);
+        send(framework->pid, message);
+      }
+      removeTask(task);
+    }
+  }
+}
+
+
 void Master::addFramework(Framework* framework, bool reregister)
 {
   CHECK(frameworks.count(framework->id) == 0);
@@ -1918,6 +1916,11 @@ void Master::readdSlave(Slave* slave,
   }
 
   foreach (const Task& task, tasks) {
+    // Ignore tasks that have reached terminal state.
+    if (protobuf::isTerminalState(task.state())) {
+      continue;
+    }
+
     Task* t = new Task(task);
 
     // Add the task to the slave.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/25f136a3/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 82e3596..f0b1def 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -127,11 +127,18 @@ protected:
   // Process a launch tasks request (for a non-cancelled offer) by
   // launching the desired tasks (if the offer contains a valid set of
   // tasks) and reporting any unused resources to the allocator.
-  void processTasks(Offer* offer,
-                    Framework* framework,
-                    Slave* slave,
-                    const std::vector<TaskInfo>& tasks,
-                    const Filters& filters);
+  void processTasks(
+      Offer* offer,
+      Framework* framework,
+      Slave* slave,
+      const std::vector<TaskInfo>& tasks,
+      const Filters& filters);
+
+  // Reconciles a re-registering slave's tasks and sends TASK_LOST
+  // updates for tasks known to the master but unknown to the slave.
+  void reconcileTasks(
+      Slave* slave,
+      const std::vector<Task>& tasks);
 
   // Add a framework.
   void addFramework(Framework* framework, bool reregister = false);

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/25f136a3/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 840c64d..b5b7e0e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -703,6 +703,11 @@ void Slave::doReliableRegistration()
 
           message.add_tasks()->CopyFrom(t);
         }
+
+        // Add terminated tasks.
+        foreachvalue (Task* task, executor->terminatedTasks) {
+          message.add_tasks()->CopyFrom(*task);
+        }
       }
     }
     send(master, message);

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/25f136a3/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5aba7ed..d1ba82e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -376,8 +376,6 @@ struct Executor
   hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
   boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
 
-  multihashmap<TaskID, UUID> updates; // Pending updates.
-
 private:
   Executor(const Executor&);              // No copying.
   Executor& operator = (const Executor&); // No assigning.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/25f136a3/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index ef570b7..4afbbec 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1378,7 +1378,7 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
 // This test verifies that the master sends TASK_LOST updates
 // for tasks in the master absent from the re-registered slave.
 // We do this by dropping RunTaskMessage from master to the slave.
-TEST_F(FaultToleranceTest, ConsolidateTasksOnSlaveReregistration)
+TEST_F(FaultToleranceTest, ReconcileLostTasks)
 {
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
@@ -1448,3 +1448,94 @@ TEST_F(FaultToleranceTest, ConsolidateTasksOnSlaveReregistration)
 
   Shutdown();
 }
+
+
+// This test verifies that when the slave re-registers, the master
+// does not send TASK_LOST update for a task that has reached terminal
+// state but is waiting for an acknowledgement.
+TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("test task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  // Drop the status update from slave to the master, so that
+  // the slave has a pending terminal update when it re-registers.
+  DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(_statusUpdate);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  NewMasterDetectedMessage message;
+  message.set_pid(master.get());
+
+  process::post(slave.get(), message);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The master should not send a TASK_LOST after the slave
+  // re-registers. We check this by advancing the clock so that
+  // the only update the scheduler receives is the retried
+  // TASK_FINISHED update.
+  Clock::advance(STATUS_UPDATE_RETRY_INTERVAL);
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_FINISHED, status.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}


[03/12] git commit: Updated the NOTICE to include the correct year, and to fix line wrapping.

Posted by vi...@apache.org.
Updated the NOTICE file to include the correct year and to fix line
wrapping.

Review: https://reviews.apache.org/r/11696


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/3f809e07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/3f809e07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/3f809e07

Branch: refs/heads/0.13.x
Commit: 3f809e076467c785d3877ede9e960ecdf789d9a5
Parents: 5912aa2
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jun 6 15:17:57 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:19:28 2013 -0700

----------------------------------------------------------------------
 NOTICE | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f809e07/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index f92cbb4..aa2fc8b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
 Apache Mesos
-Copyright 2012, The Apache Software Foundation
+Copyright 2013, The Apache Software Foundation
 
-This product includes software developed by The Apache Software
-Foundation (http://www.apache.org/).
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).


[07/12] git commit: Added a new 'statistics.json' endpoint to the ResourceMonitor; this deprecates the old 'usage.json' endpoint.

Posted by vi...@apache.org.
Added a new 'statistics.json' endpoint to the ResourceMonitor; this
deprecates the old 'usage.json' endpoint.

This is version 2 of the monitoring statistics, exposed through a new 'statistics.json' endpoint.

The 'usage.json' endpoint is now deprecated and will be removed in a later release.

From: Ben Mahler <be...@gmail.com>
Review: https://reviews.apache.org/r/11641


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/062c850a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/062c850a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/062c850a

Branch: refs/heads/0.13.x
Commit: 062c850ae59a737bf2047a42e23012cdd1c84cee
Parents: 25f136a
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Jun 7 16:17:19 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 16:18:05 2013 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto      |  15 ++--
 src/slave/cgroups_isolator.cpp |  21 ++++-
 src/slave/cgroups_isolator.hpp |   2 +
 src/slave/monitor.cpp          | 155 +++++++++++++++++++++++++++++++-----
 src/slave/monitor.hpp          |  10 +++
 src/slave/process_isolator.cpp |  51 +++++++++---
 src/slave/process_isolator.hpp |   1 +
 src/tests/isolator_tests.cpp   |  21 +++--
 src/tests/monitor_tests.cpp    |  77 +++++++++++++++---
 9 files changed, 294 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/062c850a/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index ece6559..8cbcd9a 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -263,15 +263,18 @@ message ResourceStatistics {
   required double timestamp = 1; // Snapshot time, in seconds since the Epoch.
 
   // CPU Usage Information:
-  // A percentage based cpu usage rate since the last snapshot.
-  // This is akin to what the 'top' program shows.
-  optional double cpu_usage = 2;
   // Total CPU time spent in user mode, and kernel mode.
-  optional double cpu_user_time = 3;   // In seconds.
-  optional double cpu_system_time = 4; // In seconds.
+  optional double cpus_user_time_secs = 2;
+  optional double cpus_system_time_secs = 3;
+
+  // Number of CPUs allocated.
+  required double cpus_limit = 4;
 
   // Memory Usage Information:
-  optional uint64 memory_rss = 5; // Resident Set Size (in bytes).
+  optional uint64 mem_rss_bytes = 5; // Resident Set Size.
+
+  // Amount of memory resources allocated.
+  optional uint64 mem_limit_bytes = 6;
 
   // TODO(bmahler): Add disk usage.
   // TODO(bmahler): Add network usage?

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/062c850a/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 9b3a3a5..553844b 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -651,6 +651,8 @@ void CgroupsIsolator::resourcesChanged(
     return;
   }
 
+  info->resources = resources;
+
   LOG(INFO) << "Changing cgroup controls for executor " << executorId
             << " of framework " << frameworkId
             << " with resources " << resources;
@@ -689,6 +691,17 @@ Future<ResourceStatistics> CgroupsIsolator::usage(
   ResourceStatistics result;
   result.set_timestamp(Clock::now().secs());
 
+  // Set the resource allocations.
+  Option<Bytes> mem = info->resources.mem();
+  if (mem.isSome()) {
+    result.set_mem_limit_bytes(mem.get().bytes());
+  }
+
+  Option<double> cpus = info->resources.cpus();
+  if (cpus.isSome()) {
+    result.set_cpus_limit(cpus.get());
+  }
+
   Try<hashmap<string, uint64_t> > stat =
     cgroups::stat(hierarchy, info->name(), "cpuacct.stat");
 
@@ -700,8 +713,10 @@ Future<ResourceStatistics> CgroupsIsolator::usage(
   // TODO(bmahler): Add namespacing to cgroups to enforce the expected
   // structure, e.g., cgroups::cpuacct::stat.
   if (stat.get().contains("user") && stat.get().contains("system")) {
-    result.set_cpu_user_time((double) stat.get()["user"] / (double) ticks);
-    result.set_cpu_system_time((double) stat.get()["system"] / (double) ticks);
+    result.set_cpus_user_time_secs(
+        (double) stat.get()["user"] / (double) ticks);
+    result.set_cpus_system_time_secs(
+        (double) stat.get()["system"] / (double) ticks);
   }
 
   stat = cgroups::stat(hierarchy, info->name(), "memory.stat");
@@ -714,7 +729,7 @@ Future<ResourceStatistics> CgroupsIsolator::usage(
   // TODO(bmahler): Add namespacing to cgroups to enforce the expected
   // structure, e.g, cgroups::memory::stat.
   if (stat.get().contains("rss")) {
-    result.set_memory_rss(stat.get()["rss"]);
+    result.set_mem_rss_bytes(stat.get()["rss"]);
   }
 
   return result;

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/062c850a/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index 7b8270d..124a4b3 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -169,6 +169,8 @@ private:
 
     Flags flags; // Slave flags.
 
+    Resources resources; // Resources allocated to the cgroup.
+
     // Used to cancel the OOM listening.
     process::Future<uint64_t> oomNotifier;
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/062c850a/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 5de1c15..4f3c91f 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -37,6 +37,8 @@
 
 using namespace process;
 
+using process::statistics;
+
 using std::map;
 using std::string;
 
@@ -49,7 +51,15 @@ using process::wait; // Necessary on some OS's to disambiguate.
 // Resource statistics constants.
 // These match the names in the ResourceStatistics protobuf.
 // TODO(bmahler): Later, when we have a richer monitoring story,
-// we will want to publish these outisde of this file.
+// we will want to publish these outside of this file.
+const std::string CPUS_TIME_SECS        = "cpus_time_secs";
+const std::string CPUS_USER_TIME_SECS   = "cpus_user_time_secs";
+const std::string CPUS_SYSTEM_TIME_SECS = "cpus_system_time_secs";
+const std::string CPUS_LIMIT            = "cpus_limit";
+const std::string MEM_RSS_BYTES         = "mem_rss_bytes";
+const std::string MEM_LIMIT_BYTES       = "mem_limit_bytes";
+
+// TODO(bmahler): Deprecated statistical names, these will be removed!
 const std::string CPU_TIME   = "cpu_time";
 const std::string CPU_USAGE  = "cpu_usage";
 const std::string MEMORY_RSS = "memory_rss";
@@ -61,11 +71,15 @@ void publish(
     const ExecutorID& executorId,
     const ResourceStatistics& statistics);
 
-Future<http::Response> _usage(
+Future<http::Response> _statisticsJSON(
     const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& executors,
     const map<string, double>& statistics,
     const Option<string>& jsonp);
 
+Future<http::Response> _usage(
+    const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& executors,
+    const map<string, double>& statistics,
+    const Option<string>& jsonp);
 
 Future<Nothing> ResourceMonitorProcess::watch(
     const FrameworkID& frameworkId,
@@ -84,9 +98,9 @@ Future<Nothing> ResourceMonitorProcess::watch(
   const string& prefix =
     strings::join("/", frameworkId.value(), executorId.value(), "");
 
-  process::statistics->meter(
+  ::statistics->meter(
       "monitor",
-      prefix + CPU_TIME,
+      prefix + CPUS_TIME_SECS,
       new meters::TimeRate(prefix + CPU_USAGE));
 
   // Schedule the resource collection.
@@ -105,8 +119,13 @@ Future<Nothing> ResourceMonitorProcess::unwatch(
 
   // In case we've already noticed the executor was terminated,
   // we need to archive the statistics first.
-  process::statistics->archive("monitor", prefix + MEMORY_RSS);
-  process::statistics->archive("monitor", prefix + CPU_TIME);
+  // No need to archive CPUS_USAGE as it is implicitly archived along
+  // with CPUS_TIME_SECS.
+  ::statistics->archive("monitor", prefix + CPUS_USER_TIME_SECS);
+  ::statistics->archive("monitor", prefix + CPUS_SYSTEM_TIME_SECS);
+  ::statistics->archive("monitor", prefix + CPUS_LIMIT);
+  ::statistics->archive("monitor", prefix + MEM_RSS_BYTES);
+  ::statistics->archive("monitor", prefix + MEM_LIMIT_BYTES);
 
   if (!watches.contains(frameworkId) ||
       !watches[frameworkId].contains(executorId)) {
@@ -195,20 +214,114 @@ void publish(
   const string& prefix =
     strings::join("/", frameworkId.value(), executorId.value(), "");
 
-  // Publish memory statistic.
-  process::statistics->set(
+  // Publish cpu usage statistics.
+  ::statistics->set(
+      "monitor",
+      prefix + CPUS_USER_TIME_SECS,
+      statistics.cpus_user_time_secs(),
+      time);
+  ::statistics->set(
       "monitor",
-      prefix + MEMORY_RSS,
-      statistics.memory_rss(),
+      prefix + CPUS_SYSTEM_TIME_SECS,
+      statistics.cpus_system_time_secs(),
+      time);
+  ::statistics->set(
+      "monitor",
+      prefix + CPUS_LIMIT,
+      statistics.cpus_limit(),
+      time);
+  // The applied meter from watch() will publish the cpu usage.
+  ::statistics->set(
+      "monitor",
+      prefix + CPUS_TIME_SECS,
+      statistics.cpus_user_time_secs() + statistics.cpus_system_time_secs(),
       time);
 
-  // Publish cpu usage statistics. The applied meter from watch()
-  // will publish the cpu usage percentage.
-  process::statistics->set(
+  // Publish memory statistics.
+  ::statistics->set(
       "monitor",
-      prefix + CPU_TIME,
-      statistics.cpu_user_time() + statistics.cpu_system_time(),
+      prefix + MEM_RSS_BYTES,
+      statistics.mem_rss_bytes(),
       time);
+  ::statistics->set(
+      "monitor",
+      prefix + MEM_LIMIT_BYTES,
+      statistics.mem_limit_bytes(),
+      time);
+}
+
+
+Future<http::Response> ResourceMonitorProcess::statisticsJSON(
+    const http::Request& request)
+{
+  lambda::function<Future<http::Response>(const map<string, double>&)>
+    _statisticsJSON = lambda::bind(
+      slave::_statisticsJSON,
+      watches,
+      lambda::_1,
+      request.query.get("jsonp"));
+
+  return ::statistics->get("monitor").then(_statisticsJSON);
+}
+
+
+Future<http::Response> _statisticsJSON(
+    const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& watches,
+    const map<string, double>& statistics,
+    const Option<string>& jsonp)
+{
+  JSON::Array result;
+
+  foreachkey (const FrameworkID& frameworkId, watches) {
+    foreachkey (const ExecutorID& executorId, watches.get(frameworkId).get()) {
+      const ExecutorInfo& info =
+        watches.get(frameworkId).get().get(executorId).get();
+      const string& prefix =
+        strings::join("/", frameworkId.value(), executorId.value(), "");
+
+      // Export zero values by default.
+      JSON::Object usage;
+      usage.values[CPUS_USER_TIME_SECS] = 0;
+      usage.values[CPUS_SYSTEM_TIME_SECS] = 0;
+      usage.values[CPUS_LIMIT] = 0;
+      usage.values[MEM_RSS_BYTES] = 0;
+      usage.values[MEM_LIMIT_BYTES] = 0;
+
+      // Set the cpu usage data if present.
+      if (statistics.count(prefix + CPUS_USER_TIME_SECS) > 0) {
+        usage.values[CPUS_USER_TIME_SECS] =
+          statistics.find(prefix + CPUS_USER_TIME_SECS)->second;
+      }
+      if (statistics.count(prefix + CPUS_SYSTEM_TIME_SECS) > 0) {
+        usage.values[CPUS_SYSTEM_TIME_SECS] =
+          statistics.find(prefix + CPUS_SYSTEM_TIME_SECS)->second;
+      }
+      if (statistics.count(prefix + CPUS_LIMIT) > 0) {
+        usage.values[CPUS_LIMIT] = statistics.find(prefix + CPUS_LIMIT)->second;
+      }
+
+      // Set the memory usage data if present.
+      if (statistics.count(prefix + MEM_RSS_BYTES) > 0) {
+        usage.values[MEM_RSS_BYTES] =
+          statistics.find(prefix + MEM_RSS_BYTES)->second;
+      }
+      if (statistics.count(prefix + MEM_LIMIT_BYTES) > 0) {
+        usage.values[MEM_LIMIT_BYTES] =
+          statistics.find(prefix + MEM_LIMIT_BYTES)->second;
+      }
+
+      JSON::Object entry;
+      entry.values["framework_id"] = frameworkId.value();
+      entry.values["executor_id"] = executorId.value();
+      entry.values["executor_name"] = info.name();
+      entry.values["source"] = info.source();
+      entry.values["statistics"] = usage;
+
+      result.values.push_back(entry);
+    }
+  }
+
+  return http::OK(result, jsonp);
 }
 
 
@@ -222,7 +335,7 @@ Future<http::Response> ResourceMonitorProcess::usage(
       lambda::_1,
       request.query.get("jsonp"));
 
-  return process::statistics->get("monitor").then(_usage);
+  return ::statistics->get("monitor").then(_usage);
 }
 
 
@@ -250,11 +363,13 @@ Future<http::Response> _usage(
       if (statistics.count(prefix + CPU_USAGE) > 0) {
         usage.values[CPU_USAGE] = statistics.find(prefix + CPU_USAGE)->second;
       }
-      if (statistics.count(prefix + CPU_TIME) > 0) {
-        usage.values[CPU_TIME] = statistics.find(prefix + CPU_TIME)->second;
+      if (statistics.count(prefix + CPUS_TIME_SECS) > 0) {
+        usage.values[CPU_TIME] =
+          statistics.find(prefix + CPUS_TIME_SECS)->second;
       }
-      if (statistics.count(prefix + MEMORY_RSS) > 0) {
-        usage.values[MEMORY_RSS] = statistics.find(prefix + MEMORY_RSS)->second;
+      if (statistics.count(prefix + MEM_RSS_BYTES) > 0) {
+        usage.values[MEMORY_RSS] =
+          statistics.find(prefix + MEM_RSS_BYTES)->second;
       }
 
       JSON::Object entry;

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/062c850a/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 8770dc0..e5fdcbd 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -44,6 +44,10 @@ class ResourceMonitorProcess;
 // Provides resource monitoring for executors. Resource usage time
 // series are stored using the Statistics module. Usage information
 // is also exported via a JSON endpoint.
+// TODO(bmahler): Once the deprecated usage.json endpoint is removed,
+// clean this up!! It will be possible to drive collection directly
+// via the http endpoints, and the isolator can return all
+// information in one request.
 // TODO(bmahler): Forward usage information to the master.
 // TODO(bmahler): Consider pulling out the resource collection into
 // a Collector abstraction. The monitor can then become a true
@@ -98,6 +102,7 @@ public:
 protected:
   virtual void initialize()
   {
+    route("/statistics.json", &ResourceMonitorProcess::statisticsJSON);
     route("/usage.json", &ResourceMonitorProcess::usage);
   }
 
@@ -113,6 +118,11 @@ private:
       const ExecutorID& executorId,
       const Duration& interval);
 
+  // Returns the monitoring statistics. Requests have no parameters.
+  process::Future<process::http::Response> statisticsJSON(
+      const process::http::Request& request);
+
+  // TODO(bmahler): Deprecated.
   // Returns the usage information. Requests have no parameters.
   process::Future<process::http::Response> usage(
       const process::http::Request& request);

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/062c850a/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index d4f7b76..b54bf7e 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -296,6 +296,19 @@ void ProcessIsolator::resourcesChanged(
     const Resources& resources)
 {
   CHECK(initialized) << "Cannot do resourcesChanged before initialization!";
+
+  if (!infos.contains(frameworkId) ||
+      !infos[frameworkId].contains(executorId) ||
+      infos[frameworkId][executorId]->killed) {
+    LOG(INFO) << "Asked to update resources for an unknown/killed executor '"
+              << executorId << "' of framework " << frameworkId;
+    return;
+  }
+
+  ProcessInfo* info = CHECK_NOTNULL(infos[frameworkId][executorId]);
+
+  info->resources = resources;
+
   // Do nothing; subclasses may override this.
 }
 
@@ -365,6 +378,17 @@ Future<ResourceStatistics> ProcessIsolator::usage(
 
   result.set_timestamp(Clock::now().secs());
 
+  // Set the resource allocations.
+  Option<Bytes> mem = info->resources.mem();
+  if (mem.isSome()) {
+    result.set_mem_limit_bytes(mem.get().bytes());
+  }
+
+  Option<double> cpus = info->resources.cpus();
+  if (cpus.isSome()) {
+    result.set_cpus_limit(cpus.get());
+  }
+
 #ifdef __linux__
   // Get the page size, used for memory accounting.
   // NOTE: This is more portable than using getpagesize().
@@ -384,9 +408,11 @@ Future<ResourceStatistics> ProcessIsolator::usage(
     return Future<ResourceStatistics>::failed(status.error());
   }
 
-  result.set_memory_rss(status.get().rss * pageSize);
-  result.set_cpu_user_time((double) status.get().utime / (double) ticks);
-  result.set_cpu_system_time((double) status.get().stime / (double) ticks);
+  result.set_mem_rss_bytes(status.get().rss * pageSize);
+  result.set_cpus_user_time_secs(
+      (double) status.get().utime / (double) ticks);
+  result.set_cpus_system_time_secs(
+      (double) status.get().stime / (double) ticks);
 
   // Now aggregate all descendant process usage statistics.
   Try<set<pid_t> > children = proc::children(info->pid.get(), true);
@@ -408,16 +434,15 @@ Future<ResourceStatistics> ProcessIsolator::usage(
       continue;
     }
 
-    result.set_memory_rss(
-        result.memory_rss() +
-        status.get().rss * pageSize);
+    result.set_mem_rss_bytes(
+        result.mem_rss_bytes() + status.get().rss * pageSize);
 
-    result.set_cpu_user_time(
-        result.cpu_user_time() +
+    result.set_cpus_user_time_secs(
+        result.cpus_user_time_secs() +
         (double) status.get().utime / (double) ticks);
 
-    result.set_cpu_system_time(
-        result.cpu_system_time() +
+    result.set_cpus_system_time_secs(
+        result.cpus_system_time_secs() +
         (double) status.get().stime / (double) ticks);
   }
 #elif defined __APPLE__
@@ -441,11 +466,11 @@ Future<ResourceStatistics> ProcessIsolator::usage(
         "Failed to get proc_pidinfo: " + stringify(size));
   }
 
-  result.set_memory_rss(task.pti_resident_size);
+  result.set_mem_rss_bytes(task.pti_resident_size);
 
   // NOTE: CPU Times are in nanoseconds, but this is not documented!
-  result.set_cpu_user_time(Nanoseconds(task.pti_total_user).secs());
-  result.set_cpu_system_time(Nanoseconds(task.pti_total_system).secs());
+  result.set_cpus_user_time_secs(Nanoseconds(task.pti_total_user).secs());
+  result.set_cpus_system_time_secs(Nanoseconds(task.pti_total_system).secs());
 #endif
 
   return result;

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/062c850a/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index 9875f4a..ee693f6 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -101,6 +101,7 @@ private:
     ExecutorID executorId;
     Option<pid_t> pid; // PID of the forked executor process.
     bool killed; // True if "killing" has been initiated via 'killExecutor'.
+    Resources resources; // Resources allocated to the process tree.
   };
 
   // TODO(benh): Make variables const by passing them via constructor.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/062c850a/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index aae8b2f..7013fa2 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -119,6 +119,12 @@ TYPED_TEST(IsolatorTest, Usage)
   task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
   task.mutable_resources()->MergeFrom(offers.get()[0].resources());
 
+  Resources resources(offers.get()[0].resources());
+  Option<Bytes> mem = resources.mem();
+  ASSERT_SOME(mem);
+  Option<double> cpus = resources.cpus();
+  ASSERT_SOME(cpus);
+
   const std::string& file = path::join(flags.work_dir, "ready");
 
   // This task induces user/system load in a child process by
@@ -172,9 +178,9 @@ TYPED_TEST(IsolatorTest, Usage)
     statistics = usage.get();
 
     // If we meet our usage expectations, we're done!
-    if (statistics.memory_rss() >= 1024u &&
-        statistics.cpu_user_time() >= 0.125 &&
-        statistics.cpu_system_time() >= 0.125) {
+    if (statistics.cpus_user_time_secs() >= 0.125 &&
+        statistics.cpus_system_time_secs() >= 0.125 &&
+        statistics.mem_rss_bytes() >= 1024u) {
       break;
     }
 
@@ -182,9 +188,12 @@ TYPED_TEST(IsolatorTest, Usage)
     waited += Milliseconds(100);
   } while (waited < Seconds(10));
 
-  EXPECT_GE(statistics.memory_rss(), 1024u);
-  EXPECT_GE(statistics.cpu_user_time(), 0.125);
-  EXPECT_GE(statistics.cpu_system_time(), 0.125);
+
+  EXPECT_GE(statistics.cpus_user_time_secs(), 0.125);
+  EXPECT_GE(statistics.cpus_system_time_secs(), 0.125);
+  EXPECT_EQ(statistics.cpus_limit(), cpus.get());
+  EXPECT_GE(statistics.mem_rss_bytes(), 1024u);
+  EXPECT_EQ(statistics.mem_limit_bytes(), mem.get().bytes());
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status));

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/062c850a/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 53920a0..3142416 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -72,21 +72,34 @@ TEST(MonitorTest, WatchUnwatch)
   executorInfo.set_name("name");
   executorInfo.set_source("source");
 
+  ResourceStatistics initialStatistics;
+  initialStatistics.set_cpus_user_time_secs(0);
+  initialStatistics.set_cpus_system_time_secs(0);
+  initialStatistics.set_cpus_limit(1.0);
+  initialStatistics.set_mem_rss_bytes(0);
+  initialStatistics.set_mem_limit_bytes(2048);
+  initialStatistics.set_timestamp(Clock::now().secs());
+
   ResourceStatistics statistics;
-  statistics.set_cpu_user_time(5);
-  statistics.set_cpu_system_time(1);
-  statistics.set_memory_rss(1024);
-  statistics.set_timestamp(Clock::now().secs());
+  statistics.set_cpus_user_time_secs(4);
+  statistics.set_cpus_system_time_secs(1);
+  statistics.set_cpus_limit(1.0);
+  statistics.set_mem_rss_bytes(1024);
+  statistics.set_mem_limit_bytes(2048);
+  statistics.set_timestamp(
+      initialStatistics.timestamp() +
+      slave::RESOURCE_MONITORING_INTERVAL.secs());
 
   TestingIsolator isolator;
 
   process::spawn(isolator);
 
-  Future<Nothing> usage;
+  Future<Nothing> usage1, usage2;
   EXPECT_CALL(isolator, usage(frameworkId, executorId))
-    .WillOnce(DoAll(FutureSatisfy(&usage),
+    .WillOnce(DoAll(FutureSatisfy(&usage1),
+                    Return(initialStatistics)))
+    .WillOnce(DoAll(FutureSatisfy(&usage2),
                     Return(statistics)));
-
   slave::ResourceMonitor monitor(&isolator);
 
   // We pause the clock first in order to make sure that we can
@@ -107,7 +120,16 @@ TEST(MonitorTest, WatchUnwatch)
   process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
   process::Clock::settle();
 
-  AWAIT_READY(usage);
+  AWAIT_READY(usage1);
+
+  // Wait until the isolator has finished returning the statistics.
+  process::Clock::settle();
+
+  // The second collection will populate the cpus_usage.
+  process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
+  process::Clock::settle();
+
+  AWAIT_READY(usage2);
 
   // Wait until the isolator has finished returning the statistics.
   process::Clock::settle();
@@ -136,9 +158,42 @@ TEST(MonitorTest, WatchUnwatch)
               "},"
               "\"source\":\"source\""
           "}]",
-          statistics.cpu_user_time() + statistics.cpu_system_time(),
-          statistics.cpu_usage(),
-          statistics.memory_rss()).get(),
+          statistics.cpus_system_time_secs() + statistics.cpus_user_time_secs(),
+          (statistics.cpus_system_time_secs() +
+           statistics.cpus_user_time_secs()) /
+               slave::RESOURCE_MONITORING_INTERVAL.secs(),
+          statistics.mem_rss_bytes()).get(),
+      response);
+
+  response = process::http::get(upid, "statistics.json");
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ(
+      "application/json",
+      "Content-Type",
+      response);
+
+  // TODO(bmahler): Verify metering directly through statistics.
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(
+      strings::format(
+          "[{"
+              "\"executor_id\":\"executor\","
+              "\"executor_name\":\"name\","
+              "\"framework_id\":\"framework\","
+              "\"source\":\"source\","
+              "\"statistics\":{"
+                  "\"cpus_limit\":%g,"
+                  "\"cpus_system_time_secs\":%g,"
+                  "\"cpus_user_time_secs\":%g,"
+                  "\"mem_limit_bytes\":%lu,"
+                  "\"mem_rss_bytes\":%lu"
+              "}"
+          "}]",
+          statistics.cpus_limit(),
+          statistics.cpus_system_time_secs(),
+          statistics.cpus_user_time_secs(),
+          statistics.mem_limit_bytes(),
+          statistics.mem_rss_bytes()).get(),
       response);
 
   // Ensure the monitor stops polling the isolator.


[02/12] git commit: Fixed slave to properly handle terminated tasks that have pending updates.

Posted by vi...@apache.org.
Fixed slave to properly handle terminated tasks that have pending
updates.

Review: https://reviews.apache.org/r/11694


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/22e64506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/22e64506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/22e64506

Branch: refs/heads/0.13.x
Commit: 22e64506da12dc29ecd101bf73702c8287811bf2
Parents: 3f809e0
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Jun 5 22:47:57 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:19:28 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 116 +++++++++++++++++++++++++++--------------------
 src/slave/slave.hpp |  14 ++++--
 2 files changed, 75 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/22e64506/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8ce1646..840c64d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1041,21 +1041,14 @@ void Slave::killTask(const FrameworkID& frameworkId, const TaskID& taskId)
 
   switch (executor->state) {
     case Executor::REGISTERING: {
-      if (executor->queuedTasks.contains(taskId)) {
-        // We remove the task here so that if this executor registers at
-        // a later point in time it won't be sent this task.
-        LOG(WARNING) << "Removing queued task " << taskId
-                     << " from executor '" << executor->id
-                     << "' of framework " << frameworkId
-                     << " because the executor hasn't registered yet";
-        executor->removeTask(taskId);
-      } else {
-        LOG(WARNING) << "Cannot kill task " << taskId
-                     << " of framework " << frameworkId
-                     << " because the executor '" << executor->id
-                     << "' hasn't registered yet";
-      }
+      LOG(WARNING) << "Removing queued task " << taskId
+                   << " of framework " << frameworkId
+                   << " because the executor '" << executor->id
+                   << "' hasn't registered yet";
 
+      // NOTE: Sending a TASK_KILLED update removes the task from
+      // Executor::queuedTasks, so that if the executor registers at
+      // a later point in time, it won't get this task.
       const StatusUpdate& update = protobuf::createStatusUpdate(
           frameworkId,
           info.id(),
@@ -1376,11 +1369,13 @@ void Slave::_statusUpdateAcknowledgement(
         executor->state == Executor::TERMINATED)
     << executor->state;
 
-  executor->updates.remove(taskId, uuid);
+  if (executor->terminatedTasks.contains(taskId)) {
+    executor->completeTask(taskId);
+  }
 
   // Remove the executor if it has terminated and there are no more
-  // pending updates.
-  if (executor->state == Executor::TERMINATED && executor->updates.empty()) {
+  // incomplete tasks.
+  if (executor->state == Executor::TERMINATED && !executor->incompleteTasks()) {
     remove(framework, executor);
   }
 }
@@ -1594,15 +1589,13 @@ void Slave::reregisterExecutor(
       send(executor->pid, message);
 
       // Handle all the pending updates.
+      // The status update manager might have already checkpointed some
+      // of these pending updates (for example, if the slave died right
+      // after it checkpointed the update but before it could send the
+      // ACK to the executor). This is ok because the status update
+      // manager correctly handles duplicate updates.
       foreach (const StatusUpdate& update, updates) {
-        // The status update manager might have already checkpointed some
-        // of these pending updates (for e.g: if the slave died right
-        // after it checkpointed the update but before it could send the
-        // ACK to the executor). If so, we can just ignore those updates.
-        if (!executor->updates.contains(
-            update.status().task_id(), UUID::fromBytes(update.uuid()))) {
-          statusUpdate(update); // This also updates the executor's resources!
-        }
+        statusUpdate(update); // This also updates the executor's resources!
       }
 
       // Now, if there is any task still in STAGING state and not in
@@ -1720,8 +1713,13 @@ void Slave::statusUpdate(const StatusUpdate& update)
   if (executor == NULL) {
     LOG(WARNING)  << "Could not find the executor for "
                   << "status update " << update;
-    stats.invalidStatusUpdates++;
+    stats.validStatusUpdates++;
 
+    // NOTE: We forward the update here because this update could be
+    // generated by the slave when the executor is unknown to it,
+    // e.g., killTask(), _runTask().
+    // TODO(vinod): Revisit these semantics when we disallow updates
+    // sent by executors that are unknown to the slave.
     statusUpdateManager->update(update, info.id())
       .onAny(defer(self(), &Slave::_statusUpdate, params::_1, update, None()));
 
@@ -1740,11 +1738,10 @@ void Slave::statusUpdate(const StatusUpdate& update)
   stats.validStatusUpdates++;
 
   executor->updateTaskState(status.task_id(), status.state());
-  executor->updates.put(status.task_id(), UUID::fromBytes(update.uuid()));
 
   // Handle the task appropriately if it's terminated.
   if (protobuf::isTerminalState(status.state())) {
-    executor->removeTask(status.task_id());
+    executor->terminateTask(status.task_id(), status.state());
 
     // Tell the isolator to update the resources.
     dispatch(isolator,
@@ -2152,10 +2149,10 @@ void Slave::executorTerminated(
         send(master, message);
       }
 
-      // Remove the executor if either there are no pending updates
-      // or the framework is terminating.
-      if (executor->updates.empty() ||
-          framework->state == Framework::TERMINATING) {
+      // Remove the executor if either the framework is terminating or
+      // there are no incomplete tasks.
+      if (framework->state == Framework::TERMINATING ||
+          !executor->incompleteTasks()) {
         remove(framework, executor);
       }
       break;
@@ -2186,8 +2183,8 @@ void Slave::remove(Framework* framework, Executor* executor)
   // care for pending updates when a framework is terminating
   // because the framework cannot ACK them.
   CHECK(executor->state == Executor::TERMINATED) << executor->state;
-  CHECK (executor->updates.empty() ||
-         framework->state == Framework::TERMINATING);
+  CHECK(framework->state == Framework::TERMINATING ||
+        !executor->incompleteTasks());
 
   // TODO(vinod): Move the responsibility of gc'ing to the
   // Executor struct.
@@ -2789,7 +2786,7 @@ Executor* Framework::getExecutor(const TaskID& taskId)
   foreachvalue (Executor* executor, executors) {
     if (executor->queuedTasks.contains(taskId) ||
         executor->launchedTasks.contains(taskId) ||
-        executor->updates.contains(taskId)) {
+        executor->terminatedTasks.contains(taskId)) {
       return executor;
     }
   }
@@ -2941,23 +2938,36 @@ Task* Executor::addTask(const TaskInfo& task)
 }
 
 
-void Executor::removeTask(const TaskID& taskId)
+void Executor::terminateTask(
+    const TaskID& taskId,
+    const mesos::TaskState& state)
 {
+  Task* task = NULL;
   // Remove the task if it's queued.
-  queuedTasks.erase(taskId);
-
-  // Update the resources if it's been launched.
-  if (launchedTasks.contains(taskId)) {
-    Task* task = launchedTasks[taskId];
+  if (queuedTasks.contains(taskId)) {
+    task = new Task(
+        protobuf::createTask(queuedTasks[taskId], state, id, frameworkId));
+  } else if (launchedTasks.contains(taskId)) {
+    // Update the resources if it's been launched.
+    task = launchedTasks[taskId];
     foreach (const Resource& resource, task->resources()) {
       resources -= resource;
     }
     launchedTasks.erase(taskId);
+  }
 
-    completedTasks.push_back(*task);
+  terminatedTasks[taskId] = CHECK_NOTNULL(task);
+}
 
-    delete task;
-  }
+
+void Executor::completeTask(const TaskID& taskId)
+{
+  CHECK(terminatedTasks.contains(taskId)) << "Unknown task " << taskId;
+
+  Task* task = terminatedTasks[taskId];
+  completedTasks.push_back(*task);
+  terminatedTasks.erase(taskId);
+  delete task;
 }
 
 
@@ -3001,16 +3011,14 @@ void Executor::recoverTask(const TaskState& state)
   // Read updates to get the latest state of the task.
   foreach (const StatusUpdate& update, state.updates) {
     updateTaskState(state.id, update.status().state());
-    updates.put(state.id, UUID::fromBytes(update.uuid()));
 
-    // Remove the task if it received a terminal update.
+    // Terminate the task if it received a terminal update.
     if (protobuf::isTerminalState(update.status().state())) {
-      removeTask(state.id);
+      terminateTask(state.id, update.status().state());
 
-      // If the terminal update has been acknowledged, remove it
-      // from pending tasks.
+      // If the terminal update has been acknowledged, remove it.
       if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
-        updates.remove(state.id, UUID::fromBytes(update.uuid()));
+        completeTask(state.id);
       }
       break;
     }
@@ -3026,6 +3034,14 @@ void Executor::updateTaskState(const TaskID& taskId, mesos::TaskState state)
 }
 
 
+bool Executor::incompleteTasks()
+{
+  return !queuedTasks.empty() ||
+         !launchedTasks.empty() ||
+         !terminatedTasks.empty();
+}
+
+
 std::ostream& operator << (std::ostream& stream, Framework::State state) {
   switch (state) {
     case Framework::RUNNING:     return stream << "RUNNING";

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/22e64506/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 26dc96e..5aba7ed 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -335,11 +335,15 @@ struct Executor
   ~Executor();
 
   Task* addTask(const TaskInfo& task);
-  void removeTask(const TaskID& taskId);
+  void terminateTask(const TaskID& taskId, const mesos::TaskState& state);
+  void completeTask(const TaskID& taskId);
   void checkpointTask(const TaskInfo& task);
   void recoverTask(const state::TaskState& state);
   void updateTaskState(const TaskID& taskId, TaskState state);
 
+  // Returns true if there are any queued/launched/terminated tasks.
+  bool incompleteTasks();
+
   enum State {
     REGISTERING,  // Executor is launched but not (re-)registered yet.
     RUNNING,      // Executor has (re-)registered.
@@ -367,13 +371,13 @@ struct Executor
 
   Resources resources; // Currently consumed resources.
 
-  hashmap<TaskID, TaskInfo> queuedTasks;
-  hashmap<TaskID, Task*> launchedTasks;
+  hashmap<TaskID, TaskInfo> queuedTasks; // Not yet launched.
+  hashmap<TaskID, Task*> launchedTasks;  // Running.
+  hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
+  boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
 
   multihashmap<TaskID, UUID> updates; // Pending updates.
 
-  boost::circular_buffer<Task> completedTasks;
-
 private:
   Executor(const Executor&);              // No copying.
   Executor& operator = (const Executor&); // No assigning.


[05/12] git commit: Fixed ZooKeeper to recursively create parent paths as necessary.

Posted by vi...@apache.org.
Fixed ZooKeeper to recursively create parent paths as necessary.

Review: https://reviews.apache.org/r/11366


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/9029967d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/9029967d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/9029967d

Branch: refs/heads/0.13.x
Commit: 9029967d96c0f2cc09a2b09ab3d0a8d0ab4005b7
Parents: cc5e1e7
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu May 23 23:26:31 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:19:28 2013 -0700

----------------------------------------------------------------------
 src/tests/zookeeper_tests.cpp | 12 ++++-----
 src/zookeeper/zookeeper.cpp   | 51 +++++++++++++++++++-------------------
 src/zookeeper/zookeeper.hpp   |  2 +-
 3 files changed, 33 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/9029967d/src/tests/zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_tests.cpp b/src/tests/zookeeper_tests.cpp
index 77a5ab2..16c5fb7 100644
--- a/src/tests/zookeeper_tests.cpp
+++ b/src/tests/zookeeper_tests.cpp
@@ -82,12 +82,12 @@ TEST_F(ZooKeeperTest, Create)
   ZooKeeper nonOwnerZk(server->connectString(), NO_TIMEOUT, &watcher);
   watcher.awaitSessionEvent(ZOO_CONNECTED_STATE);
   nonOwnerZk.authenticate("digest", "non-owner:non-owner");
-  EXPECT_EQ(ZNOAUTH, nonOwnerZk.create("/foo/bar/baz",
-                                       "",
-                                       zookeeper::EVERYONE_READ_CREATOR_ALL,
-                                       0,
-                                       NULL,
-                                       true));
+  EXPECT_EQ(ZNODEEXISTS, nonOwnerZk.create("/foo/bar/baz",
+                                           "",
+                                           zookeeper::EVERYONE_READ_CREATOR_ALL,
+                                           0,
+                                           NULL,
+                                           true));
   EXPECT_EQ(ZOK, nonOwnerZk.create("/foo/bar/baz/bam",
                                    "44",
                                    zookeeper::EVERYONE_READ_CREATOR_ALL,

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/9029967d/src/zookeeper/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.cpp b/src/zookeeper/zookeeper.cpp
index 267c38a..de33003 100644
--- a/src/zookeeper/zookeeper.cpp
+++ b/src/zookeeper/zookeeper.cpp
@@ -517,41 +517,42 @@ int ZooKeeper::authenticate(const string& scheme, const string& credentials)
 }
 
 
-int ZooKeeper::create(const string& path, const string& data,
-                      const ACL_vector& acl, int flags, string* result,
-                      bool recursive)
+int ZooKeeper::create(
+    const string& path,
+    const string& data,
+    const ACL_vector& acl,
+    int flags,
+    string* result,
+    bool recursive)
 {
   if (!recursive) {
     return impl->create(path, data, acl, flags, result).get();
   }
 
-  // Do "recursive" create, i.e., ensure intermediate znodes exist.
-  string prefix = "/";
-  int code = ZOK;
-  foreach (const string& token, strings::tokenize(path, "/")) {
-    prefix = path::join(prefix, token);
-
-    // Make sure we include 'flags' and 'data' for the final znode.
-    if (prefix == path || (prefix + "/") == path) {
-      code = impl->create(path, data, acl, flags, result).get();
-    } else {
-      code = impl->create(prefix, "", acl, 0, result).get();
-    }
+  // First check if the path exists.
+  int code = impl->exists(path, false, NULL).get();
+  if (code == ZOK) {
+    return ZNODEEXISTS;
+  }
 
-    // We fail all non-OK return codes except for:
-    // ZNODEEXISTS says the node in the znode path we are trying to
-    //   create already exists - this is what we wanted, so we
-    //   continue.
-    // ZNOAUTH says we can't write the node, but it doesn't tell us
-    //   whether the node already exists. We take the optimistic
-    //   approach and assume the node's parent doesn't allow us to
-    //   write an already existing node (but it exists).
-    if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
+  // Now recursively create the parent path.
+  // NOTE: We don't use 'dirname()' to get the parent path here
+  // because, it doesn't return the expected path when a path ends
+  // with "/". For example, to create path "/a/b/", we want to
+  // recursively create "/a/b", instead of just creating "/a".
+  const string& parent = path.substr(0, path.find_last_of("/"));
+  if (!parent.empty()) {
+    code = create(parent, "", acl, 0, result, true);
+    if (code != ZOK && code != ZNODEEXISTS) {
       return code;
     }
   }
 
-  return code;
+  // Finally create the path.
+  // TODO(vinod): Delete any intermediate nodes created if this fails.
+  // This requires synchronization because the deletion might affect
+  // other callers (different threads/processes) acting on this path.
+  return impl->create(path, data, acl, flags, result).get();
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/9029967d/src/zookeeper/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.hpp b/src/zookeeper/zookeeper.hpp
index 99e689e..7243543 100644
--- a/src/zookeeper/zookeeper.hpp
+++ b/src/zookeeper/zookeeper.hpp
@@ -184,7 +184,7 @@ public:
 	     const ACL_vector &acl,
 	     int flags,
 	     std::string *result,
-             bool recursive = false);
+	     bool recursive = false);
 
   /**
    * \brief delete a node in zookeeper synchronously.


[12/12] git commit: Fixed slave to properly handle duplicate terminal updates for the same task.

Posted by vi...@apache.org.
Fixed slave to properly handle duplicate terminal updates for the
same task.

Review: https://reviews.apache.org/r/11798


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/d5f1d11b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/d5f1d11b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/d5f1d11b

Branch: refs/heads/master
Commit: d5f1d11b90aa83fb5df0a84c07c1ccd2bccf86a6
Parents: 46644b6
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Jun 10 22:44:42 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jun 12 15:52:58 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                       |  25 ++-
 src/tests/status_update_manager_tests.cpp | 202 +++++++++++++++++++++++++
 2 files changed, 220 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/d5f1d11b/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b5b7e0e..fd96de9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1687,6 +1687,8 @@ void Slave::reregisterExecutorTimeout()
 // 2) When slave generates task updates (e.g LOST/KILLED/FAILED).
 void Slave::statusUpdate(const StatusUpdate& update)
 {
+  LOG(INFO) << "Handling status update " << update;
+
   CHECK(state == RECOVERING || state == DISCONNECTED ||
         state == RUNNING || state == TERMINATING)
     << state;
@@ -1724,7 +1726,8 @@ void Slave::statusUpdate(const StatusUpdate& update)
     // generated by the slave when the executor is unknown to it,
     // e.g., killTask(), _runTask().
     // TODO(vinod): Revisit these semantics when we disallow updates
-    // sent by executors that are unknown to the slave.
+    // (e.g., when slave recovery is always enabled) sent by executors
+    // that are unknown to the slave.
     statusUpdateManager->update(update, info.id())
       .onAny(defer(self(), &Slave::_statusUpdate, params::_1, update, None()));
 
@@ -1737,15 +1740,17 @@ void Slave::statusUpdate(const StatusUpdate& update)
         executor->state == Executor::TERMINATED)
     << executor->state;
 
-  VLOG(1) << "Handling status update " << update;
-
   stats.tasks[update.status().state()]++;
   stats.validStatusUpdates++;
 
   executor->updateTaskState(status.task_id(), status.state());
 
-  // Handle the task appropriately if it's terminated.
-  if (protobuf::isTerminalState(status.state())) {
+  // Handle the task appropriately if it is terminated.
+  // TODO(vinod): Revisit these semantics when we disallow duplicate
+  // terminal updates (e.g., when slave recovery is always enabled).
+  if (protobuf::isTerminalState(status.state()) &&
+      (executor->queuedTasks.contains(status.task_id()) ||
+       executor->launchedTasks.contains(status.task_id()))) {
     executor->terminateTask(status.task_id(), status.state());
 
     // Tell the isolator to update the resources.
@@ -2967,7 +2972,8 @@ void Executor::terminateTask(
 
 void Executor::completeTask(const TaskID& taskId)
 {
-  CHECK(terminatedTasks.contains(taskId)) << "Unknown task " << taskId;
+  CHECK(terminatedTasks.contains(taskId))
+    << "Failed to find terminated task " << taskId;
 
   Task* task = terminatedTasks[taskId];
   completedTasks.push_back(*task);
@@ -3018,7 +3024,12 @@ void Executor::recoverTask(const TaskState& state)
     updateTaskState(state.id, update.status().state());
 
     // Terminate the task if it received a terminal update.
-    if (protobuf::isTerminalState(update.status().state())) {
+    // We ignore duplicate terminal updates by checking if
+    // the task is present in launchedTasks.
+    // TODO(vinod): Revisit these semantics when we disallow duplicate
+    // terminal updates (e.g., when slave recovery is always enabled).
+    if (protobuf::isTerminalState(update.status().state()) &&
+        launchedTasks.contains(state.id)) {
       terminateTask(state.id, update.status().state());
 
       // If the terminal update has been acknowledged, remove it.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/d5f1d11b/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 61ccfcc..4239532 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -464,3 +464,205 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
 
   Shutdown();
 }
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate terminal status updates, when the
+// second update is received before the ACK for the first update.
+// The proper behavior here is for the status update manager to
+// drop the duplicate update.
+TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+
+  Try<PID<Slave> > slave = StartSlave(&exec, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Drop the first ACK from the scheduler to the slave.
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+  AWAIT_READY(statusUpdateAcknowledgementMessage);
+
+  Future<Nothing> _statusUpdate =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdate);
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // At this point the status update manager has enqueued
+  // TASK_FINISHED and TASK_KILLED updates.
+  AWAIT_READY(_statusUpdate);
+
+  // After we advance the clock, the scheduler should receive
+  // the retried TASK_FINISHED update and acknowledge it. The
+  // TASK_KILLED update should be dropped by the status update
+  // manager, as the stream is already terminated.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::settle();
+
+  // Ensure the scheduler receives TASK_FINISHED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_FINISHED, update.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate terminal status updates, when the
+// second update is received after the ACK for the first update.
+// The proper behavior here is for the status update manager to
+// forward the duplicate update to the scheduler.
+TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+
+  Try<PID<Slave> > slave = StartSlave(&exec, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  Clock::pause();
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // Ensure the scheduler receives TASK_KILLED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_KILLED, update.get().state());
+
+  // Ensure the slave properly handles the ACK.
+  // Clock::settle() ensures that the slave successfully
+  // executes Slave::_statusUpdateAcknowledgement().
+  AWAIT_READY(_statusUpdateAcknowledgement2);
+  Clock::settle();
+
+  Clock::resume();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}


[11/12] git commit: Updated CHANGELOG for 0.13.0 (rc2).

Posted by vi...@apache.org.
Updated CHANGELOG for 0.13.0 (rc2).


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/90bcfe57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/90bcfe57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/90bcfe57

Branch: refs/heads/master
Commit: 90bcfe5755a7d1b974dc7ed0fe916cd4c44fe684
Parents: d5f1d11
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jun 11 12:07:13 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jun 12 15:52:58 2013 -0700

----------------------------------------------------------------------
 CHANGELOG | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/90bcfe57/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b070e22..c262c74 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -58,6 +58,7 @@ Release Notes - Mesos - Version 0.13.0
     * [MESOS-481] -  Slave needs to only inform the master about non-terminal executors, for proper resource accounting
     * [MESOS-482] - Status update manager should not cleanup the stream when there are pending updates, even though it received an ACK for a terminal update
     * [MESOS-484] - Latest ZooKeeperState.cpp doesn't compile on Mountain Lion
+    * [MESOS-502] - Slave crashes when handling duplicate terminal updates
 
 ** Improvement
     * [MESOS-46] - Refactor MasterTest to use fixture


[09/12] git commit: Fixed slave to properly handle duplicate terminal updates for the same task.

Posted by vi...@apache.org.
Fixed slave to properly handle duplicate terminal updates for the
same task.

Review: https://reviews.apache.org/r/11798


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/cfb83a4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/cfb83a4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/cfb83a4f

Branch: refs/heads/0.13.x
Commit: cfb83a4f98c67c3fd74ab4375f474989763d0b2e
Parents: bc2a09d
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Jun 10 22:44:42 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 11 12:01:44 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                       |  25 ++-
 src/tests/status_update_manager_tests.cpp | 202 +++++++++++++++++++++++++
 2 files changed, 220 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/cfb83a4f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b5b7e0e..fd96de9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1687,6 +1687,8 @@ void Slave::reregisterExecutorTimeout()
 // 2) When slave generates task updates (e.g LOST/KILLED/FAILED).
 void Slave::statusUpdate(const StatusUpdate& update)
 {
+  LOG(INFO) << "Handling status update " << update;
+
   CHECK(state == RECOVERING || state == DISCONNECTED ||
         state == RUNNING || state == TERMINATING)
     << state;
@@ -1724,7 +1726,8 @@ void Slave::statusUpdate(const StatusUpdate& update)
     // generated by the slave when the executor is unknown to it,
     // e.g., killTask(), _runTask().
     // TODO(vinod): Revisit these semantics when we disallow updates
-    // sent by executors that are unknown to the slave.
+    // (e.g., when slave recovery is always enabled) sent by executors
+    // that are unknown to the slave.
     statusUpdateManager->update(update, info.id())
       .onAny(defer(self(), &Slave::_statusUpdate, params::_1, update, None()));
 
@@ -1737,15 +1740,17 @@ void Slave::statusUpdate(const StatusUpdate& update)
         executor->state == Executor::TERMINATED)
     << executor->state;
 
-  VLOG(1) << "Handling status update " << update;
-
   stats.tasks[update.status().state()]++;
   stats.validStatusUpdates++;
 
   executor->updateTaskState(status.task_id(), status.state());
 
-  // Handle the task appropriately if it's terminated.
-  if (protobuf::isTerminalState(status.state())) {
+  // Handle the task appropriately if it is terminated.
+  // TODO(vinod): Revisit these semantics when we disallow duplicate
+  // terminal updates (e.g., when slave recovery is always enabled).
+  if (protobuf::isTerminalState(status.state()) &&
+      (executor->queuedTasks.contains(status.task_id()) ||
+       executor->launchedTasks.contains(status.task_id()))) {
     executor->terminateTask(status.task_id(), status.state());
 
     // Tell the isolator to update the resources.
@@ -2967,7 +2972,8 @@ void Executor::terminateTask(
 
 void Executor::completeTask(const TaskID& taskId)
 {
-  CHECK(terminatedTasks.contains(taskId)) << "Unknown task " << taskId;
+  CHECK(terminatedTasks.contains(taskId))
+    << "Failed to find terminated task " << taskId;
 
   Task* task = terminatedTasks[taskId];
   completedTasks.push_back(*task);
@@ -3018,7 +3024,12 @@ void Executor::recoverTask(const TaskState& state)
     updateTaskState(state.id, update.status().state());
 
     // Terminate the task if it received a terminal update.
-    if (protobuf::isTerminalState(update.status().state())) {
+    // We ignore duplicate terminal updates by checking if
+    // the task is present in launchedTasks.
+    // TODO(vinod): Revisit these semantics when we disallow duplicate
+    // terminal updates (e.g., when slave recovery is always enabled).
+    if (protobuf::isTerminalState(update.status().state()) &&
+        launchedTasks.contains(state.id)) {
       terminateTask(state.id, update.status().state());
 
       // If the terminal update has been acknowledged, remove it.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/cfb83a4f/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 61ccfcc..4239532 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -464,3 +464,205 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
 
   Shutdown();
 }
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate terminal status updates, when the
+// second update is received before the ACK for the first update.
+// The proper behavior here is for the status update manager to
+// drop the duplicate update.
+TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+
+  Try<PID<Slave> > slave = StartSlave(&exec, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Drop the first ACK from the scheduler to the slave.
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+  AWAIT_READY(statusUpdateAcknowledgementMessage);
+
+  Future<Nothing> _statusUpdate =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdate);
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // At this point the status update manager has enqueued
+  // TASK_FINISHED and TASK_KILLED updates.
+  AWAIT_READY(_statusUpdate);
+
+  // After we advance the clock, the scheduler should receive
+  // the retried TASK_FINISHED update and acknowledge it. The
+  // TASK_KILLED update should be dropped by the status update
+  // manager, as the stream is already terminated.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::settle();
+
+  // Ensure the scheduler receives TASK_FINISHED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_FINISHED, update.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate terminal status updates, when the
+// second update is received after the ACK for the first update.
+// The proper behavior here is for the status update manager to
+// forward the duplicate update to the scheduler.
+TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+
+  Try<PID<Slave> > slave = StartSlave(&exec, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  Clock::pause();
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // Ensure the scheduler receives TASK_KILLED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_KILLED, update.get().state());
+
+  // Ensure the slave properly handles the ACK.
+  // Clock::settle() ensures that the slave successfully
+  // executes Slave::_statusUpdateAcknowledgement().
+  AWAIT_READY(_statusUpdateAcknowledgement2);
+  Clock::settle();
+
+  Clock::resume();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}


[06/12] git commit: Exposed version in "/vars" and "/state.json" endpoints.

Posted by vi...@apache.org.
Exposed version in "/vars" and "/state.json" endpoints.

Review: https://reviews.apache.org/r/11642


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/e029cf50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/e029cf50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/e029cf50

Branch: refs/heads/0.13.x
Commit: e029cf507fa970b331bb95f6e2946e0e1b676997
Parents: 9029967
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jun 4 15:05:42 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:19:28 2013 -0700

----------------------------------------------------------------------
 src/master/http.cpp | 10 +++++++---
 src/slave/http.cpp  | 10 +++++++---
 2 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/e029cf50/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 7e105a4..47471d6 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -22,6 +22,8 @@
 #include <string>
 #include <vector>
 
+#include <mesos/mesos.hpp>
+
 #include <stout/foreach.hpp>
 #include <stout/json.hpp>
 #include <stout/net.hpp>
@@ -225,9 +227,10 @@ Future<Response> vars(
   std::ostringstream out;
 
   out <<
-    "build_date " << build::DATE << "\n" <<
-    "build_user " << build::USER << "\n" <<
-    "build_flags " << build::FLAGS << "\n";
+    "version: " << MESOS_VERSION << "\n" <<
+    "build_date: " << build::DATE << "\n" <<
+    "build_user: " << build::USER << "\n" <<
+    "build_flags: " << build::FLAGS << "\n";
 
   // TODO(benh): Output flags.
   return OK(out.str(), request.query.get("jsonp"));
@@ -311,6 +314,7 @@ Future<Response> state(
   VLOG(1) << "HTTP request for '" << request.path << "'";
 
   JSON::Object object;
+  object.values["version"] = MESOS_VERSION;
   object.values["build_date"] = build::DATE;
   object.values["build_time"] = build::TIME;
   object.values["build_user"] = build::USER;

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/e029cf50/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 67877c9..e362c30 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -21,6 +21,8 @@
 #include <string>
 #include <vector>
 
+#include <mesos/mesos.hpp>
+
 #include <stout/foreach.hpp>
 #include <stout/json.hpp>
 #include <stout/net.hpp>
@@ -261,9 +263,10 @@ Future<Response> vars(
   std::ostringstream out;
 
   out <<
-    "build_date " << build::DATE << "\n" <<
-    "build_user " << build::USER << "\n" <<
-    "build_flags " << build::FLAGS << "\n";
+    "version: " << MESOS_VERSION << "\n" <<
+    "build_date: " << build::DATE << "\n" <<
+    "build_user: " << build::USER << "\n" <<
+    "build_flags: " << build::FLAGS << "\n";
 
   // TODO(benh): Output flags.
   return OK(out.str(), request.query.get("jsonp"));
@@ -301,6 +304,7 @@ Future<Response> state(
   LOG(INFO) << "HTTP request for '" << request.path << "'";
 
   JSON::Object object;
+  object.values["version"] = MESOS_VERSION;
   object.values["build_date"] = build::DATE;
   object.values["build_time"] = build::TIME;
   object.values["build_user"] = build::USER;


[04/12] git commit: Fixed slave to not send tasks and executor info of a terminated executor, when re-registering with master.

Posted by vi...@apache.org.
Fixed slave to not send tasks and executor info of a terminated executor,
when re-registering with master.

Review: https://reviews.apache.org/r/11543


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/5912aa2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/5912aa2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/5912aa2a

Branch: refs/heads/0.13.x
Commit: 5912aa2a127b2af1135396d332388915d7670494
Parents: e029cf5
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed May 29 18:14:01 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:19:28 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                 |  6 +++
 src/tests/fault_tolerance_tests.cpp | 78 ++++++++++++++++++++++++++++++++
 2 files changed, 84 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/5912aa2a/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e905ab3..8ce1646 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -679,6 +679,12 @@ void Slave::doReliableRegistration()
 
     foreachvalue (Framework* framework, frameworks){
       foreachvalue (Executor* executor, framework->executors) {
+        // Ignore terminated executors because they do not consume
+        // any resources.
+        if (executor->state == Executor::TERMINATED) {
+          continue;
+        }
+
         // TODO(benh): Kill this once framework_id is required
         // on ExecutorInfo.
         ExecutorInfo* executorInfo = message.add_executor_infos();

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/5912aa2a/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index c8fad7c..ef570b7 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1297,6 +1297,84 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
 }
 
 
+// This test verifies that a re-registering slave does not inform
+// the master about a terminated executor (and its tasks), when the
+// executor has pending updates. We check this by ensuring that the
+// master sends a TASK_LOST update for the task belonging to the
+// terminated executor.
+TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestingIsolator isolator(&exec);
+
+  Try<PID<Slave> > slave = StartSlave(&isolator);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(1, 1, 512))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.start();
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Drop the TASK_LOST status update(s) sent to the master.
+  // This ensures that the TASK_LOST received by the scheduler
+  // is generated by the master.
+  DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+
+  Future<ExitedExecutorMessage> executorExitedMessage =
+    FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
+
+  // Now kill the executor.
+  dispatch(isolator,
+           &Isolator::killExecutor,
+           frameworkId.get(),
+           DEFAULT_EXECUTOR_ID);
+
+  AWAIT_READY(executorExitedMessage);
+
+  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  NewMasterDetectedMessage message;
+  message.set_pid(master.get());
+
+  process::post(slave.get(), message);
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_LOST, status2.get().state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
 // This test verifies that the master sends TASK_LOST updates
 // for tasks in the master absent from the re-registered slave.
 // We do this by dropping RunTaskMessage from master to the slave.


[08/12] git commit: Updated CHANGELOG for 0.13.0.

Posted by vi...@apache.org.
Updated CHANGELOG for 0.13.0.

Conflicts:
	CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/bc2a09d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/bc2a09d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/bc2a09d1

Branch: refs/heads/0.13.x
Commit: bc2a09d11edd34a66d438badb17700e5728b40df
Parents: 062c850
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Jun 10 14:47:35 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 11 12:01:23 2013 -0700

----------------------------------------------------------------------
 CHANGELOG | 159 ++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 130 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/bc2a09d1/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 844696e..b070e22 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,115 @@
+Release Notes - Mesos - Version 0.13.0
+--------------------------------------
+* The primary feature in this release is "Slave Recovery". This enables a slave to reconnect
+  with old running executors after it restarts (e.g., after an upgrade).
+* This release also includes a major refactor of the internal testing infrastructure.
+* There are also several bug fixes and stability improvements (esp. around ZooKeeper).
+
+** Bug
+    * [MESOS-77] - ExceptionTest.AbortOnFrameworkError sometimes hangs if Mesos built without optimizations
+    * [MESOS-201] - CppFramework test occasionally fails
+    * [MESOS-217] - LOST tasks are incorrectly reconciled between mesos and framework
+    * [MESOS-232] - Unit test CoordinatorTest.Elect triggers non-deterministic assertion failure in libprocess.
+    * [MESOS-276] - SIGSEGV with current trunk and OpenJDK 7u3
+    * [MESOS-277] - Java test framework test is flaky
+    * [MESOS-289] - Zookeeper tests are flaky
+    * [MESOS-301] - Coordinator test is flaky
+    * [MESOS-318] - os::memory does not consider sysinfo.mem_unit
+    * [MESOS-321] - libprocess http::encode fails test
+    * [MESOS-344] - --disable-java still uses java headers during make check
+    * [MESOS-353] - ZooKeeper state test GetSetGet hung
+    * [MESOS-362] - Inconsistent slave maps in the master.
+    * [MESOS-365] - Slave should reject tasks before registering with the master.
+    * [MESOS-366] - Master check failure during load tests.
+    * [MESOS-368] - HTTP.Endpoints test is flaky.
+    * [MESOS-369] - Mesos tests spitting out error messages.
+    * [MESOS-379] - Zookeeper MasterDetectorExpireSlaveZKSessionNewMaster test is flaky
+    * [MESOS-385] - MasterTest.TaskRunning flaky on Jenkins.
+    * [MESOS-392] - FaultTolerance SchedulerExit test hangs
+    * [MESOS-393] - Forking at an unlucky time on OS X can cause the C++ library to deadlock.
+    * [MESOS-394] - Don't do ExecutorLauncher in forked process but exec first instead.
+    * [MESOS-395] - FaultToleranceTest.SchedulerFailoverFrameworkMessage test is flaky.
+    * [MESOS-399] - MonitorTest.WatchUnwatch failed.
+    * [MESOS-400] - Example Java framework test is flaky
+    * [MESOS-401] - SlaveRecoveryTest/0.RecoverTerminatedExecutor is flaky on OSX.
+    * [MESOS-402] - CoordinatorTest.TruncateNotLearnedFill test is flaky
+    * [MESOS-403] - CoordinatorTest.TruncateLearnedFill test is flaky
+    * [MESOS-405] - SlaveRecoveryTest/1.ReconnectExecutor crashes.
+    * [MESOS-406] - Google mock throws a segfault when invoked by TestFilter
+    * [MESOS-407] - Google test filter processing is incorrect for the empty string.
+    * [MESOS-408] - FaultToleranceTest.SlavePartitioned is flaky
+    * [MESOS-412] - MasterTest.ShutdownUnregisteredExecutor flaky
+    * [MESOS-423] - A slave asked to shutdown should not re-register with a new slave id
+    * [MESOS-424] - CgroupsIsolatorTest.BalloonFramework runs forever
+    * [MESOS-436] - FaultToleranceTest.SchedulerFailover test is flaky
+    * [MESOS-437] - ResourceOffersTest.ResourceOfferWithMultipleSlaves is flaky
+    * [MESOS-440] - Allow for headroom in the GC algorithm.
+    * [MESOS-441] - AllocatorZooKeeperTest/0.FrameworkReregistersFirst is flaky
+    * [MESOS-446] - Master should shutdown slaves that were deactivated
+    * [MESOS-447] - Master should send TASK_LOST updates for unknown tasks when slave reregisters
+    * [MESOS-453] - AllocatorZookeeper tests are using /tmp/mesos work directory
+    * [MESOS-454] - ResourceOffers tests are using /tmp/mesos working directory
+    * [MESOS-462] - Resource usage collection failure messages have '1' as the failure message.
+    * [MESOS-466] - Master should always send a re-registered message to framework when the scheduler driver sends a re-register
+    * [MESOS-467] - AllocatorTest.FrameworkExited is flaky
+    * [MESOS-469] - Scheduler driver should call disconnected on master failover
+    * [MESOS-474] - Mesos 0.10.0: make check fails on Ubuntu 12.04 LTS
+    * [MESOS-476] - Upgrade libev to 4.15
+    * [MESOS-481] -  Slave needs to only inform the master about non-terminal executors, for proper resource accounting
+    * [MESOS-482] - Status update manager should not cleanup the stream when there are pending updates, even though it received an ACK for a terminal update
+    * [MESOS-484] - Latest ZooKeeperState.cpp doesn't compile on Mountain Lion
+
+** Improvement
+    * [MESOS-46] - Refactor MasterTest to use fixture
+    * [MESOS-134] - Add Python documentation
+    * [MESOS-140] - Unrecognized command line args should fail the process
+    * [MESOS-242] - Add more tests to Dominant Share Allocator
+    * [MESOS-305] - Inform the frameworks / slaves about a master failover
+    * [MESOS-409] - Master detector code should stat nodes before attempting to create
+    * [MESOS-472] - Separate ResourceStatistics::cpu_time into ResourceStatistics::cpu_user_time and ResourceStatistics::cpu_system_time.
+    * [MESOS-493] - Expose version information in http endpoints
+
+** New Feature
+    * [MESOS-110] - Mesos deploys should not restart tasks
+    * [MESOS-169] - Ability for tests to catch in-flight messages that are dispatched
+
+
+Release Notes - Mesos - Version 0.12.0
+--------------------------------------
+* This release includes bug fixes and stability improvements.
+* The primary feature in this release is executor resource consumption monitoring. Slaves now monitor resource consumption of running executors and expose it over JSON and through the webui.
+* This release also includes a new and improved Hadoop framework. The new port doesn't require patching Hadoop (i.e., it's a self contained Hadoop contrib) and lets you use existing schedulers (e.g., the fair scheduler or capacity scheduler)! The tradeoff, however, is that it doesn't take as much advantage of the fine-grained nature of a Mesos task (i.e., there is no longer a 1-1 mapping between a Mesos task and a map/reduce task).
+
+** Sub-task
+    * [MESOS-214] - Report resources being used by executors
+    * [MESOS-419] - Old slave directories should not be garbage collected based on file modification time
+
+** Bug
+    * [MESOS-107] - Scheduler library should not acknowledge a status update if the driver has been aborted.
+    * [MESOS-152] - Slave should forward status updates for unknown tasks
+    * [MESOS-285] - configure.macosx checks for version "10.7" but should check for 10.7 or greater
+    * [MESOS-307] - Web UI file download links are broken.
+    * [MESOS-317] - Python Mesos core bindings reject framework messages with null bytes
+    * [MESOS-319] - Fix buggy read / write calls.
+    * [MESOS-325] - make clean is broken
+    * [MESOS-332] - Executor launcher fetches resources as slave user, instead of executor user.
+    * [MESOS-340] - Gperftools target always rebuilds.
+    * [MESOS-374] - HTTP GET requests to /statistics/snapshot.json crash the slave
+    * [MESOS-422] - Master leader election should be more robust to stale ephemeral nodes
+    * [MESOS-486] - TaskInfo should include a 'source' in order to enable getting resource monitoring statistics.
+
+** Improvement
+    * [MESOS-293] - Make clean deletes checked in files.
+
+** New Feature
+    * [MESOS-99] - display slave resource usage information in the slave webui
+
+** Task
+    * [MESOS-274] - Unicode / Binary files over http endpoints.
+    * [MESOS-324] - Monitor executor resource usage.
+    * [MESOS-331] - Add --disable-perftools to configure.
+
+
 Release Notes - Mesos - Version 0.11.0
 --------------------------------------
 ** Brainstorming
@@ -6,14 +118,27 @@ Release Notes - Mesos - Version 0.11.0
 
 ** Bug
     * [MESOS-260] - Implement a duration abstraction
+    * [MESOS-261] - bootstrap fails when automake version >= 1.12
+    * [MESOS-263] - Complete the new webui (slave, framework, executor pages)
+    * [MESOS-264] - Make fails on the latest ubuntu
+    * [MESOS-270] - Log viewing broken on mesos-local runs
     * [MESOS-364] - cgroup tests fail on Ubuntu
     * [MESOS-386] - AllocatorTest/0.TaskFinished has incomplete expectations.
     * [MESOS-388] - Latest update breaks building on OSX
     * [MESOS-404] - FilesTest.BrowseTest is flaky
     * [MESOS-413] - AllocatorTest/0.TaskFinished test has bad expectations.
 
+** Improvement
+    * [MESOS-252] - Web UI Improvements
+    * [MESOS-253] - Enable -Wall -Werror on the build
+    * [MESOS-254] - Improve mesos slave's garbage collection
+    * [MESOS-259] - Expose slave attributes in slave endpoint & sortable tables
+
 ** Task
+    * [MESOS-275] - HTTP endpoint for file download.
+    * [MESOS-279] - Impose a limit on HTTP Response size
     * [MESOS-389] - Add OSX slave to the Jenkins build.
+    * [MESOS-491] - Add mesos-0.11.0-incubating jar to maven central
 
 
 Release Notes - Mesos - Version 0.10.0
@@ -34,7 +159,6 @@ Release Notes - Mesos - Version 0.10.0
     * [MESOS-83] - Filters should be removed when more resources are available than rejected offer had
     * [MESOS-145] - mesos executor holds on to fd spawned by slave after slave death, preventing slave from restarting
     * [MESOS-148] - Building of included Hadoop broken
-    * [MESOS-152] - Slave should forward status updates for unknown tasks
     * [MESOS-164] - Crash on Mac OS X due to dlopen not being thread-safe
     * [MESOS-183] - Included MPI Framework Fails to Start
     * [MESOS-187] - Mesos should not pass an invalid task to a slave
@@ -53,16 +177,11 @@ Release Notes - Mesos - Version 0.10.0
     * [MESOS-248] - Python quit unexpectedly while using mesos plugin
     * [MESOS-251] - DRF allocator doesn't expire filter correctly
     * [MESOS-257] - Master doesn't recover resources when executor exits
-    * [MESOS-261] - bootstrap fails when automake version >= 1.12
-    * [MESOS-262] - Slave should not charge the resources required for launching a executor against the executor 
-    * [MESOS-263] - Complete the new webui (slave, framework, executor pages)
-    * [MESOS-264] - Make fails on the latest ubuntu
+    * [MESOS-262] - Slave should not charge the resources required for launching an executor against the executor
     * [MESOS-266] - When the master removes a slave, a shutdown should be sent.
-    * [MESOS-268] - Slave should force kill executors when it is shutting down 
-    * [MESOS-270] - Log viewing broken on mesos-local runs
+    * [MESOS-268] - Slave should force kill executors when it is shutting down
     * [MESOS-278] - Master Fails to Connect to Zookeeper
     * [MESOS-284] - Short-term fix for fire-walling slave shutdown and lost slave messages from the 'wrong' master
-    * [MESOS-285] - configure.macosx checks for version "10.7" but should check for 10.7 or greater
     * [MESOS-286] - AllocatorTest is flaky
     * [MESOS-288] - Latest trunk does not finish make check
     * [MESOS-290] - Jobtracker can't get TaskTrackerInfo when the JobTracker log file is deleted
@@ -70,18 +189,11 @@ Release Notes - Mesos - Version 0.10.0
     * [MESOS-302] - Scheduler driver shouldn't send an ACK if the driver is aborted while sending stats update
     * [MESOS-303] - mesos slave crashes during framework termination
     * [MESOS-306] - Mesos-master frequently crashes
-    * [MESOS-307] - Web UI file download links are broken.
     * [MESOS-310] - cgroups isolation module should not block on fetching executors
-    * [MESOS-317] - python mesos core bindings rejects framework messages with null bytes
-    * [MESOS-319] - Fix buggy read / write calls.
-    * [MESOS-321] - libprocess http::encode fails test
-    * [MESOS-325] - make clean is broken
-    * [MESOS-332] - Executor launcher fetches resources as slave user, instead of executor user. 
     * [MESOS-339] - Release script is expecting enter, not any key
-    * [MESOS-340] - Gperftools target always rebuilds.
-    * [MESOS-381] - Problem building on Ubuntu 12.04 LTE
     * [MESOS-382] - FaultToleranceTest.FrameworkReliableRegistration test is flaky.
     * [MESOS-383] - AllocatorTest/0.FrameworkExited test is broken
+    * [MESOS-464] - Mesos 0.10.0 fails to build on Ubuntu 13.04
 
 ** Improvement
     * [MESOS-8] - Maintain a history of executed frameworks/tasks and show it on the web UI
@@ -90,20 +202,14 @@ Release Notes - Mesos - Version 0.10.0
     * [MESOS-149] - Garbage collection on slaves
     * [MESOS-171] - Make CommandInfo 'uri' field be repeated, possibly making a URI embedded message to describe whether or not we should 'chmod +x' the resulting resource.
     * [MESOS-180] - Update the Hadoop patch to list protobuf-2.4.1 as a dependency so Maven pulls it down.
-    * [MESOS-193] - Create a single-page javascript interface to replace the existing webui 
+    * [MESOS-193] - Create a single-page javascript interface to replace the existing webui
     * [MESOS-194] - Make killtree more verbose.
-    * [MESOS-252] - Web UI Improvements
-    * [MESOS-253] - Enable -Wall -Werror on the build
-    * [MESOS-254] - Improve mesos slave's garbage collection
     * [MESOS-255] - Expose files through HTTP endpoints.
     * [MESOS-256] - Introduce a cluster name into the Web UI
-    * [MESOS-259] - Expose slave attributes in slave endpoint & sortable tables
     * [MESOS-272] - Create a 'fs' namespace and migrate as appropriate from our 'os' namespace.
-    * [MESOS-293] - Make clean deletes checked in files.
 
 ** New Feature
     * [MESOS-86] - Expose master url to the scheduler
-    * [MESOS-99] - display slave resource usage information in the slave webui
     * [MESOS-158] - Make ExecutorInfo more rich
     * [MESOS-185] - Provide a master stat indicating number of outstanding resource offers
     * [MESOS-207] - A new isolation module on Linux that uses Linux control groups (cgroups) directly.
@@ -118,12 +224,7 @@ Release Notes - Mesos - Version 0.10.0
     * [MESOS-69] - Migrate to Apache wiki and de-activate github wiki
     * [MESOS-80] - Add a page or section to the wiki defining project coding standards
     * [MESOS-133] - Make Mesos clean of most GCC warnings
-    * [MESOS-274] - Unicode / Binary files over http endpoints.
-    * [MESOS-275] - HTTP endpoint for file download.
-    * [MESOS-279] - Impose a limit on HTTP Response size
-    * [MESOS-283] - Use cpp implementations for python protobufs.
-    * [MESOS-324] - Monitor executor resource usage.
-    * [MESOS-331] - Add --disable-perftools to configure.
+    * [MESOS-398] - Add mesos-0.10.0-incubating jar to maven central
 
 
 Release Notes - Mesos - Version 0.9.0


[10/12] git commit: Updated CHANGELOG for 0.13.0 (rc2).

Posted by vi...@apache.org.
Updated CHANGELOG for 0.13.0 (rc2).


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/09f4c11b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/09f4c11b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/09f4c11b

Branch: refs/heads/0.13.x
Commit: 09f4c11b820124a42c18d52ce80ad0e1fc5abc82
Parents: cfb83a4
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jun 11 12:07:13 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 11 12:08:51 2013 -0700

----------------------------------------------------------------------
 CHANGELOG | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/09f4c11b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b070e22..c262c74 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -58,6 +58,7 @@ Release Notes - Mesos - Version 0.13.0
     * [MESOS-481] -  Slave needs to only inform the master about non-terminal executors, for proper resource accounting
     * [MESOS-482] - Status update manager should not cleanup the stream when there are pending updates, even though it received an ACK for a terminal update
     * [MESOS-484] - Latest ZooKeeperState.cpp doesn't compile on Mountain Lion
+    * [MESOS-502] - Slave crashes when handling duplicate terminal updates
 
 ** Improvement
     * [MESOS-46] - Refactor MasterTest to use fixture