You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/11 23:28:44 UTC

svn commit: r1230289 [2/3] - in /incubator/mesos/trunk: ./ include/mesos/ src/ src/common/ src/detector/ src/examples/ src/examples/java/ src/examples/python/ src/exec/ src/java/jni/ src/java/src/org/apache/mesos/ src/log/ src/master/ src/messages/ src...

Modified: incubator/mesos/trunk/src/log/coordinator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.cpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.cpp Wed Jan 11 22:28:41 2012
@@ -1,8 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <algorithm>
 
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
-#include <process/timeout.hpp>
 
 #include "common/foreach.hpp"
 #include "common/option.hpp"
@@ -34,9 +51,15 @@ Coordinator::Coordinator(int _quorum,
 Coordinator::~Coordinator() {}
 
 
-Result<uint64_t> Coordinator::elect()
+Result<uint64_t> Coordinator::elect(const Timeout& timeout)
 {
-  CHECK(!elected);
+  LOG(INFO) << "Coordinator attempting to get elected within "
+            << timeout.remaining() << " seconds";
+
+  if (elected) {
+    // TODO(benh): No-op instead of error?
+    return Result<uint64_t>::error("Coordinator already elected");
+  }
 
   // Get the highest known promise from our local replica.
   Future<uint64_t> promise = replica->promised();
@@ -61,8 +84,6 @@ Result<uint64_t> Coordinator::elect()
   Option<Future<PromiseResponse> > option;
   int okays = 0;
 
-  Timeout timeout = 1.0; // TODO(benh): Have timeout get passed in!
-
   do {
     option = select(futures, timeout.remaining());
     if (option.isSome()) {
@@ -87,7 +108,7 @@ Result<uint64_t> Coordinator::elect()
 
   // Either we have a quorum or we timed out.
   if (okays >= quorum) {
-    LOG(INFO) << "Coordinator elected!";
+    LOG(INFO) << "Coordinator elected, attempting to fill missing positions";
     elected = true;
 
     // Need to "catchup" local replica (i.e., fill in any unlearned
@@ -99,19 +120,22 @@ Result<uint64_t> Coordinator::elect()
 
     Future<set<uint64_t> > positions = replica->missing(index);
 
-    positions.await();  // TODO(benh): Have timeout get passed in!
+    positions.await(timeout.remaining());
 
     if (positions.isFailed()) {
+      elected = false;
       return Result<uint64_t>::error(positions.failure());
     }
 
     CHECK(positions.isReady()) << "Not expecting a discarded future!";
 
     foreach (uint64_t position, positions.get()) {
-      Result<Action> result = fill(position);
+      Result<Action> result = fill(position, timeout);
       if (result.isError()) {
+        elected = false;
         return Result<uint64_t>::error(result.error());
       } else if (result.isNone()) {
+        elected = false;
         return Result<uint64_t>::none();
       } else {
         CHECK(result.isSome());
@@ -136,7 +160,9 @@ Result<uint64_t> Coordinator::demote()
 }
 
 
-Result<uint64_t> Coordinator::append(const string& bytes)
+Result<uint64_t> Coordinator::append(
+    const string& bytes,
+    const Timeout& timeout)
 {
   if (!elected) {
     return Result<uint64_t>::error("Coordinator not elected");
@@ -150,7 +176,7 @@ Result<uint64_t> Coordinator::append(con
   Action::Append* append = action.mutable_append();
   append->set_bytes(bytes);
 
-  Result<uint64_t> result = write(action);
+  Result<uint64_t> result = write(action, Timeout(timeout));
 
   if (result.isSome()) {
     CHECK(result.get() == index);
@@ -161,7 +187,9 @@ Result<uint64_t> Coordinator::append(con
 }
 
 
-Result<uint64_t> Coordinator::truncate(uint64_t to)
+Result<uint64_t> Coordinator::truncate(
+    uint64_t to,
+    const Timeout& timeout)
 {
   if (!elected) {
     return Result<uint64_t>::error("Coordinator not elected");
@@ -175,7 +203,7 @@ Result<uint64_t> Coordinator::truncate(u
   Action::Truncate* truncate = action.mutable_truncate();
   truncate->set_to(to);
 
-  Result<uint64_t> result = write(action);
+  Result<uint64_t> result = write(action, timeout);
 
   if (result.isSome()) {
     CHECK(result.get() == index);
@@ -186,11 +214,14 @@ Result<uint64_t> Coordinator::truncate(u
 }
 
 
-Result<uint64_t> Coordinator::write(const Action& action)
+Result<uint64_t> Coordinator::write(
+    const Action& action,
+    const Timeout& timeout)
 {
   LOG(INFO) << "Coordinator attempting to write "
             << Action::Type_Name(action.type())
-            << " action at position " << action.position();
+            << " action at position " << action.position()
+            << " within " << timeout.remaining() << " seconds";
 
   CHECK(elected);
 
@@ -238,8 +269,6 @@ Result<uint64_t> Coordinator::write(cons
   Option<Future<WriteResponse> > option;
   int okays = 0;
 
-  Timeout timeout = 1.0; // TODO(benh): Have timeout get passed in!
-
   do {
     option = select(futures, timeout.remaining());
     if (option.isSome()) {
@@ -307,16 +336,20 @@ Result<uint64_t> Coordinator::commit(con
       LOG(FATAL) << "Unknown Action::Type!";
   }
 
+  // TODO(benh): Add a non-message based way to do this write.
+  Future<WriteResponse> future = protocol::write(replica->pid(), request);
+
   // We send a write request to the *local* replica just as the
   // others: asynchronously via messages. However, rather than add the
   // complications of dealing with timeouts for local operations
   // (especially since we are trying to commit something), we make
   // things simpler and block on the response from the local replica.
-  // TODO(benh): Add a non-message based way to do this write.
+  // Maybe we can let it timeout, but consider it a failure? This
+  // might be sound because we don't send the learned messages ... so
+  // this should be the same as if we just failed before we even do
+  // the write ... a client should just retry this write later.
 
-  Future<WriteResponse> future = protocol::write(replica->pid(), request);
-
-  future.await(); // TODO(benh): Let it timeout, but consider it a failure.
+  future.await(); // TODO(benh): Don't wait forever, see comment above.
 
   if (future.isFailed()) {
     return Result<uint64_t>::error(future.failure());
@@ -349,7 +382,7 @@ Result<uint64_t> Coordinator::commit(con
 }
 
 
-Result<Action> Coordinator::fill(uint64_t position)
+Result<Action> Coordinator::fill(uint64_t position, const Timeout& timeout)
 {
   LOG(INFO) << "Coordinator attempting to fill position "
             << position << " in the log";
@@ -367,8 +400,6 @@ Result<Action> Coordinator::fill(uint64_
   Option<Future<PromiseResponse> > option;
   list<PromiseResponse> responses;
 
-  Timeout timeout = 1.0; // TODO(benh): Have timeout get passed in!
-
   do {
     option = select(futures, timeout.remaining());
     if (option.isSome()) {
@@ -433,7 +464,7 @@ Result<Action> Coordinator::fill(uint64_
       action.set_performed(id);
     }
 
-    Result<uint64_t> result = write(action);
+    Result<uint64_t> result = write(action, timeout);
 
     if (result.isError()) {
       return Result<Action>::error(result.error());

Modified: incubator/mesos/trunk/src/log/coordinator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.hpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __LOG_COORDINATOR_HPP__
 #define __LOG_COORDINATOR_HPP__
 
@@ -5,8 +23,10 @@
 #include <vector>
 
 #include <process/process.hpp>
+#include <process/timeout.hpp>
 
 #include "common/result.hpp"
+#include "common/seconds.hpp"
 
 #include "log/network.hpp"
 #include "log/replica.hpp"
@@ -20,9 +40,6 @@ namespace log {
 
 using namespace process;
 
-// TODO(benh): Pass timeouts into the coordinator functions rather
-// than have hard coded timeouts within.
-
 class Coordinator
 {
 public:
@@ -36,32 +53,32 @@ public:
   // coordinator failed to achieve a quorum (e.g., due to timeout) but
   // can be retried. A some result returns the last committed log
   // position.
-  Result<uint64_t> elect();
+  Result<uint64_t> elect(const Timeout& timeout);
   Result<uint64_t> demote();
 
   // Returns the result of trying to append the specified bytes. A
   // result of none means the append failed (e.g., due to timeout),
   // but can be retried.
-  Result<uint64_t> append(const std::string& bytes);
+  Result<uint64_t> append(const std::string& bytes, const Timeout& timeout);
 
   // Returns the result of trying to truncate the log (from the
   // beginning to the specified position exclusive). A result of
   // none means the truncate failed (e.g., due to timeout), but can be
   // retried.
-  Result<uint64_t> truncate(uint64_t to);
+  Result<uint64_t> truncate(uint64_t to, const Timeout& timeout);
 
 private:
   // Helper that tries to achieve consensus of the specified action. A
   // result of none means the write failed (e.g., due to timeout), but
   // can be retried.
-  Result<uint64_t> write(const Action& action);
+  Result<uint64_t> write(const Action& action, const Timeout& timeout);
 
   // Helper that handles commiting an action (i.e., writing to the
   // local replica and then sending out learned messages).
   Result<uint64_t> commit(const Action& action);
 
   // Helper that tries to fill a position in the log.
-  Result<Action> fill(uint64_t position);
+  Result<Action> fill(uint64_t position, const Timeout& timeout);
 
   // Helper that uses the specified protocol to broadcast a request to
   // our group and return a set of futures.

Modified: incubator/mesos/trunk/src/log/log.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.cpp (original)
+++ incubator/mesos/trunk/src/log/log.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 // TODO(benh): Optimize LearnedMessage (and the "commit" stage in
 // general) by figuring out a way to not send the entire action
 // contents a second time (should cut bandwidth used in half).
@@ -603,7 +621,7 @@ int main(int argc, char** argv)
   }
 
   args = argv;
-  
+
   int quorum = atoi(argv[1]);
   string file = argv[2];
   string servers = argv[3];
@@ -619,7 +637,7 @@ int main(int argc, char** argv)
 
     int at = atoi(argv[i]);
     int to = atoi(argv[i + 1]);
-    
+
     truncations[at] = to;
   }
 

Modified: incubator/mesos/trunk/src/log/log.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.hpp (original)
+++ incubator/mesos/trunk/src/log/log.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __LOG_HPP__
 #define __LOG_HPP__
 
@@ -6,9 +24,11 @@
 #include <string>
 
 #include <process/process.hpp>
+#include <process/timeout.hpp>
 
 #include "common/foreach.hpp"
 #include "common/result.hpp"
+#include "common/seconds.hpp"
 #include "common/try.hpp"
 
 #include "log/coordinator.hpp"
@@ -103,7 +123,9 @@ public:
 
     // Returns all entries between the specified positions, unless
     // those positions are invalid, in which case returns an error.
-    Result<std::list<Entry> > read(const Position& from, const Position& to);
+    Result<std::list<Entry> > read(const Position& from,
+                                   const Position& to,
+                                   const seconds& timeout);
 
     // Returns the beginning position of the log from the perspective
     // of the local replica (which may be out of date if the log has
@@ -127,20 +149,20 @@ public:
     // one writer (local and remote) is valid at a time. A writer
     // becomes invalid if any operation returns an error, and a new
     // writer must be created in order perform subsequent operations.
-    Writer(Log* log, int retries = 3);
+    Writer(Log* log, const seconds& timeout, int retries = 3);
     ~Writer();
 
     // Attempts to append the specified data to the log. A none result
     // means the operation timed out, otherwise the new ending
     // position of the log is returned or an error. Upon error a new
     // Writer must be created.
-    Result<Position> append(const std::string& data);
+    Result<Position> append(const std::string& data, const seconds& timeout);
 
     // Attempts to truncate the log up to but not including the
     // specificed position. A none result means the operation timed
     // out, otherwise the new ending position of the log is returned
     // or an error. Upon error a new Writer must be created.
-    Result<Position> truncate(const Position& to);
+    Result<Position> truncate(const Position& to, const seconds& timeout);
 
   private:
     Option<std::string> error;
@@ -258,19 +280,19 @@ Log::Reader::~Reader() {}
 
 Result<std::list<Log::Entry> > Log::Reader::read(
     const Log::Position& from,
-    const Log::Position& to)
+    const Log::Position& to,
+    const seconds& timeout)
 {
   process::Future<std::list<Action> > actions =
     replica->read(from.value, to.value);
 
-  // TODO(benh): Take a timeout!
-  actions.await();
-
-  if (actions.isFailed()) {
+  if (!actions.await(timeout.value)) {
+    return Result<std::list<Log::Entry> >::none();
+  } else if (actions.isFailed()) {
     return Result<std::list<Log::Entry> >::error(actions.failure());
   }
 
-  CHECK(actions.isReady()) << "Not expecting discarded future!"; 
+  CHECK(actions.isReady()) << "Not expecting discarded future!";
 
   std::list<Log::Entry> entries;
 
@@ -319,12 +341,14 @@ Log::Position Log::Reader::ending()
 }
 
 
-Log::Writer::Writer(Log* log, int retries)
+Log::Writer::Writer(Log* log, const seconds& timeout, int retries)
   : coordinator(log->quorum, log->replica, log->network),
     error(Option<std::string>::none())
 {
+  LOG(INFO) << "Number of retries: " << retries;
+
   do {
-    Result<uint64_t> result = coordinator.elect();
+    Result<uint64_t> result = coordinator.elect(Timeout(timeout.value));
     if (result.isNone()) {
       retries--;
     } else if (result.isSome()) {
@@ -343,7 +367,9 @@ Log::Writer::~Writer()
 }
 
 
-Result<Log::Position> Log::Writer::append(const std::string& data)
+Result<Log::Position> Log::Writer::append(
+    const std::string& data,
+    const seconds& timeout)
 {
   if (error.isSome()) {
     return Result<Log::Position>::error(error.get());
@@ -351,7 +377,7 @@ Result<Log::Position> Log::Writer::appen
 
   LOG(INFO) << "Attempting to append " << data.size() << " bytes to the log";
 
-  Result<uint64_t> result = coordinator.append(data);
+  Result<uint64_t> result = coordinator.append(data, Timeout(timeout.value));
 
   if (result.isError()) {
     error = result.error();
@@ -366,7 +392,9 @@ Result<Log::Position> Log::Writer::appen
 }
 
 
-Result<Log::Position> Log::Writer::truncate(const Log::Position& to)
+Result<Log::Position> Log::Writer::truncate(
+    const Log::Position& to,
+    const seconds& timeout)
 {
   if (error.isSome()) {
     return Result<Log::Position>::error(error.get());
@@ -374,7 +402,8 @@ Result<Log::Position> Log::Writer::trunc
 
   LOG(INFO) << "Attempting to truncate the log to " << to.value;
 
-  Result<uint64_t> result = coordinator.truncate(to.value);
+  Result<uint64_t> result =
+    coordinator.truncate(to.value, Timeout(timeout.value));
 
   if (result.isError()) {
     error = result.error();

Modified: incubator/mesos/trunk/src/log/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/main.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/main.cpp (original)
+++ incubator/mesos/trunk/src/log/main.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <iostream>
 #include <list>
 #include <string>

Modified: incubator/mesos/trunk/src/log/network.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/network.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/network.hpp (original)
+++ incubator/mesos/trunk/src/log/network.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __NETWORK_HPP__
 #define __NETWORK_HPP__
 

Modified: incubator/mesos/trunk/src/log/replica.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/replica.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/replica.cpp (original)
+++ incubator/mesos/trunk/src/log/replica.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <leveldb/comparator.h>
 #include <leveldb/db.h>
 #include <leveldb/write_batch.h>
@@ -7,6 +25,7 @@
 #include <process/dispatch.hpp>
 #include <process/protobuf.hpp>
 
+#include "common/timer.hpp"
 #include "common/utils.hpp"
 
 #include "log/replica.hpp"
@@ -108,48 +127,55 @@ private:
     }
   };
 
-  static leveldb::Slice slice(uint64_t position)
+  // Returns a string representing the specified position. Note that
+  // we adjust the actual position by incrementing it by 1 because we
+  // reserve 0 for storing the promise record (Record::Promise).
+  static string encode(uint64_t position, bool adjust = true)
   {
-    // TODO(benh): Use varint comparator.
-    LOG(FATAL) << "Unimplemented";
+    // Adjusted stringified representation is plus 1 of actual position.
+    position = adjust ? position + 1 : position;
+
+    // TODO(benh): Use varint encoding for VarInt64Comparator!
     // string s;
     // google::protobuf::io::StringOutputStream _stream(&s);
     // google::protobuf::io::CodedOutputStream stream(&_stream);
+    // position = adjust ? position + 1 : position;
     // stream.WriteVarint64(position);
     // return s;
-  }
 
-  static string stringify(uint64_t position)
-  {
-    // TODO(benh): Eliminate this once we can use custom comparators!
     Try<string> s = strings::format("%.*d", 10, position);
     CHECK(s.isSome());
     return s.get();
   }
 
-  static uint64_t position(const leveldb::Slice& s)
+  // Returns the position as represented in the specified slice
+  // (performing a decrement as necessary to determine the actual
+  // position represented).
+  static uint64_t decode(const leveldb::Slice& s)
   {
-    // TODO(benh): Use varint comparator.
+    // TODO(benh): Use varint decoding for VarInt64Comparator!
     // uint64_t position;
     // google::protobuf::io::ArrayInputStream _stream(s.data(), s.size());
     // google::protobuf::io::CodedInputStream stream(&_stream);
     // bool success = stream.ReadVarint64(&position);
     // CHECK(success);
-    // return position;
+    // return position - 1; // Actual position is less 1 of stringified.
     Try<uint64_t> position =
       utils::numify<uint64_t>(string(s.data(), s.size()));
     CHECK(position.isSome());
-    return position.get();
+    return position.get() - 1; // Actual position is less 1 of stringified.
   }
 
   // Varint64Comparator comparator; // TODO(benh): Use varint comparator.
 
   leveldb::DB* db;
+
+  uint64_t first; // First position still in leveldb, used during truncation.
 };
 
 
 LevelDBStorage::LevelDBStorage()
-  : db(NULL)
+  : db(NULL), first(0)
 {
   // Nothing to see here.
 }
@@ -173,9 +199,9 @@ Try<State> LevelDBStorage::recover(const
   // string produces a stable ordering. Checks below.
   // options.comparator = &comparator;
 
-  const string& one = stringify(1);
-  const string& two = stringify(2);
-  const string& ten = stringify(10);
+  const string& one = encode(1);
+  const string& two = encode(2);
+  const string& ten = encode(10);
 
   CHECK(leveldb::BytewiseComparator()->Compare(one, two) < 0);
   CHECK(leveldb::BytewiseComparator()->Compare(two, one) > 0);
@@ -195,6 +221,11 @@ Try<State> LevelDBStorage::recover(const
   state.begin = 0;
   state.end = 0;
 
+  // TODO(benh): Consider just reading the "promise" record (e.g.,
+  // 'encode(0, false)') and then iterating over the rest of the
+  // records and confirming that they are all indeed of type
+  // Record::Action.
+
   leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
 
   iterator->SeekToFirst();
@@ -243,6 +274,17 @@ Try<State> LevelDBStorage::recover(const
     iterator->Next();
   }
 
+  // Determine the first position still in leveldb so during a
+  // truncation we can attempt to delete all positions from the first
+  // position up to the truncate position. Note that this is not the
+  // beginning position of the log, but rather the first position that
+  // remains (i.e., hasn't been deleted) in leveldb.
+  iterator->Seek(encode(0));
+
+  if (iterator->Valid()) {
+    first = decode(iterator->key());
+  }
+
   delete iterator;
 
   return state;
@@ -251,6 +293,9 @@ Try<State> LevelDBStorage::recover(const
 
 Try<void> LevelDBStorage::persist(const Promise& promise)
 {
+  Timer timer;
+  timer.start();
+
   leveldb::WriteOptions options;
   options.sync = true;
 
@@ -264,45 +309,24 @@ Try<void> LevelDBStorage::persist(const 
     return Try<void>::error("Failed to serialize record");
   }
 
-  leveldb::Status status = db->Put(options, stringify(0), value);
+  leveldb::Status status = db->Put(options, encode(0, false), value);
 
   if (!status.ok()) {
     return Try<void>::error(status.ToString());
   }
 
+  LOG(INFO) << "Persisting promise (" << value.size()
+            << " bytes) to leveldb took "
+            << timer.elapsed().millis() << " milliseconds";
+
   return Try<void>::some();
 }
 
 
 Try<void> LevelDBStorage::persist(const Action& action)
 {
-  leveldb::WriteBatch batch;
-
-  // Delete positions only if a truncate action has been *learned*.
-  // TODO(benh): Consider doing this asynchronously (but will require
-  // synchronization on the underlying DB).
-  if (action.has_learned() && action.learned() &&
-      action.has_type() && action.type() == Action::TRUNCATE) {
-    CHECK(action.has_truncate());
-
-    leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
-
-    iterator->Seek(stringify(1)); // The actual "beginning" of the log.
-
-    const string& to = stringify(action.truncate().to() + 1);
-
-    while (iterator->Valid()) {
-      // Only iterate as far as (but excluding) the truncate position.
-      // TODO(benh): Use varint comparator.
-      if (leveldb::BytewiseComparator()->Compare(iterator->key(), to) >= 0) {
-        break;
-      }
-      batch.Delete(iterator->key());
-      iterator->Next();
-    }
-
-    delete iterator;
-  }
+  Timer timer;
+  timer.start();
 
   Record record;
   record.set_type(Record::ACTION);
@@ -314,28 +338,84 @@ Try<void> LevelDBStorage::persist(const 
     return Try<void>::error("Failed to serialize record");
   }
 
-  batch.Put(stringify(action.position() + 1), value);
-
   leveldb::WriteOptions options;
   options.sync = true;
 
-  leveldb::Status status = db->Write(options, &batch);
+  leveldb::Status status = db->Put(options, encode(action.position()), value);
 
   if (!status.ok()) {
     return Try<void>::error(status.ToString());
   }
 
+  LOG(INFO) << "Persisting action (" << value.size()
+            << " bytes) to leveldb took "
+            << timer.elapsed().millis() << " milliseconds";
+
+  // Delete positions if a truncate action has been *learned*. Note
+  // that we do this in a best-effort fashion (i.e., we ignore any
+  // failures from the database since we can always try again).
+  if (action.has_type() && action.type() == Action::TRUNCATE &&
+      action.has_learned() && action.learned()) {
+    CHECK(action.has_truncate());
+
+    timer.start(); // Restart the timer.
+
+    // To actually perform the truncation in leveldb we need to remove
+    // all the keys that represent positions no longer in the log. We
+    // do this by attempting to delete every key from the first
+    // position we know is still in leveldb up to (but excluding)
+    // the truncate position. Note that this works because
+    // the semantics of WriteBatch are such that even if the position
+    // doesn't exist (which is possible because this replica has some
+    // holes), we can attempt to delete the key that represents it and
+    // it will just ignore that key. This is *much* cheaper than
+    // actually iterating through the entire database instead (which
+    // was, for posterity, the original implementation). In addition,
+    // caching the "first" position we know is in the database is
+    // cheaper than using an iterator to determine the first position
+    // (which was, for posterity, the second implementation).
+
+    leveldb::WriteBatch batch;
+
+    // Add positions up to (but excluding) the truncate position to
+    // the batch starting at the first position still in leveldb.
+    uint64_t index = 0;
+    while ((first + index) < action.truncate().to()) {
+      batch.Delete(encode(first + index));
+      index++;
+    }
+
+    // If we added any positions, attempt to delete them!
+    if (index > 0) {
+      // We do this write asynchronously (e.g., using default options).
+      leveldb::Status status = db->Write(leveldb::WriteOptions(), &batch);
+
+      if (!status.ok()) {
+        LOG(WARNING) << "Ignoring leveldb batch delete failure: "
+                     << status.ToString();
+      } else {
+        first = action.truncate().to(); // Save the new first position!
+
+        LOG(INFO) << "Deleting ~" << index << " keys from leveldb took "
+                  << timer.elapsed().millis() << " milliseconds";
+      }
+    }
+  }
+
   return Try<void>::some();
 }
 
 
 Try<Action> LevelDBStorage::read(uint64_t position)
 {
+  Timer timer;
+  timer.start();
+
   string value;
 
   leveldb::ReadOptions options;
 
-  leveldb::Status status = db->Get(options, stringify(position + 1), &value);
+  leveldb::Status status = db->Get(options, encode(position), &value);
 
   if (!status.ok()) {
     return Try<Action>::error(status.ToString());
@@ -353,6 +433,9 @@ Try<Action> LevelDBStorage::read(uint64_
     return Try<Action>::error("Bad record");
   }
 
+  LOG(INFO) << "Reading position from leveldb took "
+            << timer.elapsed().millis() << " milliseconds";
+
   return record.action();
 }
 
@@ -661,6 +744,8 @@ void ReplicaProcess::promise(const Promi
 
 void ReplicaProcess::write(const WriteRequest& request)
 {
+  LOG(INFO) << "Replica received write request for position " << request.position();
+
   Result<Action> result = read(request.position());
 
   if (result.isError()) {
@@ -759,6 +844,8 @@ void ReplicaProcess::write(const WriteRe
 
 void ReplicaProcess::learned(const Action& action)
 {
+  LOG(INFO) << "Replica received learned notice for position " << action.position();
+
   CHECK(action.learned());
 
   if (persist(action)) {
@@ -771,6 +858,8 @@ void ReplicaProcess::learned(const Actio
 
 void ReplicaProcess::learn(uint64_t position)
 {
+  LOG(INFO) << "Replica received learn request for position " << position;
+
   Result<Action> result = read(position);
 
   if (result.isError()) {

Modified: incubator/mesos/trunk/src/log/replica.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/replica.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/replica.hpp (original)
+++ incubator/mesos/trunk/src/log/replica.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __LOG_REPLICA_HPP__
 #define __LOG_REPLICA_HPP__
 

Modified: incubator/mesos/trunk/src/master/constants.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/constants.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/constants.hpp (original)
+++ incubator/mesos/trunk/src/master/constants.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __MASTER_CONSTANTS_HPP__
 #define __MASTER_CONSTANTS_HPP__
 
@@ -32,15 +50,15 @@ const double SLAVE_PONG_TIMEOUT = 15.0;
 // Maximum number of timeouts until slave is considered failed.
 const int MAX_SLAVE_TIMEOUTS = 5;
 
-// Default time to wait for a framework scheduler to failover.
-const int DEFAULT_FAILOVER_TIMEOUT = 1;
+// Time to wait for a framework to failover.
+const double FRAMEWORK_FAILOVER_TIMEOUT = 1.0;
 
 // Maximum number of completed frameworks to store in the cache.
-// TODO(thomasm): make configurable
+// TODO(thomasm): Make configurable.
 const int MAX_COMPLETED_FRAMEWORKS = 100;
 
-// Maximum number of completed tasks per framework to store in the cache.
-// TODO(thomasm): make configurable
+// Maximum number of completed tasks per framework to store in the
+// cache.  TODO(thomasm): Make configurable.
 const int MAX_COMPLETED_TASKS_PER_FRAMEWORK = 500;
 
 } // namespace mesos {

Modified: incubator/mesos/trunk/src/master/frameworks_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/frameworks_manager.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/frameworks_manager.cpp (original)
+++ incubator/mesos/trunk/src/master/frameworks_manager.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <glog/logging.h>
 
 #include <process/timer.hpp>

Modified: incubator/mesos/trunk/src/master/frameworks_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/frameworks_manager.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/frameworks_manager.hpp (original)
+++ incubator/mesos/trunk/src/master/frameworks_manager.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __MASTER_FRAMEWORKS_MANAGER_HPP__
 #define __MASTER_FRAMEWORKS_MANAGER_HPP__
 

Modified: incubator/mesos/trunk/src/master/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.cpp (original)
+++ incubator/mesos/trunk/src/master/http.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <iomanip>
 #include <sstream>
 #include <string>
@@ -31,9 +49,9 @@ namespace master {
 JSON::Object model(const Resources& resources)
 {
   // TODO(benh): Add all of the resources.
-  Resource::Scalar none;
-  Resource::Scalar cpus = resources.get("cpus", none);
-  Resource::Scalar mem = resources.get("mem", none);
+  Value::Scalar none;
+  Value::Scalar cpus = resources.get("cpus", none);
+  Value::Scalar mem = resources.get("mem", none);
 
   JSON::Object object;
   object.values["cpus"] = cpus.value();
@@ -93,6 +111,7 @@ JSON::Object model(const Framework& fram
     object.values["tasks"] = array;
   }
 
+  // Model all of the completed tasks of a framework.
   {
     JSON::Array array;
     foreach (const Task& task, framework.completedTasks) {
@@ -196,7 +215,7 @@ Promise<HttpResponse> stats(
   }
 
   foreach (const Resource& resource, totalResources) {
-    if (resource.type() == Resource::SCALAR) {
+    if (resource.type() == Value::SCALAR) {
       CHECK(resource.has_scalar());
       double total = resource.scalar().value();
       object.values[resource.name() + "_total"] = total;
@@ -214,7 +233,7 @@ Promise<HttpResponse> stats(
   JSON::render(out, object);
 
   HttpOKResponse response;
-  response.headers["Content-Type"] = "text/x-json";
+  response.headers["Content-Type"] = "application/json";
   response.headers["Content-Length"] = utils::stringify(out.str().size());
   response.body = out.str().data();
   return response;
@@ -270,7 +289,7 @@ Promise<HttpResponse> state(
   JSON::render(out, object);
 
   HttpOKResponse response;
-  response.headers["Content-Type"] = "text/x-json";
+  response.headers["Content-Type"] = "application/json";
   response.headers["Content-Length"] = utils::stringify(out.str().size());
   response.body = out.str().data();
   return response;

Modified: incubator/mesos/trunk/src/master/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.hpp (original)
+++ incubator/mesos/trunk/src/master/http.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __MASTER_HTTP_HPP__
 #define __MASTER_HTTP_HPP__
 

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Wed Jan 11 22:28:41 2012
@@ -229,7 +229,7 @@ void Master::registerOptions(Configurato
   configurator->addOption<int>(
       "failover_timeout",
       "Framework failover timeout in seconds",
-      DEFAULT_FAILOVER_TIMEOUT);
+      FRAMEWORK_FAILOVER_TIMEOUT);
 }
 
 
@@ -311,7 +311,7 @@ void Master::initialize()
   nextSlaveId = 0;
   nextOfferId = 0;
 
-  failoverTimeout = conf.get<int>("failover_timeout", DEFAULT_FAILOVER_TIMEOUT);
+  failoverTimeout = conf.get<int>("failover_timeout", FRAMEWORK_FAILOVER_TIMEOUT);
 
   // Start all the statistics at 0.
   CHECK(TASK_STARTING == TaskState_MIN);
@@ -694,13 +694,15 @@ void Master::launchTasks(const Framework
     } else {
       // The offer is gone (possibly rescinded, lost slave, re-reply
       // to same offer, etc). Report all tasks in it as failed.
+      // TODO: Consider adding a new task state TASK_INVALID for
+      // situations like these.
       foreach (const TaskDescription& task, tasks) {
         StatusUpdateMessage message;
         StatusUpdate* update = message.mutable_update();
         update->mutable_framework_id()->MergeFrom(frameworkId);
         TaskStatus* status = update->mutable_status();
         status->mutable_task_id()->MergeFrom(task.task_id());
-        status->set_state(TASK_FAILED);
+        status->set_state(TASK_LOST);
         status->set_message("Task launched with invalid offer");
         update->set_timestamp(elapsedTime());
         update->set_uuid(UUID::random().toBytes());
@@ -759,6 +761,7 @@ void Master::killTask(const FrameworkID&
       TaskStatus* status = update->mutable_status();
       status->mutable_task_id()->MergeFrom(taskId);
       status->set_state(TASK_LOST);
+      status->set_message("Task not found");
       update->set_timestamp(elapsedTime());
       update->set_uuid(UUID::random().toBytes());
       send(framework->pid, message);
@@ -1017,7 +1020,7 @@ void Master::exitedExecutor(const SlaveI
       // Tell the framework which tasks have been lost.
       foreachvalue (Task* task, utils::copy(framework->tasks)) {
         if (task->slave_id() == slave->id &&
-	    task->executor_id() == executorId) {
+            task->executor_id() == executorId) {
           StatusUpdateMessage message;
           StatusUpdate* update = message.mutable_update();
           update->mutable_framework_id()->MergeFrom(task->framework_id());
@@ -1026,6 +1029,7 @@ void Master::exitedExecutor(const SlaveI
           TaskStatus* status = update->mutable_status();
           status->mutable_task_id()->MergeFrom(task->task_id());
           status->set_state(TASK_LOST);
+          status->set_message("Lost executor");
           update->set_timestamp(elapsedTime());
           update->set_uuid(UUID::random().toBytes());
           send(framework->pid, message);
@@ -1040,7 +1044,7 @@ void Master::exitedExecutor(const SlaveI
         }
       }
 
-      // Remove executor from slave.
+      // Remove executor from slave and framework.
       slave->removeExecutor(frameworkId, executorId);
       framework->removeExecutor(slave->id, executorId);
 
@@ -1161,6 +1165,7 @@ void Master::makeOffers(Framework* frame
     offer->mutable_slave_id()->MergeFrom(slave->id);
     offer->set_hostname(slave->info.hostname());
     offer->mutable_resources()->MergeFrom(resources);
+    offer->mutable_attributes()->MergeFrom(slave->info.attributes());
 
     // Add all framework's executors running on this slave.
     if (slave->executors.contains(framework->id)) {
@@ -1306,7 +1311,12 @@ struct ResourceUsageChecker : TaskDescri
       if (!slave->hasExecutor(framework->id, executorInfo.executor_id())) {
         taskResources += executorInfo.resources();
         if (!((usedResources + taskResources) <= offer->resources())) {
-          return TaskDescriptionError::some(
+          LOG(WARNING) << "Task " << task.task_id() << " attempted to use "
+                  << taskResources << " combined with already used "
+                  << usedResources << " is greater than offered "
+                  << offer->resources();
+
+	  return TaskDescriptionError::some(
               "Task + executor uses more resources than offered");
         }
       }
@@ -1363,7 +1373,7 @@ void Master::processTasks(Offer* offer,
       update->mutable_framework_id()->MergeFrom(framework->id);
       TaskStatus* status = update->mutable_status();
       status->mutable_task_id()->MergeFrom(task.task_id());
-      status->set_state(TASK_FAILED);
+      status->set_state(TASK_LOST);
       status->set_message(error.get());
       update->set_timestamp(elapsedTime());
       update->set_uuid(UUID::random().toBytes());
@@ -1451,6 +1461,7 @@ Resources Master::launchTask(const TaskD
   resources += task.resources();
 
   LOG(INFO) << "Launching task " << task.task_id()
+            << " with resources " << task.resources()
             << " on slave " << slave->id;
 
   RunTaskMessage message;
@@ -1492,15 +1503,6 @@ void Master::failoverFramework(Framework
 {
   const UPID& oldPid = framework->pid;
 
-  // Remove the framework's offers (if they weren't removed before).
-  // TODO(benh): Consider just reoffering these to the new framework.
-  foreach (Offer* offer, utils::copy(framework->offers)) {
-    allocator->resourcesRecovered(offer->framework_id(),
-                                  offer->slave_id(),
-                                  offer->resources());
-    removeOffer(offer);
-  }
-
   {
     FrameworkErrorMessage message;
     message.set_code(1);
@@ -1518,9 +1520,23 @@ void Master::failoverFramework(Framework
 
   framework->reregisteredTime = elapsedTime();
 
-  FrameworkRegisteredMessage message;
-  message.mutable_framework_id()->MergeFrom(framework->id);
-  send(newPid, message);
+  {
+    FrameworkRegisteredMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id);
+    send(newPid, message);
+  }
+
+  // Remove the framework's offers (if they weren't removed before).
+  // We do this after we have updated the pid and sent the framework
+  // registered message so that the allocator can immediately re-offer
+  // these resources to this framework if it wants.
+  // TODO(benh): Consider just reoffering these to the new framework.
+  foreach (Offer* offer, utils::copy(framework->offers)) {
+    allocator->resourcesRecovered(offer->framework_id(),
+                                  offer->slave_id(),
+                                  offer->resources());
+    removeOffer(offer);
+  }
 }
 
 
@@ -1631,9 +1647,9 @@ void Master::readdSlave(Slave* slave,
     // Find the executor running this task and add it to the slave.
     foreach (const ExecutorInfo& executorInfo, executorInfos) {
       if (executorInfo.executor_id() == task.executor_id()) {
-	if (!slave->hasExecutor(task.framework_id(), task.executor_id())) {
-	  slave->addExecutor(task.framework_id(), executorInfo);
-	}
+        if (!slave->hasExecutor(task.framework_id(), task.executor_id())) {
+          slave->addExecutor(task.framework_id(), executorInfo);
+        }
 
         // Also add it to the framework if it has re-registered with us.
         Framework* framework = getFramework(task.framework_id());
@@ -1641,7 +1657,7 @@ void Master::readdSlave(Slave* slave,
           CHECK(!framework->hasExecutor(slave->id, task.executor_id()));
           framework->addExecutor(slave->id, executorInfo);
         }
-	break;
+        break;
       }
     }
 
@@ -1700,6 +1716,7 @@ void Master::removeSlave(Slave* slave)
       TaskStatus* status = update->mutable_status();
       status->mutable_task_id()->MergeFrom(task->task_id());
       status->set_state(TASK_LOST);
+      status->set_message("Slave removed");
       update->set_timestamp(elapsedTime());
       update->set_uuid(UUID::random().toBytes());
       send(framework->pid, message);
@@ -1722,7 +1739,7 @@ void Master::removeSlave(Slave* slave)
       }
     }
   }
-  
+
   // Remove slave from any filters.
   foreachvalue (Framework* framework, frameworks) {
     framework->slaveFilter.erase(slave);

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Wed Jan 11 22:28:41 2012
@@ -216,6 +216,8 @@ private:
 
   std::list<Framework> completedFrameworks;
 
+  double failoverTimeout; // Failover timeout for frameworks, in seconds.
+
   int64_t nextFrameworkId; // Used to give each framework a unique ID.
   int64_t nextOfferId;     // Used to give each slot offer a unique ID.
   int64_t nextSlaveId;     // Used to give each slave a unique ID.
@@ -229,11 +231,7 @@ private:
     uint64_t invalidFrameworkMessages;
   } stats;
 
-  // Start time used to calculate uptime.
-  double startTime;
-
-  // Failover timeout for frameworks, in seconds.
-  int failoverTimeout;
+  double startTime; // Start time used to calculate uptime.
 };
 
 
@@ -271,6 +269,8 @@ struct Slave
       std::make_pair(task->framework_id(), task->task_id());
     CHECK(tasks.count(key) == 0);
     tasks[key] = task;
+    VLOG(1) << "Adding task with resources " << task->resources()
+	    << " on slave " << id;
     resourcesInUse += task->resources();
   }
 
@@ -280,6 +280,8 @@ struct Slave
       std::make_pair(task->framework_id(), task->task_id());
     CHECK(tasks.count(key) > 0);
     tasks.erase(key);
+    VLOG(1) << "Removing task with resources " << task->resources()
+	    << " on slave " << id;
     resourcesInUse -= task->resources();
   }
 
@@ -287,6 +289,8 @@ struct Slave
   {
     CHECK(!offers.contains(offer));
     offers.insert(offer);
+    VLOG(1) << "Adding offer with resources " << offer->resources()
+	    << " on slave " << id;
     resourcesOffered += offer->resources();
   }
 
@@ -294,6 +298,8 @@ struct Slave
   {
     CHECK(offers.contains(offer));
     offers.erase(offer);
+    VLOG(1) << "Removing offer with resources " << offer->resources()
+	    << " on slave " << id;
     resourcesOffered -= offer->resources();
   }
 
@@ -330,7 +336,13 @@ struct Slave
 
   Resources resourcesFree()
   {
-    return info.resources() - (resourcesOffered + resourcesInUse);
+    Resources resources = info.resources() - (resourcesOffered + resourcesInUse);
+    VLOG(1) << "Calculating resources free on slave " << id << std::endl
+	    << "    Resources: " << info.resources() << std::endl
+	    << "    Resources Offered: " << resourcesOffered << std::endl
+	    << "    Resources In Use: " << resourcesInUse << std::endl
+	    << "    Resources Free: " << resources << std::endl;
+    return resources;
   }
 
   const SlaveID id;
@@ -419,14 +431,14 @@ struct Framework
   }
 
   bool hasExecutor(const SlaveID& slaveId,
-		   const ExecutorID& executorId)
+                   const ExecutorID& executorId)
   {
     return executors.contains(slaveId) &&
       executors[slaveId].contains(executorId);
   }
 
   void addExecutor(const SlaveID& slaveId,
-		   const ExecutorInfo& executorInfo)
+                   const ExecutorInfo& executorInfo)
   {
     CHECK(!hasExecutor(slaveId, executorInfo.executor_id()));
     executors[slaveId][executorInfo.executor_id()] = executorInfo;
@@ -436,7 +448,7 @@ struct Framework
   }
 
   void removeExecutor(const SlaveID& slaveId,
-		      const ExecutorID& executorId)
+                      const ExecutorID& executorId)
   {
     if (hasExecutor(slaveId, executorId)) {
       // Update our resources to reflect removing this executor.
@@ -444,7 +456,7 @@ struct Framework
 
       executors[slaveId].erase(executorId);
       if (executors[slaveId].size() == 0) {
-	executors.erase(slaveId);
+        executors.erase(slaveId);
       }
     }
   }
@@ -481,6 +493,7 @@ struct Framework
   hashset<Offer*> offers; // Active offers for framework.
 
   Resources resources; // Total resources (tasks + offers + executors).
+
   hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo> > executors;
 
   // Contains a time of unfiltering for each slave we've filtered,

Modified: incubator/mesos/trunk/src/master/simple_allocator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/simple_allocator.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/simple_allocator.cpp (original)
+++ incubator/mesos/trunk/src/master/simple_allocator.cpp Wed Jan 11 22:28:41 2012
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 
-#include <algorithm>
-
 #include <glog/logging.h>
 
 #include <algorithm>
@@ -172,14 +170,14 @@ struct DominantShareComparator
     // scalars.
 
     foreach (const Resource& resource, resources) {
-      if (resource.type() == Resource::SCALAR) {
+      if (resource.type() == Value::SCALAR) {
         double total = resource.scalar().value();
 
         if (total > 0) {
-          Resource::Scalar none;
-          const Resource::Scalar& scalar1 =
+          Value::Scalar none;
+          const Value::Scalar& scalar1 =
             framework1->resources.get(resource.name(), none);
-          const Resource::Scalar& scalar2 =
+          const Value::Scalar& scalar2 =
             framework2->resources.get(resource.name(), none);
           share1 = max(share1, scalar1.value() / total);
           share2 = max(share2, scalar2.value() / total);
@@ -257,9 +255,9 @@ void SimpleAllocator::makeNewOffers(cons
       // resources, rather than the master pushing resources out to
       // frameworks.
 
-      Resource::Scalar none;
-      Resource::Scalar cpus = resources.get("cpus", none);
-      Resource::Scalar mem = resources.get("mem", none);
+      Value::Scalar none;
+      Value::Scalar cpus = resources.get("cpus", none);
+      Value::Scalar mem = resources.get("mem", none);
 
       if (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) {
         VLOG(1) << "Found available resources: " << resources

Modified: incubator/mesos/trunk/src/messages/log.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/log.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/log.hpp (original)
+++ incubator/mesos/trunk/src/messages/log.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __MESSAGES_LOG_HPP__
 #define __MESSAGES_LOG_HPP__
 

Modified: incubator/mesos/trunk/src/messages/log.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/log.proto?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/log.proto (original)
+++ incubator/mesos/trunk/src/messages/log.proto Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package mesos.internal.log;
 
 

Modified: incubator/mesos/trunk/src/messages/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/messages.proto?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/messages.proto (original)
+++ incubator/mesos/trunk/src/messages/messages.proto Wed Jan 11 22:28:41 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

Modified: incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp Wed Jan 11 22:28:41 2012
@@ -91,7 +91,7 @@ PyTypeObject MesosExecutorDriverImplType
 PyMethodDef MesosExecutorDriverImpl_methods[] = {
   {"start", (PyCFunction) MesosExecutorDriverImpl_start, METH_NOARGS,
    "Start the driver to connect to Mesos"},
-  {"stop", (PyCFunction) MesosExecutorDriverImpl_stop, METH_VARARGS,
+  {"stop", (PyCFunction) MesosExecutorDriverImpl_stop, METH_NOARGS,
    "Stop the driver, disconnecting from Mesos"},
   {"abort", (PyCFunction) MesosExecutorDriverImpl_abort, METH_NOARGS,
    "Abort the driver, disallowing calls from and to the driver"},
@@ -223,21 +223,14 @@ PyObject* MesosExecutorDriverImpl_start(
 }
 
 
-PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self,
-                                       PyObject* args)
+PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self)
 {
   if (self->driver == NULL) {
     PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL");
     return NULL;
   }
 
-  bool failover = false;
-
-  if (!PyArg_ParseTuple(args, "b", &failover)) {
-    return NULL;
-  }
-
-  Status status = self->driver->stop(failover);
+  Status status = self->driver->stop();
   return PyInt_FromLong(status); // Sets an exception if creating the int fails
 }
 

Modified: incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp Wed Jan 11 22:28:41 2012
@@ -83,8 +83,7 @@ int MesosExecutorDriverImpl_clear(MesosE
 // MesosExecutorDriverImpl methods
 PyObject* MesosExecutorDriverImpl_start(MesosExecutorDriverImpl* self);
 
-PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self,
-                                       PyObject* args);
+PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self);
 
 PyObject* MesosExecutorDriverImpl_abort(MesosExecutorDriverImpl* self);
 

Modified: incubator/mesos/trunk/src/python/src/mesos.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/src/mesos.py?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/src/mesos.py (original)
+++ incubator/mesos/trunk/src/python/src/mesos.py Wed Jan 11 22:28:41 2012
@@ -38,12 +38,12 @@ class Scheduler:
 # in mock objects for tests.
 class SchedulerDriver:
   def start(self): pass
-  def stop(self, failover = False): pass
+  def stop(self, failover=False): pass
   def abort(self) : pass
   def join(self): pass
   def run(self): pass
   def requestResources(self, requests): pass
-  def launchTasks(self, offerId, tasks, filters = None): pass
+  def launchTasks(self, offerId, tasks, filters=None): pass
   def killTask(self, taskId): pass
   def reviveOffers(self): pass
   def sendFrameworkMessage(self, slaveId, executorId, data): pass
@@ -68,7 +68,7 @@ class Executor:
 # in mock objects for tests.
 class ExecutorDriver:
   def start(self): pass
-  def stop(self, failover = False): pass
+  def stop(self): pass
   def abort(self): pass
   def join(self): pass
   def run(self): pass

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Wed Jan 11 22:28:41 2012
@@ -281,7 +281,8 @@ protected:
 
     invoke(bind(&Scheduler::statusUpdate, sched, driver, status));
 
-    if (pid) {
+    // Send a status update acknowledgement ONLY if not aborted!
+    if (!aborted && pid) {
       // Acknowledge the message (we do this last, after we invoked
       // the scheduler, if we did at all, in case it causes a crash,
       // since this way the message might get resent/routed after the
@@ -409,14 +410,14 @@ protected:
     if (!connected) {
       VLOG(1) << "Ignoring launch tasks message as master is disconnected";
       // NOTE: Reply to the framework with TASK_LOST messages for each
-      // task. This is a hack for now, to not let the scheduler believe the
-      // tasks are forever in PENDING state, when actually the master
-      // never received the launchTask message. Also, realize that this
-      // hack doesn't capture the case when the scheduler process sends it
-      // but the master never receives it (message lost, master failover etc).
-      // In the future, this should be solved by the replicated log and timeouts.
+      // task. This is a hack for now, to not let the scheduler
+      // believe the tasks are forever in PENDING state, when actually
+      // the master never received the launchTask message. Also,
+      // realize that this hack doesn't capture the case when the
+      // scheduler process sends it but the master never receives it
+      // (message lost, master failover etc).  In the future, this
+      // should be solved by the replicated log and timeouts.
       foreach (const TaskDescription& task, tasks) {
-        VLOG(1) << "Sending TASK_LOST update for task" << task.task_id().value();
         StatusUpdate update;
         update.mutable_framework_id()->MergeFrom(frameworkId);
         TaskStatus* status = update.mutable_status();
@@ -437,10 +438,18 @@ protected:
     message.mutable_filters()->MergeFrom(filters);
 
     foreach (const TaskDescription& task, tasks) {
-      VLOG(1) << "Launching task id: " << task.task_id().value() << " name: " << task.name();
       // Keep only the slave PIDs where we run tasks so we can send
       // framework messages directly.
-      savedSlavePids[task.slave_id()] = savedOffers[offerId][task.slave_id()];
+      if (savedOffers.count(offerId) > 0) {
+        if (savedOffers[offerId].count(task.slave_id()) > 0) {
+          savedSlavePids[task.slave_id()] =
+            savedOffers[offerId][task.slave_id()];
+        } else {
+          VLOG(1) << "Attempting to launch a task with the wrong slave id";
+        }
+      } else {
+        VLOG(1) << "Attempting to launch a task with an unknown offer";
+      }
 
       message.add_tasks()->MergeFrom(task);
     }
@@ -464,8 +473,8 @@ protected:
   }
 
   void sendFrameworkMessage(const SlaveID& slaveId,
-			                const ExecutorID& executorId,
-			                const string& data)
+                            const ExecutorID& executorId,
+                            const string& data)
   {
     if (!connected) {
      VLOG(1) << "Ignoring send framework message as master is disconnected";

Modified: incubator/mesos/trunk/src/slave/constants.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/constants.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/constants.hpp (original)
+++ incubator/mesos/trunk/src/slave/constants.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __SLAVE_CONSTANTS_HPP__
 #define __SLAVE_CONSTANTS_HPP__
 

Modified: incubator/mesos/trunk/src/slave/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.cpp (original)
+++ incubator/mesos/trunk/src/slave/http.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <iomanip>
 #include <sstream>
 #include <string>
@@ -31,9 +49,9 @@ namespace slave {
 JSON::Object model(const Resources& resources)
 {
   // TODO(benh): Add all of the resources.
-  Resource::Scalar none;
-  Resource::Scalar cpus = resources.get("cpus", none);
-  Resource::Scalar mem = resources.get("mem", none);
+  Value::Scalar none;
+  Value::Scalar cpus = resources.get("cpus", none);
+  Value::Scalar mem = resources.get("mem", none);
 
   JSON::Object object;
   object.values["cpus"] = cpus.value();
@@ -150,7 +168,7 @@ Promise<HttpResponse> stats(
   JSON::render(out, object);
 
   HttpOKResponse response;
-  response.headers["Content-Type"] = "text/x-json";
+  response.headers["Content-Type"] = "application/json";
   response.headers["Content-Length"] = utils::stringify(out.str().size());
   response.body = out.str().data();
   return response;
@@ -185,7 +203,7 @@ Promise<HttpResponse> state(
   JSON::render(out, object);
 
   HttpOKResponse response;
-  response.headers["Content-Type"] = "text/x-json";
+  response.headers["Content-Type"] = "application/json";
   response.headers["Content-Length"] = utils::stringify(out.str().size());
   response.body = out.str().data();
   return response;

Modified: incubator/mesos/trunk/src/slave/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.hpp (original)
+++ incubator/mesos/trunk/src/slave/http.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __SLAVE_HTTP_HPP__
 #define __SLAVE_HTTP_HPP__
 

Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp Wed Jan 11 22:28:41 2012
@@ -294,7 +294,7 @@ void LxcIsolationModule::resourcesChange
   string property;
   uint64_t value;
 
-  double cpu = resources.get("cpu", Resource::Scalar()).value();
+  double cpu = resources.get("cpu", Value::Scalar()).value();
   int32_t cpu_shares = max(CPU_SHARES_PER_CPU * (int32_t) cpu, MIN_CPU_SHARES);
 
   property = "cpu.shares";
@@ -306,7 +306,7 @@ void LxcIsolationModule::resourcesChange
     return;
   }
 
-  double mem = resources.get("mem", Resource::Scalar()).value();
+  double mem = resources.get("mem", Value::Scalar()).value();
   int64_t limit_in_bytes = max((int64_t) mem, MIN_MEMORY_MB) * 1024LL * 1024LL;
 
   property = "memory.limit_in_bytes";
@@ -377,7 +377,7 @@ vector<string> LxcIsolationModule::getCo
 
   std::ostringstream out;
 
-  double cpu = resources.get("cpu", Resource::Scalar()).value();
+  double cpu = resources.get("cpu", Value::Scalar()).value();
   int32_t cpu_shares = max(CPU_SHARES_PER_CPU * (int32_t) cpu, MIN_CPU_SHARES);
 
   options.push_back("-s");
@@ -386,7 +386,7 @@ vector<string> LxcIsolationModule::getCo
 
   out.str("");
 
-  double mem = resources.get("mem", Resource::Scalar()).value();
+  double mem = resources.get("mem", Value::Scalar()).value();
   int64_t limit_in_bytes = max((int64_t) mem, MIN_MEMORY_MB) * 1024LL * 1024LL;
 
   options.push_back("-s");

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Jan 11 22:28:41 2012
@@ -92,6 +92,9 @@ Slave::Slave(const Configuration& _conf,
   resources =
     Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
 
+  attributes =
+    Attributes::parse(conf.get<string>("attributes", ""));
+
   initialize();
 }
 
@@ -277,10 +280,11 @@ void Slave::operator () ()
   }
 
   // Initialize slave info.
+  info.set_hostname(hostname);
   info.set_webui_hostname(webui_hostname);
   info.set_webui_port(conf.get<int>("webui_port", 8081));
-  info.set_hostname(hostname);
   info.mutable_resources()->MergeFrom(resources);
+  info.mutable_attributes()->MergeFrom(attributes);
 
   // Spawn and initialize the isolation module.
   // TODO(benh): Seems like the isolation module should really be
@@ -780,6 +784,7 @@ void Slave::registerExecutor(const Frame
 
     foreachvalue (const TaskDescription& task, executor->queuedTasks) {
       stats.tasks[TASK_STARTING]++;
+
       RunTaskMessage message;
       message.mutable_framework_id()->MergeFrom(framework->id);
       message.mutable_framework()->MergeFrom(framework->info);
@@ -1388,9 +1393,12 @@ string Slave::createUniqueWorkDirectory(
   LOG(INFO) << "Generating a unique work directory for executor '"
             << executorId << "' of framework " << frameworkId;
 
-  string workDir = "work";  // No relevant conf options set.
+  string workDir = "work";  // Default work directory.
+
+  // Now look for configured work directory.
   Option<string> option = conf.get("work_dir");
-  if (!option.isSome()) {
+  if (option.isNone()) {
+    // Okay, then look for a home directory instead.
     option = conf.get("home");
     if (option.isSome()) {
       workDir = option.get() + "/work";
@@ -1399,7 +1407,6 @@ string Slave::createUniqueWorkDirectory(
     workDir = option.get();
   }
 
-
   std::ostringstream out(std::ios_base::app | std::ios_base::out);
   out << workDir << "/slaves/" << id
       << "/frameworks/" << frameworkId
@@ -1415,11 +1422,11 @@ string Slave::createUniqueWorkDirectory(
 
   for (int i = 0; i < INT_MAX; i++) {
     out << i;
-    if (opendir(out.str().c_str()) == NULL && errno == ENOENT)
+    DIR* d = opendir(out.str().c_str());
+    if (d == NULL && errno == ENOENT) {
       break;
-
-    // TODO(benh): Does one need to do any sort of closedir?
-
+    }
+    closedir(d);
     out.str(dir);
   }
 

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Jan 11 22:28:41 2012
@@ -26,6 +26,7 @@
 #include "slave/http.hpp"
 #include "slave/isolation_module.hpp"
 
+#include "common/attributes.hpp"
 #include "common/resources.hpp"
 #include "common/hashmap.hpp"
 #include "common/type_utils.hpp"
@@ -162,6 +163,7 @@ private:
   UPID master;
 
   Resources resources;
+  Attributes attributes;
 
   hashmap<FrameworkID, Framework*> frameworks;
 

Modified: incubator/mesos/trunk/src/tests/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/Makefile.in?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/Makefile.in (original)
+++ incubator/mesos/trunk/src/tests/Makefile.in Wed Jan 11 22:28:41 2012
@@ -148,7 +148,8 @@ TESTS_OBJ = main.o utils.o master_tests.
 	    protobuf_io_tests.o lxc_isolation_tests.o utils_tests.o	\
 	    jvm.o zookeeper_server.o base_zookeeper_test.o		\
 	    zookeeper_server_tests.o zookeeper_tests.o			\
-	    url_processor_tests.o killtree_tests.o exception_tests.o
+	    url_processor_tests.o killtree_tests.o exception_tests.o    \
+	    values_tests.o attributes_test.o
 
 ALLTESTS_EXE = $(BINDIR)/tests/all-tests
 

Modified: incubator/mesos/trunk/src/tests/base_zookeeper_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/base_zookeeper_test.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/base_zookeeper_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/base_zookeeper_test.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <signal.h>
 
 #include <queue>

Modified: incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp (original)
+++ incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __BASE_ZOOKEEPER_TEST_HPP__
 #define __BASE_ZOOKEEPER_TEST_HPP__
 

Modified: incubator/mesos/trunk/src/tests/exception_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/exception_tests.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/exception_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/exception_tests.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <gmock/gmock.h>
 
 #include <mesos/executor.hpp>

Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <gmock/gmock.h>
 
 #include <mesos/executor.hpp>

Modified: incubator/mesos/trunk/src/tests/jvm.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/jvm.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/jvm.cpp (original)
+++ incubator/mesos/trunk/src/tests/jvm.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <jni.h>
 #include <stdarg.h>
 

Modified: incubator/mesos/trunk/src/tests/jvm.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/jvm.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/jvm.hpp (original)
+++ incubator/mesos/trunk/src/tests/jvm.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #ifndef __TESTING_JVM_HPP__
 #define __TESTING_JVM_HPP__
 

Modified: incubator/mesos/trunk/src/tests/killtree_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/killtree_tests.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/killtree_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/killtree_tests.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 #include <gtest/gtest.h>
 
 #include "tests/external_test.hpp"