You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:04:48 UTC

svn commit: r1132231 [1/2] - in /incubator/mesos/trunk: include/ src/ src/common/ src/examples/ src/examples/java/ src/exec/ src/local/ src/master/ src/messaging/ src/sched/ src/slave/ src/tests/

Author: benh
Date: Sun Jun  5 09:04:47 2011
New Revision: 1132231

URL: http://svn.apache.org/viewvc?rev=1132231&view=rev
Log:
Made resources be "generic" in Mesos. You can now launch a slave with
the --resources flag. See src/common/resources.hpp for a description
of how to construct resources from strings. Unfortunately, there are
a few parts of our design that are still very "cpus" and "mem"
dependent. In particular, by making resources generic we might end up
sending a resource offer that only has mem. Since a scheduler will
obviously reject this, it could then filter any resources from that
slave for a few seconds (making tests take longer and giving weird
semantics to existing users). This will need to change, but for now
the allocator only sends resource offers that contain BOTH cpus and
mem. :( The webui also heavily relies on there being specific
resources for cpu and mem.

Added:
    incubator/mesos/trunk/src/common/resources.cpp
    incubator/mesos/trunk/src/common/resources.hpp
    incubator/mesos/trunk/src/common/tokenize.cpp
    incubator/mesos/trunk/src/common/tokenize.hpp
Modified:
    incubator/mesos/trunk/include/mesos.proto
    incubator/mesos/trunk/src/Makefile.in
    incubator/mesos/trunk/src/common/type_utils.hpp
    incubator/mesos/trunk/src/examples/cpp_test_framework.cpp
    incubator/mesos/trunk/src/examples/java/TestFramework.java
    incubator/mesos/trunk/src/examples/memhog.cpp
    incubator/mesos/trunk/src/examples/scheduled_memhog.cpp
    incubator/mesos/trunk/src/exec/exec.cpp
    incubator/mesos/trunk/src/local/local.cpp
    incubator/mesos/trunk/src/local/local.hpp
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/master/simple_allocator.cpp
    incubator/mesos/trunk/src/messaging/messages.proto
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/tests/master_test.cpp
    incubator/mesos/trunk/src/tests/offer_reply_errors_test.cpp
    incubator/mesos/trunk/src/tests/resources_test.cpp
    incubator/mesos/trunk/src/tests/utils.hpp

Modified: incubator/mesos/trunk/include/mesos.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/mesos.proto?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/include/mesos.proto (original)
+++ incubator/mesos/trunk/include/mesos.proto Sun Jun  5 09:04:47 2011
@@ -43,42 +43,37 @@ message Params {
 // }
 
 
-message Resources {
-  required int32 cpus = 1;
-  required int32 mem = 2;
-}
-
 // TODO(benh): Add better support for resources.
-// message Resource {
-//   enum Type {
-//     SCALAR = 0;
-//     RANGES = 1;
-//     SET = 2;
-//   }
-
-//   message Scalar {
-//     required double value = 1;
-//   }
-
-//   message Range {
-//     required int64 begin = 1;
-//     required int64 end = 2;
-//   }
-
-//   message Ranges {
-//     repeated Range range = 1;
-//   }
-
-//   message Set {
-//     repeated string item = 1; 
-//   }
-
-//   required string name = 1;
-//   required Type type = 2;
-//   optional Scalar scalar = 3;
-//   optional Ranges ranges = 4;
-//   optional Set set = 5;
-// }
+message Resource {
+  enum Type {
+    SCALAR = 0;
+    RANGES = 1;
+    SET = 2;
+  }
+
+  message Scalar {
+    required double value = 1;
+  }
+
+  message Range {
+    required uint64 begin = 1;
+    required uint64 end = 2;
+  }
+
+  message Ranges {
+    repeated Range range = 1;
+  }
+
+  message Set {
+    repeated string item = 1; 
+  }
+
+  required string name = 1;
+  required Type type = 2;
+  optional Scalar scalar = 3;
+  optional Ranges ranges = 4;
+  optional Set set = 5;
+}
 
 
 message ExecutorArgs {
@@ -102,7 +97,7 @@ message TaskDescription {
   required string name = 1;
   required TaskID task_id = 2;
   required SlaveID slave_id = 3;
-  required Params params = 4; // TODO(benh): This will be replaced by Resources.
+  repeated Resource resources = 4;
   optional bytes data = 5;
 }
 
@@ -142,7 +137,7 @@ message FrameworkInfo {
 message SlaveInfo {
   required string hostname = 1;
   required string public_hostname = 2;
-  required Resources resources = 3;
+  repeated Resource resources = 3;
 //   repeated Attribute attribute = ;
 }
 
@@ -156,11 +151,5 @@ message FrameworkMessage {
 message SlaveOffer {
   required SlaveID slave_id = 1;
   required string hostname = 2;
-  required Params params = 3;
+  repeated Resource resources = 3;
 }
-
-
-// message ResourceOffer {
-//   required SlaveInfo slave = 1;
-//   repeated Resource resource = 2;
-// }

Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun  5 09:04:47 2011
@@ -134,7 +134,7 @@ SWIG_WEBUI_OBJ = $(MASTER_SWIG_WEBUI_OBJ
 COMMON_OBJ = common/fatal.o common/lock.o detector/detector.o		\
 	     detector/url_processor.o configurator/configurator.o	\
 	     common/string_utils.o common/logging.o			\
-	     common/date_utils.o
+	     common/date_utils.o common/tokenize.o common/resources.o
 
 ifeq ($(WITH_ZOOKEEPER),1)
   COMMON_OBJ += detector/zookeeper.o
@@ -334,7 +334,7 @@ $(MESOS_PROJD_EXE): $(SRCDIR)/slave/proj
 
 java: $(MESOS_JAVA_LIB) $(MESOS_JAVA_JAR)
 
-$(MESOS_JAVA_JAR): $(SRCDIR)/java/src/mesos/*.java | $(LIBDIR)/java
+$(MESOS_JAVA_JAR): $(SRCDIR)/java/src/mesos/*.java @top_srcdir@/include/mesos.proto | $(LIBDIR)/java
 ifdef JAVA_HOME
 	mkdir -p @top_builddir@/$(PROTOBUF)/java/src/main/java
 	$(PROTOC) --java_out=@top_builddir@/$(PROTOBUF)/java/src/main/java -I@top_srcdir@/$(PROTOBUF)/src @top_srcdir@/$(PROTOBUF)/src/google/protobuf/descriptor.proto

Added: incubator/mesos/trunk/src/common/resources.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/resources.cpp?rev=1132231&view=auto
==============================================================================
--- incubator/mesos/trunk/src/common/resources.cpp (added)
+++ incubator/mesos/trunk/src/common/resources.cpp Sun Jun  5 09:04:47 2011
@@ -0,0 +1,649 @@
+#include <iostream>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <boost/lexical_cast.hpp>
+
+#include "common/foreach.hpp"
+#include "common/resources.hpp"
+#include "common/tokenize.hpp"
+
+
+using boost::bad_lexical_cast;
+using boost::lexical_cast;
+
+using std::ostream;
+using std::string;
+using std::vector;
+
+
+namespace mesos { namespace internal {
+
+bool operator == (const Resource::Scalar& left, const Resource::Scalar& right)
+{
+  return left.value() == right.value();
+}
+
+
+bool operator <= (const Resource::Scalar& left, const Resource::Scalar& right)
+{
+  return left.value() <= right.value();
+}
+
+
+Resource::Scalar operator + (const Resource::Scalar& left, const Resource::Scalar& right)
+{
+  Resource::Scalar result;
+  result.set_value(left.value() + right.value());
+  return result;
+}
+
+  
+Resource::Scalar operator - (const Resource::Scalar& left, const Resource::Scalar& right)
+{
+  Resource::Scalar result;
+  result.set_value(left.value() - right.value());
+  return result;
+}
+
+  
+Resource::Scalar& operator += (Resource::Scalar& left, const Resource::Scalar& right)
+{
+  left.set_value(left.value() + right.value());
+  return left;
+}
+
+
+Resource::Scalar& operator -= (Resource::Scalar& left, const Resource::Scalar& right)
+{
+  left.set_value(left.value() - right.value());
+  return left;
+}
+
+
+static void coalesce(Resource::Ranges* ranges, const Resource::Range& range)
+{
+  // Adds range to ranges, fusing it with every existing range that it
+  // overlaps or directly abuts (e.g., [1-5] and [6-9] become [1-9]).
+  // Ranges that do not touch range are kept as they are; previously
+  // the first range examined was always stretched to swallow range,
+  // even when the two were disjoint.
+  //
+  // Note that we assume that ranges has already been coalesced. The
+  // merged range is appended last, so the order of ranges may change.
+
+  uint64_t begin = range.begin();
+  uint64_t end = range.end();
+
+  Resource::Ranges result;
+
+  for (int i = 0; i < ranges->range_size(); i++) {
+    const Resource::Range& current = ranges->range(i);
+
+    // Subtracting (rather than adding one) cannot overflow here.
+    bool apart =
+      (current.end() < begin && begin - current.end() > 1) ||
+      (end < current.begin() && current.begin() - end > 1);
+
+    if (apart) {
+      result.add_range()->MergeFrom(current);
+    } else {
+      if (current.begin() < begin) { begin = current.begin(); }
+      if (current.end() > end) { end = current.end(); }
+    }
+  }
+
+  Resource::Range* merged = result.add_range();
+  merged->set_begin(begin);
+  merged->set_end(end);
+  *ranges = result;
+}
+
+
+static void remove(Resource::Ranges* ranges, const Resource::Range& range)
+{
+  // Removes every value covered by range from ranges. Each existing
+  // range is either kept whole (no overlap), dropped (fully covered),
+  // trimmed at one end, or split in two. Previously ranges that did
+  // not overlap range were dropped from the result, and an exactly
+  // matching range was copied back instead of being removed.
+  //
+  // Note that we assume that ranges has already been coalesced; the
+  // relative order of the surviving ranges is preserved.
+
+  Resource::Ranges result;
+
+  for (int i = 0; i < ranges->range_size(); i++) {
+    uint64_t begin = ranges->range(i).begin();
+    uint64_t end = ranges->range(i).end();
+
+    if (end < range.begin() || range.end() < begin) {
+      // No overlap, keep this range untouched.
+      result.add_range()->MergeFrom(ranges->range(i));
+      continue;
+    }
+
+    if (begin < range.begin()) {
+      // Keep what lies below range (range.begin() > begin >= 0, so
+      // the subtraction cannot wrap around).
+      Resource::Range* temp = result.add_range();
+      temp->set_begin(begin);
+      temp->set_end(range.begin() - 1);
+    }
+
+    if (range.end() < end) {
+      // Keep what lies above range (range.end() < end, so the
+      // addition cannot overflow).
+      Resource::Range* temp = result.add_range();
+      temp->set_begin(range.end() + 1);
+      temp->set_end(end);
+    }
+  }
+
+  *ranges = result;
+}
+
+
+bool operator == (const Resource::Ranges& left, const Resource::Ranges& right)
+{
+  if (left.range_size() == right.range_size()) {
+    for (int i = 0; i < left.range_size(); i++) {
+      // Make sure this range is equal to a range in the right.
+      bool found = false;
+      for (int j = 0; j < right.range_size(); j++) {
+        if (left.range(i).begin() == right.range(j).begin() &&
+            left.range(i).end() == right.range(j).end()) {
+          found = true;
+          break;
+        }
+      }
+
+      if (!found) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  return false;
+}
+
+
+bool operator <= (const Resource::Ranges& left, const Resource::Ranges& right)
+{
+  // True iff every range in left lies inside some range in right (right
+  // is assumed coalesced). Counting ranges is not a valid shortcut, e.g.,
+  // [1-2, 4-5] <= [1-9], and disjoint ranges such as [7-9] vs. [1-5] must
+  // fail (they used to pass).
+  for (int i = 0; i < left.range_size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.range_size() && !found; j++) {
+      found = right.range(j).begin() <= left.range(i).begin() &&
+              left.range(i).end() <= right.range(j).end();
+    }
+
+    if (!found) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+Resource::Ranges operator + (const Resource::Ranges& left, const Resource::Ranges& right)
+{
+  Resource::Ranges result;
+
+  for (int i = 0; i < left.range_size(); i++) {
+    coalesce(&result, left.range(i));
+  }
+
+  for (int i = 0; i < right.range_size(); i++) {
+    coalesce(&result, right.range(i));
+  }
+
+  return result;
+}
+
+  
+Resource::Ranges operator - (const Resource::Ranges& left, const Resource::Ranges& right)
+{
+  Resource::Ranges result;
+
+  for (int i = 0; i < left.range_size(); i++) {
+    coalesce(&result, left.range(i));
+  }
+
+  for (int i = 0; i < right.range_size(); i++) {
+    coalesce(&result, right.range(i));
+  }
+
+  for (int i = 0; i < right.range_size(); i++) {
+    remove(&result, right.range(i));
+  }
+
+  return result;
+}
+
+  
+Resource::Ranges& operator += (Resource::Ranges& left, const Resource::Ranges& right)
+{
+  Resource::Ranges temp;
+
+  for (int i = 0; i < left.range_size(); i++) {
+    coalesce(&temp, left.range(i));
+  }
+
+  left = temp;
+
+  for (int i = 0; i < right.range_size(); i++) {
+    coalesce(&left, right.range(i));
+  }
+
+  return left;
+}
+
+
+Resource::Ranges& operator -= (Resource::Ranges& left, const Resource::Ranges& right)
+{
+  Resource::Ranges temp;
+
+  for (int i = 0; i < left.range_size(); i++) {
+    coalesce(&temp, left.range(i));
+  }
+
+  for (int i = 0; i < right.range_size(); i++) {
+    coalesce(&temp, right.range(i));
+  }
+
+  left = temp;
+
+  for (int i = 0; i < right.range_size(); i++) {
+    remove(&left, right.range(i));
+  }
+
+  return left;
+}
+
+
+bool operator == (const Resource::Set& left, const Resource::Set& right)
+{
+  // Two sets are equal when they have the same number of items and every
+  // item in left also appears somewhere in right (the order of the items
+  // does not matter). This assumes neither set holds duplicate items.
+  if (left.item_size() != right.item_size()) {
+    return false;
+  }
+
+  for (int i = 0; i < left.item_size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.item_size() && !found; j++) {
+      // Look at item j of right (not item i, which compared positionally).
+      found = left.item(i) == right.item(j);
+    }
+
+    if (!found) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+bool operator <= (const Resource::Set& left, const Resource::Set& right)
+{
+  // left is a subset of right when every item in left also appears
+  // somewhere in right, regardless of order. The size test is only a
+  // cheap early exit and assumes left holds no duplicate items.
+  if (left.item_size() > right.item_size()) {
+    return false;
+  }
+
+  for (int i = 0; i < left.item_size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.item_size() && !found; j++) {
+      // Look at item j of right (not item i, which compared positionally).
+      found = left.item(i) == right.item(j);
+    }
+
+    if (!found) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+Resource::Set operator + (const Resource::Set& left, const Resource::Set& right)
+{
+  Resource::Set result;
+
+  for (int i = 0; i < left.item_size(); i++) {
+    result.add_item(left.item(i));
+  }
+
+  // A little bit of extra logic to avoid adding duplicates from right.
+  for (int i = 0; i < right.item_size(); i++) {
+    bool found = false;
+    for (int j = 0; j < result.item_size(); j++) {
+      if (right.item(i) == result.item(j)) {
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      result.add_item(right.item(i));
+    }
+  }
+
+  return result;
+}
+
+  
+Resource::Set operator - (const Resource::Set& left, const Resource::Set& right)
+{
+  Resource::Set result;
+
+  // Look for the same item in right as we add left to result.
+  for (int i = 0; i < left.item_size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.item_size(); j++) {
+      if (left.item(i) == right.item(j)) {
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      result.add_item(left.item(i));
+    }
+  }
+
+  return result;
+}
+
+  
+Resource::Set& operator += (Resource::Set& left, const Resource::Set& right)
+{
+  // A little bit of extra logic to avoid adding duplicates from right.
+  for (int i = 0; i < right.item_size(); i++) {
+    bool found = false;
+    for (int j = 0; j < left.item_size(); j++) {
+      if (right.item(i) == left.item(j)) {
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      left.add_item(right.item(i));
+    }
+  }
+
+  return left;
+}
+
+
+Resource::Set& operator -= (Resource::Set& left, const Resource::Set& right)
+{
+  // Keep only the items of left that do not appear in right. (This used
+  // to add the items of right missing from left, i.e., behave like +=.)
+  Resource::Set result;
+  for (int i = 0; i < left.item_size(); i++) {
+    bool found = false;
+    for (int j = 0; j < right.item_size() && !found; j++) {
+      found = left.item(i) == right.item(j);
+    }
+
+    if (!found) {
+      result.add_item(left.item(i));
+    }
+  }
+
+  left = result;
+  return left;
+}
+
+
+bool operator == (const Resource& left, const Resource& right)
+{
+  if (left.name() == right.name() && left.type() == right.type()) {
+    if (left.type() == Resource::SCALAR) {
+      return left.scalar() == right.scalar();
+    } else if (left.type() == Resource::RANGES) {
+      return left.ranges() == right.ranges();
+    } else if (left.type() == Resource::SET) {
+      return left.set() == right.set();
+    }
+  }
+
+  return false;
+}
+
+
+bool operator <= (const Resource& left, const Resource& right)
+{
+  if (left.name() == right.name() && left.type() == right.type()) {
+    if (left.type() == Resource::SCALAR) {
+      return left.scalar() <= right.scalar();
+    } else if (left.type() == Resource::RANGES) {
+      return left.ranges() <= right.ranges();
+    } else if (left.type() == Resource::SET) {
+      return left.set() <= right.set();
+    }
+  }
+
+  return false;
+}
+
+
+Resource operator + (const Resource& left, const Resource& right)
+{
+  Resource result = left;
+
+  if (left.name() == right.name() && left.type() == right.type()) {
+    if (left.type() == Resource::SCALAR) {
+      result.mutable_scalar()->MergeFrom(left.scalar() + right.scalar());
+    } else if (left.type() == Resource::RANGES) {
+      result.mutable_ranges()->Clear();
+      result.mutable_ranges()->MergeFrom(left.ranges() + right.ranges());
+    } else if (left.type() == Resource::SET) {
+      result.mutable_set()->Clear();
+      result.mutable_set()->MergeFrom(left.set() + right.set());
+    }
+  }
+  
+  return result;
+}
+
+  
+Resource operator - (const Resource& left, const Resource& right)
+{
+  Resource result = left;
+
+  if (left.name() == right.name() && left.type() == right.type()) {
+    if (left.type() == Resource::SCALAR) {
+      result.mutable_scalar()->MergeFrom(left.scalar() - right.scalar());
+    } else if (left.type() == Resource::RANGES) {
+      result.mutable_ranges()->Clear();
+      result.mutable_ranges()->MergeFrom(left.ranges() - right.ranges());
+    } else if (left.type() == Resource::SET) {
+      result.mutable_set()->Clear();
+      result.mutable_set()->MergeFrom(left.set() - right.set());
+    }
+  }
+  
+  return result;
+}
+
+  
+Resource& operator += (Resource& left, const Resource& right)
+{
+  // Compute each sum before modifying left: clearing left's ranges or
+  // set first (as was done previously) discarded left's own contents.
+  if (left.name() == right.name() && left.type() == right.type()) {
+    if (left.type() == Resource::SCALAR) {
+      left.mutable_scalar()->CopyFrom(left.scalar() + right.scalar());
+    } else if (left.type() == Resource::RANGES) {
+      left.mutable_ranges()->CopyFrom(left.ranges() + right.ranges());
+    } else if (left.type() == Resource::SET) {
+      left.mutable_set()->CopyFrom(left.set() + right.set());
+    }
+  }
+
+  return left;
+}
+
+
+Resource& operator -= (Resource& left, const Resource& right)
+{
+  // Compute each difference before modifying left: clearing left's ranges
+  // or set first (as was done previously) subtracted from an empty value.
+  if (left.name() == right.name() && left.type() == right.type()) {
+    if (left.type() == Resource::SCALAR) {
+      left.mutable_scalar()->CopyFrom(left.scalar() - right.scalar());
+    } else if (left.type() == Resource::RANGES) {
+      left.mutable_ranges()->CopyFrom(left.ranges() - right.ranges());
+    } else if (left.type() == Resource::SET) {
+      left.mutable_set()->CopyFrom(left.set() - right.set());
+    }
+  }
+
+  return left;
+}
+
+
+ostream& operator << (ostream& stream, const Resource& resource)
+{
+  stream << resource.name() << "=";
+  if (resource.type() == Resource::SCALAR) {
+    stream << resource.scalar().value();
+  } else if (resource.type() == Resource::RANGES) {
+    stream << "[";
+    for (int i = 0; i < resource.ranges().range_size(); i++) {
+      stream << resource.ranges().range(i).begin()
+             << "-"
+             << resource.ranges().range(i).end();
+      if (i + 1 < resource.ranges().range_size()) {
+        stream << ", ";
+      }
+    }
+    stream << "]";
+  } else if (resource.type() == Resource::SET) {
+    stream << "{";
+    for (int i = 0; i < resource.set().item_size(); i++) {
+      stream << resource.set().item(i);
+      if (i + 1 < resource.set().item_size()) {
+        stream << ", ";
+      }
+    }
+    stream << "}";
+  }
+
+  return stream;
+}
+
+
+Resource Resources::parse(const string& name, const string& value)
+{
+  Resource resource;
+  resource.set_name(name);
+
+  // Remove any spaces from the value.
+  string temp;
+  foreach (const char c, value) {
+    if (c != ' ') {
+      temp += c;
+    }
+  }
+
+  size_t index = temp.find('[');
+  if (index == 0) {
+    // This is a ranges.
+    Resource::Ranges ranges;
+    const vector<string>& tokens = tokenize(temp, "[]-,\n");
+    if (tokens.size() % 2 != 0) {
+      LOG(FATAL) << "Error parsing value for " << name
+                 << ", expecting one or more \"ranges\"";
+    } else {
+      for (int i = 0; i < tokens.size(); i += 2) {
+        Resource::Range *range = ranges.add_range();
+
+        int j = i;
+        try {
+          range->set_begin(lexical_cast<uint64_t>((tokens[j++])));
+          range->set_end(lexical_cast<uint64_t>(tokens[j++]));
+        } catch (const bad_lexical_cast&) {
+          LOG(FATAL) << "Error parsing value for " << name
+                     << ", expecting non-negative integers in '"
+                     << tokens[j - 1] << "'";
+        }
+      }
+
+      resource.set_type(Resource::RANGES);
+      resource.mutable_ranges()->MergeFrom(ranges);
+    }
+  } else if (index == string::npos) {
+    size_t index = temp.find('{');
+    if (index == 0) {
+      // This is a set.
+      Resource::Set set;
+      const vector<string>& tokens = tokenize(temp, "{},\n");
+      for (int i = 0; i < tokens.size(); i++) {
+        set.add_item(tokens[i]);
+      }
+
+      resource.set_type(Resource::SET);
+      resource.mutable_set()->MergeFrom(set);
+    } else if (index == string::npos) {
+      // This *should* be a scalar.
+      Resource::Scalar scalar;
+      try {
+        scalar.set_value(lexical_cast<double>(temp));
+      } catch (const bad_lexical_cast&) {
+        LOG(FATAL) << "Error parsing value for " << name
+                   << ", expecting a number from '" << temp << "'";
+      }
+
+      resource.set_type(Resource::SCALAR);
+      resource.mutable_scalar()->MergeFrom(scalar);
+    } else {
+      LOG(FATAL) << "Error parsing value for " << name
+                 << ", bad '{' found";
+    }
+  } else {
+    LOG(FATAL) << "Error parsing value for " << name
+               << ", bad '[' found";
+  }
+
+  return resource;
+}
+
+
+Resources Resources::parse(const string& s)
+{
+  // Parses "name:value" entries separated by ';' or newlines and adds
+  // each one to the result; a malformed entry is fatal.
+  Resources resources;
+  const vector<string>& tokens = tokenize(s, ";\n");
+
+  for (size_t i = 0; i < tokens.size(); i++) {
+    const vector<string>& pairs = tokenize(tokens[i], ":");
+    if (pairs.size() != 2) {
+      // Report the entry itself: pairs could be empty, so pairs[0] is unsafe.
+      LOG(FATAL) << "bad value for resources, missing ':' within " << tokens[i];
+    }
+    resources += parse(pairs[0], pairs[1]);
+  }
+
+  return resources;
+}
+
+
+}} // namespace mesos { namespace internal {

Added: incubator/mesos/trunk/src/common/resources.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/resources.hpp?rev=1132231&view=auto
==============================================================================
--- incubator/mesos/trunk/src/common/resources.hpp (added)
+++ incubator/mesos/trunk/src/common/resources.hpp Sun Jun  5 09:04:47 2011
@@ -0,0 +1,447 @@
+#ifndef __RESOURCES_HPP__
+#define __RESOURCES_HPP__
+
+#include <iterator>
+#include <string>
+
+#include <mesos.hpp>
+
+
+// Resources come in three types: scalar, ranges, and sets. These are
+// represented using protocol buffers. To make manipulation of
+// resources easier within the Mesos core we provide generic
+// overloaded operators (see below) as well as a general Resources
+// class that encapsulates a collection of protocol buffer Resource
+// objects. The Resources class also provides a few static routines to
+// allow parsing resources (e.g., from the command line), as well as
+// determining whether or not a Resource object is valid or
+// allocatable. In particular, a scalar is allocatable if its value
+// is greater than zero, a ranges is allocatable if there is at least
+// one valid range in it, and a set is allocatable if it has at least
+// one item. One can get only the allocatable resources by calling the
+// allocatable routine on a resources object. Note that many of these
+// operations have not been optimized but instead just written for
+// correct semantics.
+
+
+// Note! A resource is described by a tuple (name, type). Doing
+// "arithmetic" operations (those defined below) on two resources of
+// the same name but different type doesn't make sense, so its
+// semantics are as though the second operand was actually just an
+// empty resource (as though you didn't do the operation at all). In
+// addition, doing operations on two resources of the same type but
+// different names is a no-op.
+
+
+namespace mesos { namespace internal {
+
+bool operator == (const Resource::Scalar& left, const Resource::Scalar& right);
+bool operator <= (const Resource::Scalar& left, const Resource::Scalar& right);
+Resource::Scalar operator + (const Resource::Scalar& left, const Resource::Scalar& right);
+Resource::Scalar operator - (const Resource::Scalar& left, const Resource::Scalar& right);
+Resource::Scalar& operator += (Resource::Scalar& left, const Resource::Scalar& right);
+Resource::Scalar& operator -= (Resource::Scalar& left, const Resource::Scalar& right);
+
+
+bool operator == (const Resource::Ranges& left, const Resource::Ranges& right);
+bool operator <= (const Resource::Ranges& left, const Resource::Ranges& right);
+Resource::Ranges operator + (const Resource::Ranges& left, const Resource::Ranges& right);
+Resource::Ranges operator - (const Resource::Ranges& left, const Resource::Ranges& right);
+Resource::Ranges& operator += (Resource::Ranges& left, const Resource::Ranges& right);
+Resource::Ranges& operator -= (Resource::Ranges& left, const Resource::Ranges& right);
+
+
+bool operator == (const Resource::Set& left, const Resource::Set& right);
+bool operator <= (const Resource::Set& left, const Resource::Set& right);
+Resource::Set operator + (const Resource::Set& left, const Resource::Set& right);
+Resource::Set operator - (const Resource::Set& left, const Resource::Set& right);
+Resource::Set& operator += (Resource::Set& left, const Resource::Set& right);
+Resource::Set& operator -= (Resource::Set& left, const Resource::Set& right);
+
+
+bool operator == (const Resource& left, const Resource& right);
+bool operator <= (const Resource& left, const Resource& right);
+Resource operator + (const Resource& left, const Resource& right);
+Resource operator - (const Resource& left, const Resource& right);
+Resource& operator += (Resource& left, const Resource& right);
+Resource& operator -= (Resource& left, const Resource& right);
+std::ostream& operator << (std::ostream& stream, const Resource& resource);
+
+
+class Resources
+{
+public:
+  Resources() {}
+
+  Resources(const google::protobuf::RepeatedPtrField<Resource>& _resources)
+  {
+    resources.MergeFrom(_resources);
+  }
+
+  Resources(const Resources& that)
+  {
+    resources.MergeFrom(that.resources);
+  }
+
+  Resources& operator = (const Resources& that)
+  {
+    if (this != &that) {
+      resources.Clear();
+      resources.MergeFrom(that.resources);
+    }
+
+    return *this;
+  }
+
+
+  // Returns a Resources object with only the allocatable resources.
+  Resources allocatable() const
+  {
+    Resources result;
+
+    foreach (const Resource& resource, resources) {
+      if (isAllocatable(resource)) {
+        result.resources.Add()->MergeFrom(resource);
+      }
+    }
+
+    return result;
+  }
+
+
+  size_t size() const
+  {
+    return resources.size();
+  }
+
+
+  // Using this operator makes it easy to copy a resources object into
+  // a protocol buffer field.
+  operator const google::protobuf::RepeatedPtrField<Resource>& () const
+  {
+    return resources;
+  }
+
+
+  bool operator == (const Resources& that) const
+  {
+    foreach (const Resource& resource, resources) {
+      if (!that.contains(resource.name(), resource.type())) {
+        return false;
+      } else {
+        if (!(resource == that.get(resource.name(), Resource()))) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+
+  bool operator <= (const Resources& that) const
+  {
+    foreach (const Resource& resource, resources) {
+      if (!that.contains(resource.name(), resource.type())) {
+        return false;
+      } else {
+        if (!(resource <= that.get(resource.name(), Resource()))) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+
+  Resources operator + (const Resources& that) const
+  {
+    Resources result(*this);
+
+    foreach (const Resource& resource, that.resources) {
+      result += resource;
+    }
+
+    return result;
+  }
+
+  
+  Resources operator - (const Resources& that) const
+  {
+    Resources result(*this);
+
+    foreach (const Resource& resource, that.resources) {
+      result -= resource;
+    }
+
+    return result;
+  }
+
+  
+  Resources& operator += (const Resources& that)
+  {
+    foreach (const Resource& resource, that.resources) {
+      *this += resource;
+    }
+
+    return *this;
+  }
+
+
+  Resources& operator -= (const Resources& that)
+  {
+    foreach (const Resource& resource, that.resources) {
+      *this -= resource;
+    }
+
+    return *this;
+  }
+
+
+  Resources operator + (const Resource& that) const
+  {
+    Resources result;
+
+    bool added = false;
+
+    foreach (const Resource& resource, resources) {
+      if (resource.name() == that.name() && resource.type() == that.type()) {
+        result.resources.Add()->MergeFrom(resource + that);
+        added = true;
+      } else {
+        result.resources.Add()->MergeFrom(resource);
+      }
+    }
+
+    if (!added) {
+      result.resources.Add()->MergeFrom(that);
+    }
+
+    return result;
+  }
+
+
+  Resources operator - (const Resource& that) const
+  {
+    Resources result;
+
+    foreach (const Resource& resource, resources) {
+      if (resource.name() == that.name() && resource.type() == that.type()) {
+        result.resources.Add()->MergeFrom(resource - that);
+      } else {
+        result.resources.Add()->MergeFrom(resource);
+      }
+    }
+
+    return result;
+  }
+
+  
+  Resources& operator += (const Resource& that)
+  {
+    *this = *this + that;
+    return *this;
+  }
+
+
+  Resources& operator -= (const Resource& that)
+  {
+    *this = *this - that;
+    return *this;
+  }
+
+
+  bool contains(const std::string& name, const Resource::Type& type) const
+  {
+    foreach (const Resource& resource, resources) {
+      if (name == resource.name() && type == resource.type()) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+
+  // Returns the first held resource named 'name' (its type is not
+  // checked), or the caller-supplied default 'resource' when this
+  // collection holds no resource with that name.
+  Resource get(const std::string& name, const Resource& resource) const
+  {
+    // The loop variable is deliberately not called 'resource' so that it
+    // cannot shadow the default-value parameter returned below.
+    foreach (const Resource& candidate, resources) {
+      if (candidate.name() == name) {
+        return candidate;
+      }
+    }
+
+    return resource;
+  }
+
+
+  // Looks up the SCALAR resource called 'name' and yields its scalar
+  // value, or the supplied default 'scalar' if there is no such resource.
+  Resource::Scalar getScalar(const std::string& name, const Resource::Scalar& scalar) const
+  {
+    foreach (const Resource& candidate, resources) {
+      if (candidate.type() != Resource::SCALAR || candidate.name() != name) {
+        continue;
+      }
+
+      return candidate.scalar();
+    }
+
+    return scalar;
+  }
+
+
+  // Looks up the RANGES resource called 'name' and yields its ranges
+  // value, or the supplied default 'ranges' if there is no such resource.
+  Resource::Ranges getRanges(const std::string& name, const Resource::Ranges& ranges) const
+  {
+    foreach (const Resource& candidate, resources) {
+      if (candidate.type() != Resource::RANGES || candidate.name() != name) {
+        continue;
+      }
+
+      return candidate.ranges();
+    }
+
+    return ranges;
+  }
+
+
+  // Looks up the SET resource called 'name' and yields its set value,
+  // or the supplied default 'set' if there is no such resource.
+  Resource::Set getSet(const std::string& name, const Resource::Set& set) const
+  {
+    foreach (const Resource& candidate, resources) {
+      if (candidate.type() != Resource::SET || candidate.name() != name) {
+        continue;
+      }
+
+      return candidate.set();
+    }
+
+    return set;
+  }
+
+
+  // Iterator types for walking the held resources; these are simply the
+  // iterator types of the underlying protobuf repeated field.
+  typedef google::protobuf::RepeatedPtrField<Resource>::iterator
+  iterator;
+
+
+  typedef google::protobuf::RepeatedPtrField<Resource>::const_iterator
+  const_iterator;
+
+
+  // Mutable iteration over the held resources (forwards to the field).
+  iterator begin() { return resources.begin(); }
+  iterator end() { return resources.end(); }
+
+
+  // Read-only iteration over the held resources (forwards to the field).
+  const_iterator begin() const { return resources.begin(); }
+  const_iterator end() const { return resources.end(); }
+
+
+  // Builds a single Resource from a 'name' and a textual 'value'.
+  // NOTE(review): defined out of line; the accepted value syntax is not
+  // visible from this declaration -- confirm against the definition.
+  static Resource parse(const std::string& name, const std::string& value);
+  // Builds a Resources collection from the string 's'.
+  // NOTE(review): local.cpp builds "cpus:<n>;mem:<m>" for the "resources"
+  // option; presumably that is the format accepted here -- confirm.
+  static Resources parse(const std::string& s);
+
+
+  // Returns true if 'resource' has a non-empty name, a recognized type,
+  // and the value field that corresponds to that type (scalar, ranges or
+  // set); returns false otherwise.
+  static bool isValid(const Resource& resource)
+  {
+    if (!resource.has_name() ||
+        resource.name() == "" ||
+        !resource.has_type() ||
+        !Resource::Type_IsValid(resource.type())) {
+      return false;
+    }
+
+    if (resource.type() == Resource::SCALAR) {
+      return resource.has_scalar();
+    } else if (resource.type() == Resource::RANGES) {
+      return resource.has_ranges();
+    } else if (resource.type() == Resource::SET) {
+      // A SET resource must carry the 'set' field (not 'ranges').
+      return resource.has_set();
+    }
+
+    return false;
+  }
+
+
+  // Returns true if 'resource' passes isValid() and describes a non-empty,
+  // well-formed quantity:
+  //   SCALAR: the value is not <= 0.
+  //   RANGES: at least one range, no inverted range, no two ranges overlap.
+  //   SET:    at least one item and no duplicate items.
+  static bool isAllocatable(const Resource& resource)
+  {
+    if (!isValid(resource)) {
+      return false;
+    }
+
+    if (resource.type() == Resource::SCALAR) {
+      if (resource.scalar().value() <= 0) {
+        return false;
+      }
+    } else if (resource.type() == Resource::RANGES) {
+      const int size = resource.ranges().range_size();
+
+      if (size == 0) {
+        return false;
+      }
+
+      for (int i = 0; i < size; i++) {
+        const Resource::Range& range = resource.ranges().range(i);
+
+        // Ensure the range makes sense (isn't inverted).
+        if (range.begin() > range.end()) {
+          return false;
+        }
+
+        // Ensure ranges don't overlap (but not necessarily coalesced).
+        // Only later ranges need checking (earlier pairs were already
+        // compared), and the test is symmetric so that a later range
+        // starting *before* this one is detected as well.
+        for (int j = i + 1; j < size; j++) {
+          const Resource::Range& other = resource.ranges().range(j);
+
+          if (range.begin() <= other.end() && other.begin() <= range.end()) {
+            return false;
+          }
+        }
+      }
+    } else if (resource.type() == Resource::SET) {
+      const int size = resource.set().item_size();
+
+      if (size == 0) {
+        return false;
+      }
+
+      for (int i = 0; i < size; i++) {
+        const std::string& item = resource.set().item(i);
+
+        // Ensure no duplicates.
+        for (int j = i + 1; j < size; j++) {
+          if (item == resource.set().item(j)) {
+            return false;
+          }
+        }
+      }
+    }
+
+    return true;
+  }
+
+private:
+  google::protobuf::RepeatedPtrField<Resource> resources;
+};
+
+
+// Writes each resource in 'resources' to 'stream', separating consecutive
+// resources with "; " (no leading or trailing separator).
+inline
+std::ostream& operator << (std::ostream& stream, const Resources& resources)
+{
+  const Resources::const_iterator stop = resources.end();
+
+  for (Resources::const_iterator it = resources.begin(); it != stop; ++it) {
+    if (it != resources.begin()) {
+      stream << "; ";
+    }
+
+    stream << *it;
+  }
+
+  return stream;
+}
+
+}} // namespace mesos { namespace internal {
+
+
+// namespace boost {
+
+// template <>
+// struct range_iterator<mesos::internal::Resources>
+// {
+//   typedef mesos::internal::Resources::iterator type;
+// };
+
+// template <>
+// struct range_const_iterator<mesos::internal::Resources>
+// {
+//   typedef mesos::internal::Resources::const_iterator type;
+// };
+
+// } // namespace boost {
+
+
+#endif // __RESOURCES_HPP__

Added: incubator/mesos/trunk/src/common/tokenize.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/tokenize.cpp?rev=1132231&view=auto
==============================================================================
--- incubator/mesos/trunk/src/common/tokenize.cpp (added)
+++ incubator/mesos/trunk/src/common/tokenize.cpp Sun Jun  5 09:04:47 2011
@@ -0,0 +1,33 @@
+#include "tokenize.hpp"
+
+using std::string;
+using std::vector;
+
+
+namespace mesos { namespace internal {
+
+vector<string> tokenize(const string& s, const string& delims)
+{
+  // Splits 's' at any character contained in 'delims'. Consecutive
+  // delimiters are collapsed, so no empty tokens are ever produced.
+  vector<string> tokens;
+  size_t offset = 0;
+  while (true) {
+    // Skip delimiters; if nothing but delimiters remains we are done.
+    const size_t i = s.find_first_not_of(delims, offset);
+    if (string::npos == i) {
+      return tokens;
+    }
+
+    // The token ends at the next delimiter, or at the end of 's'.
+    const size_t j = s.find_first_of(delims, i);
+    if (string::npos == j) {
+      tokens.push_back(s.substr(i));
+      return tokens;
+    }
+    tokens.push_back(s.substr(i, j - i));
+    offset = j;
+  }
+}
+
+}} /* namespace mesos { namespace internal { */

Added: incubator/mesos/trunk/src/common/tokenize.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/tokenize.hpp?rev=1132231&view=auto
==============================================================================
--- incubator/mesos/trunk/src/common/tokenize.hpp (added)
+++ incubator/mesos/trunk/src/common/tokenize.hpp Sun Jun  5 09:04:47 2011
@@ -0,0 +1,22 @@
+#ifndef __TOKENIZE_HPP__
+#define __TOKENIZE_HPP__
+
+#include <string>
+#include <vector>
+
+
+namespace mesos { namespace internal {
+
+/**
+ * Splits 's' into the non-empty tokens separated by any char in 'delims'.
+ */
+std::vector<std::string> tokenize(const std::string& s, const std::string& delims);
+
+}} /* namespace mesos { namespace internal { */
+
+#endif // __TOKENIZE_HPP__
+
+
+
+
+

Modified: incubator/mesos/trunk/src/common/type_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/type_utils.hpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/type_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/type_utils.hpp Sun Jun  5 09:04:47 2011
@@ -13,7 +13,7 @@ const int32_t Megabyte = 1;
 const int32_t Gigabyte = 1024 * Megabyte;
 
 
-namespace mesos { 
+namespace mesos {
 
 inline std::ostream& operator << (std::ostream& stream,
                                   const FrameworkID& frameworkId)
@@ -127,51 +127,8 @@ inline size_t hash_value(const TaskID& t
 }
 
 
-
-inline Resources operator + (const Resources& left, const Resources& right)
-{
-  Resources result;
-  result.set_cpus(left.cpus() + right.cpus());
-  result.set_mem(left.mem() + right.mem());
-  return result;
-}
-
-  
-inline Resources operator - (const Resources& left, const Resources& right)
-{
-  Resources result;
-  result.set_cpus(left.cpus() - right.cpus());
-  result.set_mem(left.mem() - right.mem());
-  return result;
-}
-
-  
-inline Resources& operator += (Resources& left, const Resources& right)
-{
-  left.set_cpus(left.cpus() + right.cpus());
-  left.set_mem(left.mem() + right.mem());
-  return left;
-}
-
-inline Resources& operator -= (Resources& left, const Resources& right)
-{
-  left.set_cpus(left.cpus() - right.cpus());
-  left.set_mem(left.mem() - right.mem());
-  return left;
-}
-
-
-inline std::ostream& operator << (std::ostream& stream,
-                                  const Resources& resources)
-{
-  stream << "<" << resources.cpus() << " CPUs, " << resources.mem() << " MEM>";
-  return stream;
-}
-
-
 namespace internal {
 
-
 inline std::ostream& operator << (std::ostream& stream,
                                   const Task *task)
 {
@@ -179,7 +136,6 @@ inline std::ostream& operator << (std::o
   return stream;
 }
 
-
-}} /* namespace mesos::internal */
+}} // namespace mesos { namespace internal {
 
 #endif // __TYPE_UTILS_HPP__

Modified: incubator/mesos/trunk/src/examples/cpp_test_framework.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/cpp_test_framework.cpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/cpp_test_framework.cpp (original)
+++ incubator/mesos/trunk/src/examples/cpp_test_framework.cpp Sun Jun  5 09:04:47 2011
@@ -55,14 +55,17 @@ public:
       // Lookup resources we care about.
       // TODO(benh): It would be nice to ultimately have some helper
       // functions for looking up resources.
-      int32_t cpus = 0;
-      int32_t mem = 0;
+      double cpus = 0;
+      double mem = 0;
 
-      for (int i = 0; i < offer.params().param_size(); i++) {
-        if (offer.params().param(i).key() == "cpus") {
-          cpus = lexical_cast<int32_t>(offer.params().param(i).value());
-        } else if (offer.params().param(i).key() == "mem") {
-          mem = lexical_cast<int32_t>(offer.params().param(i).value());
+      for (int i = 0; i < offer.resources_size(); i++) {
+        const Resource& resource = offer.resources(i);
+        if (resource.name() == "cpus" &&
+            resource.type() == Resource::SCALAR) {
+          cpus = resource.scalar().value();
+        } else if (resource.name() == "mem" &&
+                   resource.type() == Resource::SCALAR) {
+          mem = resource.scalar().value();
         }
       }
 
@@ -78,18 +81,19 @@ public:
         TaskDescription task;
         task.set_name("Task " + lexical_cast<string>(taskId));
         task.mutable_task_id()->set_value(lexical_cast<string>(taskId));
-        *task.mutable_slave_id() = offer.slave_id();
+        task.mutable_slave_id()->MergeFrom(offer.slave_id());
 
-        Params* params = task.mutable_params();
+        Resource* resource;
 
-        Param* param;
-        param = params->add_param();
-        param->set_key("cpus");
-        param->set_value(lexical_cast<string>(CPUS_PER_TASK));
-
-        param = params->add_param();
-        param->set_key("mem");
-        param->set_value(lexical_cast<string>(MEM_PER_TASK));
+        resource = task.add_resources();
+        resource->set_name("cpus");
+        resource->set_type(Resource::SCALAR);
+        resource->mutable_scalar()->set_value(CPUS_PER_TASK);
+
+        resource = task.add_resources();
+        resource->set_name("mem");
+        resource->set_type(Resource::SCALAR);
+        resource->mutable_scalar()->set_value(MEM_PER_TASK);
 
         tasks.push_back(task);
 
@@ -98,9 +102,7 @@ public:
       }
     }
 
-    map<string, string> params;
-    params["timeout"] = "-1";
-    driver->replyToOffer(offerId, tasks, params);
+    driver->replyToOffer(offerId, tasks);
   }
 
   virtual void offerRescinded(SchedulerDriver* driver,

Modified: incubator/mesos/trunk/src/examples/java/TestFramework.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/java/TestFramework.java?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/java/TestFramework.java (original)
+++ incubator/mesos/trunk/src/examples/java/TestFramework.java Sun Jun  5 09:04:47 2011
@@ -61,13 +61,21 @@ public class TestFramework {
             .setName("task " + taskId.getValue())
             .setTaskId(taskId)
             .setSlaveId(offer.getSlaveId())
-            .setParams(Params.newBuilder()
-                       .addParam(Param.newBuilder()
-                                 .setKey("cpus")
-                                 .setValue("1").build())
-                       .addParam(Param.newBuilder()
-                                 .setKey("mem")
-                                 .setValue("128").build()).build()).build();
+            .addResources(Resource.newBuilder()
+                          .setName("cpus")
+                          .setType(Resource.Type.SCALAR)
+                          .setScalar(Resource.Scalar.newBuilder()
+                                     .setValue(1)
+                                     .build())
+                          .build())
+            .addResources(Resource.newBuilder()
+                          .setName("mem")
+                          .setType(Resource.Type.SCALAR)
+                          .setScalar(Resource.Scalar.newBuilder()
+                                     .setValue(128)
+                                     .build())
+                          .build())
+            .build();
 
           tasks.add(task);
         }

Modified: incubator/mesos/trunk/src/examples/memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/memhog.cpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/memhog.cpp (original)
+++ incubator/mesos/trunk/src/examples/memhog.cpp Sun Jun  5 09:04:47 2011
@@ -54,14 +54,17 @@ public:
       // Lookup resources we care about.
       // TODO(benh): It would be nice to ultimately have some helper
       // functions for looking up resources.
-      int32_t cpus = 0;
-      int32_t mem = 0;
+      double cpus = 0;
+      double mem = 0;
 
-      for (int i = 0; i < offer.params().param_size(); i++) {
-        if (offer.params().param(i).key() == "cpus") {
-          cpus = lexical_cast<int32_t>(offer.params().param(i).value());
-        } else if (offer.params().param(i).key() == "mem") {
-          mem = lexical_cast<int32_t>(offer.params().param(i).value());
+      for (int i = 0; i < offer.resources_size(); i++) {
+        const Resource& resource = offer.resources(i);
+        if (resource.name() == "cpus" &&
+            resource.type() == Resource::SCALAR) {
+          cpus = resource.scalar().value();
+        } else if (resource.name() == "mem" &&
+                   resource.type() == Resource::SCALAR) {
+          mem = resource.scalar().value();
         }
       }
 
@@ -75,18 +78,19 @@ public:
         TaskDescription task;
         task.set_name("Task " + lexical_cast<string>(taskId));
         task.mutable_task_id()->set_value(lexical_cast<string>(taskId));
-        *task.mutable_slave_id() = offer.slave_id();
+        task.mutable_slave_id()->MergeFrom(offer.slave_id());
 
-        Params* params = task.mutable_params();
+        Resource* resource;
 
-        Param* param;
-        param = params->add_param();
-        param->set_key("cpus");
-        param->set_value("1");
-
-        param = params->add_param();
-        param->set_key("mem");
-        param->set_value(lexical_cast<string>(memToRequest));
+        resource = task.add_resources();
+        resource->set_name("cpus");
+        resource->set_type(Resource::SCALAR);
+        resource->mutable_scalar()->set_value(1);
+
+        resource = task.add_resources();
+        resource->set_name("mem");
+        resource->set_type(Resource::SCALAR);
+        resource->mutable_scalar()->set_value(memToRequest);
 
         ostringstream data;
         data << memToHog << " " << taskLen << " " << threadsPerTask;
@@ -96,9 +100,7 @@ public:
       }
     }
 
-    map<string, string> params;
-    params["timeout"] = "-1";
-    driver->replyToOffer(offerId, tasks, params);
+    driver->replyToOffer(offerId, tasks);
   }
 
   virtual void offerRescinded(SchedulerDriver* driver,

Modified: incubator/mesos/trunk/src/examples/scheduled_memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/scheduled_memhog.cpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/scheduled_memhog.cpp (original)
+++ incubator/mesos/trunk/src/examples/scheduled_memhog.cpp Sun Jun  5 09:04:47 2011
@@ -103,14 +103,17 @@ public:
       // Lookup resources we care about.
       // TODO(benh): It would be nice to ultimately have some helper
       // functions for looking up resources.
-      int32_t cpus = 0;
-      int32_t mem = 0;
+      double cpus = 0;
+      double mem = 0;
 
-      for (int i = 0; i < offer.params().param_size(); i++) {
-        if (offer.params().param(i).key() == "cpus") {
-          cpus = lexical_cast<int32_t>(offer.params().param(i).value());
-        } else if (offer.params().param(i).key() == "mem") {
-          mem = lexical_cast<int32_t>(offer.params().param(i).value());
+      for (int i = 0; i < offer.resources_size(); i++) {
+        const Resource& resource = offer.resources(i);
+        if (resource.name() == "cpus" &&
+            resource.type() == Resource::SCALAR) {
+          cpus = resource.scalar().value();
+        } else if (resource.name() == "mem" &&
+                   resource.type() == Resource::SCALAR) {
+          mem = resource.scalar().value();
         }
       }
 
@@ -128,18 +131,19 @@ public:
         TaskDescription task;
         task.set_name("Task " + lexical_cast<string>(taskId));
         task.mutable_task_id()->set_value(lexical_cast<string>(taskId));
-        *task.mutable_slave_id() = offer.slave_id();
+        task.mutable_slave_id()->MergeFrom(offer.slave_id());
 
-        Params* params = task.mutable_params();
+        Resource* resource;
 
-        Param* param;
-        param = params->add_param();
-        param->set_key("cpus");
-        param->set_value("1");
-
-        param = params->add_param();
-        param->set_key("mem");
-        param->set_value(lexical_cast<string>(tasks[taskId].memToRequest));
+        resource = task.add_resources();
+        resource->set_name("cpus");
+        resource->set_type(Resource::SCALAR);
+        resource->mutable_scalar()->set_value(1);
+
+        resource = task.add_resources();
+        resource->set_name("mem");
+        resource->set_type(Resource::SCALAR);
+        resource->mutable_scalar()->set_value(tasks[taskId].memToRequest);
 
         ostringstream data;
         data << tasks[taskId].memToHog << " " << tasks[taskId].duration

Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Sun Jun  5 09:04:47 2011
@@ -50,7 +50,7 @@ protected:
 
     // Register with slave.
     Message<E2S_REGISTER_EXECUTOR> out;
-    *out.mutable_framework_id() = frameworkId;
+    out.mutable_framework_id()->MergeFrom(frameworkId);
     send(slave, out);
 
     while(true) {
@@ -79,10 +79,10 @@ protected:
           const TaskDescription& task = msg.task();
 
           Message<E2S_STATUS_UPDATE> out;
-          *out.mutable_framework_id() = frameworkId;
+          out.mutable_framework_id()->MergeFrom(frameworkId);
           TaskStatus* status = out.mutable_status();
-          *status->mutable_task_id() = task.task_id();
-          *status->mutable_slave_id() = slaveId;
+          status->mutable_task_id()->MergeFrom(task.task_id());
+          status->mutable_slave_id()->MergeFrom(slaveId);
           status->set_state(TASK_RUNNING);
           send(slave, out);
 
@@ -293,9 +293,10 @@ int MesosExecutorDriver::sendStatusUpdat
     return -1;
   }
 
+  // TODO(benh): Do a dispatch to Executor first?
   Message<E2S_STATUS_UPDATE> out;
-  *out.mutable_framework_id() = process->frameworkId;
-  *out.mutable_status() = status;
+  out.mutable_framework_id()->MergeFrom(process->frameworkId);
+  out.mutable_status()->MergeFrom(status);
   process->send(process->slave, out);
 
   return 0;
@@ -316,9 +317,10 @@ int MesosExecutorDriver::sendFrameworkMe
     return -1;
   }
 
+  // TODO(benh): Do a dispatch to Executor first?
   Message<E2S_FRAMEWORK_MESSAGE> out;
-  *out.mutable_framework_id() = process->frameworkId;
-  *out.mutable_message() = message;
+  out.mutable_framework_id()->MergeFrom(process->frameworkId);
+  out.mutable_message()->MergeFrom(message);
   process->send(process->slave, out);
 
   return 0;

Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Sun Jun  5 09:04:47 2011
@@ -1,6 +1,7 @@
 #include <pthread.h>
 
 #include <map>
+#include <sstream>
 #include <vector>
 
 #include "local.hpp"
@@ -23,6 +24,7 @@ using mesos::internal::slave::IsolationM
 using mesos::internal::slave::ProcessBasedIsolationModule;
 
 using std::map;
+using std::stringstream;
 using std::vector;
 
 
@@ -45,12 +47,12 @@ static map<IsolationModule*, Slave*> sla
 static MasterDetector *detector = NULL;
 
 
-void registerOptions(Configurator* conf)
+void registerOptions(Configurator* configurator)
 {
-  conf->addOption<int>("slaves", 's', "Number of slaves", 1);
-  Logging::registerOptions(conf);
-  Master::registerOptions(conf);
-  Slave::registerOptions(conf);
+  configurator->addOption<int>("slaves", 's', "Number of slaves", 1);
+  Logging::registerOptions(configurator);
+  Master::registerOptions(configurator);
+  Slave::registerOptions(configurator);
 }
 
 
@@ -62,9 +64,12 @@ PID launch(int numSlaves,
 {
   Configuration conf;
   conf.set("slaves", numSlaves);
-  conf.set("cpus", cpus);
-  conf.set("mem", mem);
   conf.set("quiet", quiet);
+
+  stringstream out;
+  out << "cpus:" << cpus << ";" << "mem:" << mem;
+  conf.set("resources", out.str());
+
   return launch(conf, initLogging);
 }
 

Modified: incubator/mesos/trunk/src/local/local.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.hpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.hpp (original)
+++ incubator/mesos/trunk/src/local/local.hpp Sun Jun  5 09:04:47 2011
@@ -10,7 +10,7 @@ namespace mesos { namespace internal { n
 
 // Register the options recognized by the local runner (a combination of
 // master and slave options) with a configurator.
-void registerOptions(Configurator* conf);
+void registerOptions(Configurator* configurator);
 
 // Launch a local cluster with a given number of slaves and given numbers
 // of CPUs and memory per slave. Additionally one can also toggle whether

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun  5 09:04:47 2011
@@ -171,37 +171,76 @@ state::MasterState * Master::getState()
     new state::MasterState(BUILD_DATE, BUILD_USER, self());
 
   foreachpair (_, Slave *s, slaves) {
+    Resources resources(s->info.resources());
+    Resource::Scalar cpus;
+    Resource::Scalar mem;
+    cpus.set_value(-1);
+    mem.set_value(-1);
+    cpus = resources.getScalar("cpus", cpus);
+    mem = resources.getScalar("mem", mem);
+
     state::Slave *slave =
       new state::Slave(s->slaveId.value(), s->info.hostname(),
-                       s->info.public_hostname(), s->info.resources().cpus(),
-                       s->info.resources().mem(), s->connectTime);
+                       s->info.public_hostname(), cpus.value(),
+                       mem.value(), s->connectTime);
+
     state->slaves.push_back(slave);
   }
 
   foreachpair (_, Framework *f, frameworks) {
+    Resources resources(f->resources);
+    Resource::Scalar cpus;
+    Resource::Scalar mem;
+    cpus.set_value(-1);
+    mem.set_value(-1);
+    cpus = resources.getScalar("cpus", cpus);
+    mem = resources.getScalar("mem", mem);
+
     state::Framework *framework =
       new state::Framework(f->frameworkId.value(), f->info.user(),
                            f->info.name(), f->info.executor().uri(),
-                           f->resources.cpus(), f->resources.mem(),
-                           f->connectTime);
+                           cpus.value(), mem.value(), f->connectTime);
+
     state->frameworks.push_back(framework);
+
     foreachpair (_, Task *t, f->tasks) {
+      Resources resources(t->resources());
+      Resource::Scalar cpus;
+      Resource::Scalar mem;
+      cpus.set_value(-1);
+      mem.set_value(-1);
+      cpus = resources.getScalar("cpus", cpus);
+      mem = resources.getScalar("mem", mem);
+
       state::Task *task =
         new state::Task(t->task_id().value(), t->name(),
                         t->framework_id().value(), t->slave_id().value(),
                         TaskState_descriptor()->FindValueByNumber(t->state())->name(),
-                        t->resources().cpus(), t->resources().mem());
+                        cpus.value(), mem.value());
+
       framework->tasks.push_back(task);
     }
+
     foreach (SlotOffer *o, f->slotOffers) {
       state::SlotOffer *offer =
         new state::SlotOffer(o->offerId.value(), o->frameworkId.value());
-      foreach (SlaveResources &r, o->resources) {
-        state::SlaveResources *resources =
+
+      foreach (const SlaveResources &r, o->resources) {
+        Resources resources(r.resources);
+        Resource::Scalar cpus;
+        Resource::Scalar mem;
+        cpus.set_value(-1);
+        mem.set_value(-1);
+        cpus = resources.getScalar("cpus", cpus);
+        mem = resources.getScalar("mem", mem);
+
+        state::SlaveResources *sr =
           new state::SlaveResources(r.slave->slaveId.value(),
-                                    r.resources.cpus(), r.resources.mem());
-        offer->resources.push_back(resources);
+                                    cpus.value(), mem.value());
+
+        offer->resources.push_back(sr);
       }
+
       framework->offers.push_back(offer);
     }
   }
@@ -445,7 +484,7 @@ void Master::operator () ()
       // potential scalability issue ...
       foreachpair (_, Slave *slave, slaves) {
         Message<M2S_UPDATE_FRAMEWORK> out;
-        *out.mutable_framework_id() = msg.framework_id();
+        out.mutable_framework_id()->MergeFrom(msg.framework_id());
         out.set_pid(from());
         send(slave->pid, out);
       }
@@ -488,10 +527,10 @@ void Master::operator () ()
           // the slave was lost; immediately report any tasks in it as lost
           foreach (const TaskDescription &task, tasks) {
             Message<M2F_STATUS_UPDATE> out;
-            *out.mutable_framework_id() = msg.framework_id();
+            out.mutable_framework_id()->MergeFrom(msg.framework_id());
             TaskStatus *status = out.mutable_status();
-            *status->mutable_task_id() = task.task_id();
-            *status->mutable_slave_id() = task.slave_id();
+            status->mutable_task_id()->MergeFrom(task.task_id());
+            status->mutable_slave_id()->MergeFrom(task.slave_id());
             status->set_state(TASK_LOST);
             send(framework->pid, out);
           }
@@ -528,10 +567,10 @@ void Master::operator () ()
 		     << " of framework " << msg.framework_id()
 		     << " because it cannot be found";
           Message<M2F_STATUS_UPDATE> out;
-          *out.mutable_framework_id() = task->framework_id();
+          out.mutable_framework_id()->MergeFrom(task->framework_id());
           TaskStatus *status = out.mutable_status();
-          *status->mutable_task_id() = task->task_id();
-          *status->mutable_slave_id() = task->slave_id();
+          status->mutable_task_id()->MergeFrom(task->task_id());
+          status->mutable_slave_id()->MergeFrom(task->slave_id());
           status->set_state(TASK_LOST);
           send(framework->pid, out);
         }
@@ -547,8 +586,8 @@ void Master::operator () ()
         Slave *slave = lookupSlave(msg.message().slave_id());
         if (slave != NULL) {
           Message<M2S_FRAMEWORK_MESSAGE> out;
-          *out.mutable_framework_id() = msg.framework_id();
-          *out.mutable_message() = msg.message();
+          out.mutable_framework_id()->MergeFrom(msg.framework_id());
+          out.mutable_message()->MergeFrom(msg.message());
           send(slave->pid, out);
         }
       }
@@ -560,7 +599,8 @@ void Master::operator () ()
 
       Slave* slave = new Slave(msg.slave(), newSlaveId(), from(), elapsed());
 
-      LOG(INFO) << "Registering " << slave->slaveId << " at " << slave->pid;
+      LOG(INFO) << "Registering slave " << slave->slaveId
+                << " at " << slave->pid;
 
       slaves[slave->slaveId] = slave;
       pidToSlaveId[slave->pid] = slave->slaveId;
@@ -569,7 +609,7 @@ void Master::operator () ()
       allocator->slaveAdded(slave);
 
       Message<M2S_REGISTER_REPLY> out;
-      *out.mutable_slave_id() = slave->slaveId;
+      out.mutable_slave_id()->MergeFrom(slave->slaveId);
       out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
       send(slave->pid, out);
       break;
@@ -604,7 +644,7 @@ void Master::operator () ()
       link(slave->pid);
 
       Message<M2S_REREGISTER_REPLY> out;
-      *out.mutable_slave_id() = slave->slaveId;
+      out.mutable_slave_id()->MergeFrom(slave->slaveId);
       out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
       send(slave->pid, out);
 
@@ -617,7 +657,7 @@ void Master::operator () ()
         if (framework != NULL) {
           framework->addTask(task);
           Message<M2S_UPDATE_FRAMEWORK> out;
-          *out.mutable_framework_id() = framework->frameworkId;
+          out.mutable_framework_id()->MergeFrom(framework->frameworkId);
           out.set_pid(framework->pid);
           send(slave->pid, out);
         }
@@ -658,8 +698,8 @@ void Master::operator () ()
         if (framework != NULL) {
 	  // Pass on the (transformed) status update to the framework.
           Message<M2F_STATUS_UPDATE> out;
-          *out.mutable_framework_id() = msg.framework_id();
-          *out.mutable_status() = status;
+          out.mutable_framework_id()->MergeFrom(msg.framework_id());
+          out.mutable_status()->MergeFrom(status);
           forward(framework->pid, out);
 
           // No need to reprocess this message if already seen.
@@ -703,8 +743,8 @@ void Master::operator () ()
         Framework *framework = lookupFramework(msg.framework_id());
         if (framework != NULL) {
           Message<M2S_FRAMEWORK_MESSAGE> out;
-          *out.mutable_framework_id() = msg.framework_id();
-          *out.mutable_message() = msg.message();
+          out.mutable_framework_id()->MergeFrom(msg.framework_id());
+          out.mutable_message()->MergeFrom(msg.message());
           send(framework->pid, out);
         }
       }
@@ -731,10 +771,10 @@ void Master::operator () ()
           // Tell the framework they have been lost and remove them.
           foreach (Task* task, tasks) {
             Message<M2F_STATUS_UPDATE> out;
-            *out.mutable_framework_id() = task->framework_id();
+            out.mutable_framework_id()->MergeFrom(task->framework_id());
             TaskStatus *status = out.mutable_status();
-            *status->mutable_task_id() = task->task_id();
-            *status->mutable_slave_id() = task->slave_id();
+            status->mutable_task_id()->MergeFrom(task->task_id());
+            status->mutable_slave_id()->MergeFrom(task->slave_id());
             status->set_state(TASK_LOST);
             send(framework->pid, out);
 
@@ -807,6 +847,12 @@ void Master::operator () ()
 	  // Stop sending offers here for now.
 	  framework->active = false;
 
+          // Remove the framework's slot offers.
+          unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
+          foreach (SlotOffer* offer, slotOffersCopy) {
+            removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
+          }
+
    	  framework->failoverTimer = new FrameworkFailoverTimer(self(), frameworkId);
    	  link(spawn(framework->failoverTimer));
 // 	  removeFramework(framework);
@@ -875,25 +921,15 @@ OfferID Master::makeOffer(Framework *fra
   LOG(INFO) << "Sending " << offer << " to " << framework;
 
   Message<M2F_RESOURCE_OFFER> out;
-  *out.mutable_offer_id() = offerId;
+  out.mutable_offer_id()->MergeFrom(offerId);
 
   foreach (const SlaveResources& r, resources) {
     SlaveOffer* offer = out.add_offer();
-    *offer->mutable_slave_id() = r.slave->slaveId;
+    offer->mutable_slave_id()->MergeFrom(r.slave->slaveId);
     offer->set_hostname(r.slave->info.hostname());
+    offer->mutable_resources()->MergeFrom(r.resources);
 
-    Params* params = offer->mutable_params();
-
-    Param* param = params->add_param();
-    param->set_key("cpus");
-    param->set_value(lexical_cast<string>(r.resources.cpus()));
-
-    param = params->add_param();
-    param->set_key("mem");
-    param->set_value(lexical_cast<string>(r.resources.mem()));
-
-    string* pid = out.add_pid();
-    *pid = r.slave->pid;
+    out.add_pid(r.slave->pid);
   }
 
   send(framework->pid, out);
@@ -904,47 +940,25 @@ OfferID Master::makeOffer(Framework *fra
 
 // Process a resource offer reply (for a non-cancelled offer) by launching
 // the desired tasks (if the offer contains a valid set of tasks) and
-// reporting any unused resources to the allocator
-void Master::processOfferReply(SlotOffer *offer,
+// reporting any unused resources to the allocator.
+void Master::processOfferReply(SlotOffer* offer,
                                const vector<TaskDescription>& tasks,
                                const Params& params)
 {
   LOG(INFO) << "Received reply for " << offer;
 
-  Framework *framework = lookupFramework(offer->frameworkId);
+  Framework* framework = lookupFramework(offer->frameworkId);
   CHECK(framework != NULL);
 
   // Count resources in the offer.
-  unordered_map<Slave *, Resources> resourcesOffered;
-  foreach (const SlaveResources &r, offer->resources) {
+  unordered_map<Slave*, Resources> resourcesOffered;
+  foreach (const SlaveResources& r, offer->resources) {
     resourcesOffered[r.slave] = r.resources;
   }
 
   // Count used resources and check that its tasks are valid.
-  unordered_map<Slave *, Resources> resourcesUsed;
-  foreach (const TaskDescription &task, tasks) {
-    // Check whether this task size is valid.
-    Resources used;
-    used.set_cpus(-1);
-    used.set_mem(-1);
-
-    for (int i = 0; i < task.params().param_size(); i++) {
-      if (task.params().param(i).key() == "cpus") {
-        int32_t cpus = lexical_cast<int32_t>(task.params().param(i).value());
-        used.set_cpus(cpus);
-      } else if (task.params().param(i).key() == "mem") {
-        int32_t mem = lexical_cast<int32_t>(task.params().param(i).value());
-        used.set_mem(mem);
-      }
-    }
-
-    if (used.cpus() < MIN_CPUS || used.mem() < MIN_MEM || 
-        used.cpus() > MAX_CPUS || used.mem() > MAX_MEM) {
-      terminateFramework(framework, 0, "Invalid task size: " +
-                         lexical_cast<string>(used));
-      return;
-    }
-
+  unordered_map<Slave*, Resources> resourcesUsed;
+  foreach (const TaskDescription& task, tasks) {
     // Check whether the task is on a valid slave.
     Slave* slave = lookupSlave(task.slave_id());
     if (slave == NULL || resourcesOffered.count(slave) == 0) {
@@ -952,13 +966,30 @@ void Master::processOfferReply(SlotOffer
       return;
     }
 
-    resourcesUsed[slave] += used;
+    // Check whether or not the resources for the task are valid.
+    // TODO(benh): In the future maybe we can also augment the
+    // protobuf to deal with fragmentation purposes by providing some
+    // sort of minimum amount of resources required per task.
+
+    if (task.resources().size() == 0) {
+      terminateFramework(framework, 0, "Invalid resources for task");
+      return;
+    }
+
+    foreach (const Resource& resource, task.resources()) {
+      if (!Resources::isAllocatable(resource)) {
+        // TODO(benh): Also send back the invalid resources as a string?
+        terminateFramework(framework, 0, "Invalid resources for task");
+        return;
+      }
+    }
+
+    resourcesUsed[slave] += task.resources();
   }
 
   // Check that the total accepted on each slave isn't more than offered.
-  foreachpair (Slave* slave, Resources& used, resourcesUsed) {
-    if (used.cpus() > resourcesOffered[slave].cpus() ||
-        used.mem() > resourcesOffered[slave].mem()) {
+  foreachpair (Slave* slave, const Resources& used, resourcesUsed) {
+    if (!(used <= resourcesOffered[slave])) {
       terminateFramework(framework, 0, "Too many resources accepted");
       return;
     }
@@ -966,7 +997,7 @@ void Master::processOfferReply(SlotOffer
 
   // Check that there are no duplicate task IDs.
   unordered_set<TaskID> idsInResponse;
-  foreach (const TaskDescription &task, tasks) {
+  foreach (const TaskDescription& task, tasks) {
     if (framework->tasks.count(task.task_id()) > 0 ||
         idsInResponse.count(task.task_id()) > 0) {
       terminateFramework(framework, 0, "Duplicate task ID: " +
@@ -977,11 +1008,12 @@ void Master::processOfferReply(SlotOffer
   }
 
   // Launch the tasks in the response.
-  foreach (const TaskDescription &task, tasks) {
+  foreach (const TaskDescription& task, tasks) {
     launchTask(framework, task);
   }
 
-  // Get out the timeout for left over resources (if exists).
+  // Get the timeout for leftover resources (if one exists), and use
+  // that to calculate the expiry timeout.
   int timeout = DEFAULT_REFUSAL_TIMEOUT;
 
   for (int i = 0; i < params.param_size(); i++) {
@@ -993,46 +1025,36 @@ void Master::processOfferReply(SlotOffer
 
   double expiry = (timeout == -1) ? 0 : elapsed() + timeout;  
 
-  // If there are unused resources on some slaves, add filters for them.
+  // Now check for unused resources on slaves and add filters for them.
   vector<SlaveResources> resourcesUnused;
 
   foreachpair (Slave* slave, const Resources& offered, resourcesOffered) {
     Resources used = resourcesUsed[slave];
     Resources unused = offered - used;
 
-    if (unused.cpus() > 0 || unused.mem() > 0) {
-      resourcesUnused.push_back(SlaveResources(slave, unused));
+    CHECK(used == used.allocatable());
+
+    Resources allocatable = unused.allocatable();
+
+    if (allocatable.size() > 0) {
+      resourcesUnused.push_back(SlaveResources(slave, allocatable));
     }
 
     // Only add a filter on a slave if none of the resources are used.
-    if (timeout != 0 && used.cpus() == 0 && used.mem() == 0) {
+    if (timeout != 0 && used.size() == 0) {
       LOG(INFO) << "Adding filter on " << slave << " to " << framework
-                << " for  " << timeout << " seconds";
+                << " for " << timeout << " seconds";
       framework->slaveFilter[slave] = expiry;
     }
   }
   
-  // Return the resources left to the allocator
+  // Return the resources left to the allocator.
   removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesUnused);
 }
 
 
 void Master::launchTask(Framework* framework, const TaskDescription& task)
 {
-  Resources resources;
-  resources.set_cpus(-1);
-  resources.set_mem(-1);
-
-  for (int i = 0; i < task.params().param_size(); i++) {
-    if (task.params().param(i).key() == "cpus") {
-      int32_t cpus = lexical_cast<int32_t>(task.params().param(i).value());
-      resources.set_cpus(cpus);
-    } else if (task.params().param(i).key() == "mem") {
-      int32_t mem = lexical_cast<int32_t>(task.params().param(i).value());
-      resources.set_mem(mem);
-    }
-  }
-
   // The invariant right now is that launchTask is called only for
   // TaskDescriptions where the slave is still valid (see the code
   // above in processOfferReply).
@@ -1041,10 +1063,10 @@ void Master::launchTask(Framework* frame
 
   Task *t = new Task();
   t->set_name(task.name());
-  *t->mutable_task_id() = task.task_id();
-  *t->mutable_framework_id() = framework->frameworkId;
-  *t->mutable_slave_id() = task.slave_id();
-  *t->mutable_resources() = resources;
+  t->mutable_task_id()->MergeFrom(task.task_id());
+  t->mutable_framework_id()->MergeFrom(framework->frameworkId);
+  t->mutable_slave_id()->MergeFrom(task.slave_id());
+  t->mutable_resources()->MergeFrom(task.resources());
   t->set_state(TASK_STARTING);
 
   framework->addTask(t);
@@ -1055,11 +1077,10 @@ void Master::launchTask(Framework* frame
   LOG(INFO) << "Launching " << t << " on " << slave;
 
   Message<M2S_RUN_TASK> out;
-  *out.mutable_framework() = framework->info;
-  *out.mutable_framework_id() = framework->frameworkId;
+  out.mutable_framework()->MergeFrom(framework->info);
+  out.mutable_framework_id()->MergeFrom(framework->frameworkId);
   out.set_pid(framework->pid);
-  *out.mutable_task() = task;
-  *out.mutable_resources() = resources;
+  out.mutable_task()->MergeFrom(task);
   send(slave->pid, out);
 }
 
@@ -1080,8 +1101,8 @@ void Master::killTask(Task *task)
   CHECK(slave != NULL);
 
   Message<M2S_KILL_TASK> out;
-  *out.mutable_framework_id() = framework->frameworkId;
-  *out.mutable_task_id() = task->task_id();
+  out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+  out.mutable_task_id()->MergeFrom(task->task_id());
   send(slave->pid, out);
 }
 
@@ -1124,7 +1145,7 @@ void Master::removeSlotOffer(SlotOffer *
   // removing the offer is that the framework replied to it
   if (reason != ORR_FRAMEWORK_REPLIED) {
     Message<M2F_RESCIND_OFFER> out;
-    *out.mutable_offer_id() = offer->offerId;
+    out.mutable_offer_id()->MergeFrom(offer->offerId);
     send(framework->pid, out);
   }
   
@@ -1146,7 +1167,7 @@ void Master::addFramework(Framework *fra
   link(framework->pid);
 
   Message<M2F_REGISTER_REPLY> out;
-  *out.mutable_framework_id() = framework->frameworkId;
+  out.mutable_framework_id()->MergeFrom(framework->frameworkId);
   send(framework->pid, out);
 
   allocator->frameworkAdded(framework);
@@ -1159,7 +1180,7 @@ void Master::failoverFramework(Framework
 {
   const PID& oldPid = framework->pid;
 
-  // Remove the framework's slot offers.
+  // Remove the framework's slot offers (if they weren't removed before).
   // TODO(benh): Consider just reoffering these to the new framework.
   unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
   foreach (SlotOffer* offer, slotOffersCopy) {
@@ -1190,7 +1211,7 @@ void Master::failoverFramework(Framework
   framework->active = true;
 
   Message<M2F_REGISTER_REPLY> reply;
-  *reply.mutable_framework_id() = framework->frameworkId;
+  reply.mutable_framework_id()->MergeFrom(framework->frameworkId);
   send(newPid, reply);
 }
 
@@ -1205,7 +1226,7 @@ void Master::removeFramework(Framework *
   // Tell slaves to kill the framework
   foreachpair (_, Slave *slave, slaves) {
     Message<M2S_KILL_FRAMEWORK> out;
-    *out.mutable_framework_id() = framework->frameworkId;
+    out.mutable_framework_id()->MergeFrom(framework->frameworkId);
     send(slave->pid, out);
   }
 
@@ -1217,7 +1238,7 @@ void Master::removeFramework(Framework *
     removeTask(task, TRR_FRAMEWORK_LOST);
   }
   
-  // Remove the framework's slot offers
+  // Remove the framework's slot offers (if they weren't removed before).
   unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
   foreach (SlotOffer* offer, slotOffersCopy) {
     removeSlotOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
@@ -1256,10 +1277,10 @@ void Master::removeSlave(Slave *slave)
     // S2M_REREGISTER_SLAVE.
     if (framework != NULL) {
       Message<M2F_STATUS_UPDATE> out;
-      *out.mutable_framework_id() = task->framework_id();
+      out.mutable_framework_id()->MergeFrom(task->framework_id());
       TaskStatus *status = out.mutable_status();
-      *status->mutable_task_id() = task->task_id();
-      *status->mutable_slave_id() = task->slave_id();
+      status->mutable_task_id()->MergeFrom(task->task_id());
+      status->mutable_slave_id()->MergeFrom(task->slave_id());
       status->set_state(TASK_LOST);
       send(framework->pid, out);
     }
@@ -1287,7 +1308,7 @@ void Master::removeSlave(Slave *slave)
   // previously finished tasks whose output was on the lost slave)
   foreachpair (_, Framework *framework, frameworks) {
     Message<M2F_LOST_SLAVE> out;
-    *out.mutable_slave_id() = slave->slaveId;
+    out.mutable_slave_id()->MergeFrom(slave->slaveId);
     send(framework->pid, out);
   }
 

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Sun Jun  5 09:04:47 2011
@@ -24,6 +24,7 @@
 
 #include "common/fatal.hpp"
 #include "common/foreach.hpp"
+#include "common/resources.hpp"
 #include "common/type_utils.hpp"
 
 #include "configurator/configurator.hpp"
@@ -139,6 +140,7 @@ struct SlotOffer
     : offerId(_offerId), frameworkId(_frameworkId), resources(_resources) {}
 };
 
+
 // An connected framework.
 struct Framework
 {
@@ -149,8 +151,8 @@ struct Framework
   bool active; // Turns false when framework is being removed
   double connectTime;
 
-  unordered_map<TaskID, Task *> tasks;
-  unordered_set<SlotOffer *> slotOffers; // Active offers for framework.
+  unordered_map<TaskID, Task*> tasks;
+  unordered_set<SlotOffer*> slotOffers; // Active offers for framework.
 
   Resources resources; // Total resources owned by framework (tasks + offers)
   
@@ -189,14 +191,18 @@ struct Framework
   {
     CHECK(tasks.count(task->task_id()) == 0);
     tasks[task->task_id()] = task;
-    this->resources += task->resources();
+    for (int i = 0; i < task->resources_size(); i++) {
+      resources += task->resources(i);
+    }
   }
   
   void removeTask(const TaskID& taskId)
   {
     CHECK(tasks.count(taskId) > 0);
     Task* task = tasks[taskId];
-    this->resources -= task->resources();
+    for (int i = 0; i < task->resources_size(); i++) {
+      resources -= task->resources(i);
+    }
     tasks.erase(taskId);
   }
   
@@ -204,16 +210,18 @@ struct Framework
   {
     CHECK(slotOffers.count(offer) == 0);
     slotOffers.insert(offer);
-    foreach (SlaveResources &r, offer->resources)
-      this->resources += r.resources;
+    foreach (const SlaveResources& sr, offer->resources) {
+      resources += sr.resources;
+    }
   }
 
   void removeOffer(SlotOffer *offer)
   {
     CHECK(slotOffers.find(offer) != slotOffers.end());
     slotOffers.erase(offer);
-    foreach (SlaveResources &r, offer->resources)
-      this->resources -= r.resources;
+    foreach (const SlaveResources& sr, offer->resources) {
+      resources -= sr.resources;
+    }
   }
   
   bool filters(Slave *slave, Resources resources)
@@ -271,19 +279,27 @@ struct Slave
   {
     CHECK(tasks.count(make_pair(task->framework_id(), task->task_id())) == 0);
     tasks[make_pair(task->framework_id(), task->task_id())] = task;
-    resourcesInUse += task->resources();
+    foreach (const Resource& resource, task->resources()) {
+      resourcesInUse += resource;
+    }
   }
   
   void removeTask(Task *task)
   {
     CHECK(tasks.count(make_pair(task->framework_id(), task->task_id())) > 0);
     tasks.erase(make_pair(task->framework_id(), task->task_id()));
-    resourcesInUse -= task->resources();
+    foreach (const Resource& resource, task->resources()) {
+      resourcesInUse -= resource;
+    }
   }
   
   Resources resourcesFree()
   {
-    return info.resources() - (resourcesOffered + resourcesInUse);
+    Resources resources;
+    foreach (const Resource& resource, info.resources()) {
+      resources += resource;
+    }
+    return resources - (resourcesOffered + resourcesInUse);
   }
 };
 

Modified: incubator/mesos/trunk/src/master/simple_allocator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/simple_allocator.cpp?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/simple_allocator.cpp (original)
+++ incubator/mesos/trunk/src/master/simple_allocator.cpp Sun Jun  5 09:04:47 2011
@@ -32,7 +32,8 @@ void SimpleAllocator::frameworkRemoved(F
 
 void SimpleAllocator::slaveAdded(Slave* slave)
 {
-  LOG(INFO) << "Added " << slave;
+  LOG(INFO) << "Added " << slave << " with \n"
+            << Resources(slave->info.resources());
   refusers[slave] = unordered_set<Framework*>();
   totalResources += slave->info.resources();
   makeNewOffers(slave);
@@ -66,19 +67,21 @@ void SimpleAllocator::offerReturned(Slot
                                     const vector<SlaveResources>& resLeft)
 {
   LOG(INFO) << "Offer returned: " << offer << ", reason = " << reason;
-  // If this offer returned due to the framework replying, add it to refusers
+
+  // If this offer returned due to the framework replying, add it to refusers.
   if (reason == ORR_FRAMEWORK_REPLIED) {
     Framework* framework = master->lookupFramework(offer->frameworkId);
     CHECK(framework != 0);
     foreach (const SlaveResources& r, resLeft) {
-      VLOG(1) << "Framework reply leaves " << r.resources 
+      VLOG(1) << "Framework reply leaves " << r.resources.allocatable()
               << " free on " << r.slave;
-      if (r.resources.cpus() > 0 || r.resources.mem() > 0) {
+      if (r.resources.allocatable().size() > 0) {
         VLOG(1) << "Inserting " << framework << " as refuser for " << r.slave;
         refusers[r.slave].insert(framework);
       }
     }
   }
+
   // Make new offers unless the offer returned due to a lost framework or slave
   // (in those cases, frameworkRemoved and slaveRemoved will be called later),
   // or returned due to a framework failover (in which case the framework's
@@ -111,28 +114,42 @@ namespace {
   
 struct DominantShareComparator
 {
-  Resources total;
-  
-  DominantShareComparator(Resources _total) : total(_total)
-  {
-    if (total.cpus() == 0) // Prevent division by zero if there are no slaves
-      total.set_cpus(1);
-    if (total.mem() == 0)
-      total.set_mem(1);
-  }
+  DominantShareComparator(const Resources& _resources)
+    : resources(_resources) {}
   
-  bool operator() (Framework* f1, Framework* f2)
+  bool operator () (Framework* f1, Framework* f2)
   {
-    double share1 = max(f1->resources.cpus() / (double) total.cpus(),
-                        f1->resources.mem()  / (double) total.mem());
-    double share2 = max(f2->resources.cpus() / (double) total.cpus(),
-                        f2->resources.mem()  / (double) total.mem());
+    double share1 = 0;
+    double share2 = 0;
+
+    // TODO(benh): This implementation of "dominant resource fairness"
+    // currently does not take into account resources that are not
+    // scalars.
+
+    foreach (const Resource& resource, resources) {
+      if (resource.type() == Resource::SCALAR) {
+        double total = resource.scalar().value();
+
+        if (total > 0) {
+          const Resource::Scalar& scalar1 =
+            f1->resources.getScalar(resource.name(), Resource::Scalar());
+          share1 = max(share1, scalar1.value() / total);
+
+          const Resource::Scalar& scalar2 =
+            f2->resources.getScalar(resource.name(), Resource::Scalar());
+          share2 = max(share2, scalar2.value() / total);
+        }
+      }
+    }
+
     if (share1 == share2)
       // Make the sort deterministic for unit testing.
       return f1->frameworkId.value() < f2->frameworkId.value();
     else
       return share1 < share2;
   }
+
+  Resources resources;
 };
 
 }
@@ -172,17 +189,42 @@ void SimpleAllocator::makeNewOffers(cons
     return;
   }
   
-  // Find all the free resources that can be allocated
+  // Find all the free resources that can be allocated.
   unordered_map<Slave* , Resources> freeResources;
   foreach (Slave* slave, slaves) {
     if (slave->active) {
-      Resources res = slave->resourcesFree();
-      if (res.cpus() >= MIN_CPUS && res.mem() >= MIN_MEM) {
-        VLOG(1) << "Found free resources: " << res << " on " << slave;
-        freeResources[slave] = res;
+      Resources resources = slave->resourcesFree();
+      Resources allocatable = resources.allocatable();
+
+      // TODO(benh): For now, only make offers when there is some cpu
+      // and memory left. This is an artifact of the original code
+      // that only offered when there was at least 1 cpu "unit"
+      // available, and without doing this a framework might get
+      // offered resources with only memory available (which it
+      // obviously won't take) and then get added as a refuser for
+      // that slave and therefore have to wait upwards of
+      // DEFAULT_REFUSAL_TIMEOUT until resources come from that slave
+      // again. In the long run, frameworks will poll the master for
+      // resources, rather than the master pushing resources out to
+      // frameworks.
+
+      Resource::Scalar cpus;
+      cpus.set_value(0);
+
+      cpus = allocatable.getScalar("cpus", cpus);
+
+      Resource::Scalar mem;
+      mem.set_value(0);
+
+      mem = allocatable.getScalar("mem", mem);
+
+      if (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) {
+        VLOG(1) << "Found free resources: " << allocatable << " on " << slave;
+        freeResources[slave] = allocatable;
       }
     }
   }
+
   if (freeResources.size() == 0) {
     VLOG(1) << "makeNewOffers returning because there are no free resources";
     return;
@@ -197,11 +239,11 @@ void SimpleAllocator::makeNewOffers(cons
       refs.clear();
     }
   }
-  
+
   foreach (Framework* framework, ordering) {
     // See which resources this framework can take (given filters & refusals)
     vector<SlaveResources> offerable;
-    foreachpair (Slave* slave, Resources resources, freeResources) {
+    foreachpair (Slave* slave, const Resources& resources, freeResources) {
       if (refusers[slave].find(framework) == refusers[slave].end() &&
           !framework->filters(slave, resources)) {
         VLOG(1) << "Offering " << resources << " on " << slave
@@ -209,10 +251,12 @@ void SimpleAllocator::makeNewOffers(cons
         offerable.push_back(SlaveResources(slave, resources));
       }
     }
+
     if (offerable.size() > 0) {
       foreach (SlaveResources& r, offerable) {
         freeResources.erase(r.slave);
       }
+
       master->makeOffer(framework, offerable);
     }
   }

Modified: incubator/mesos/trunk/src/messaging/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.proto?rev=1132231&r1=1132230&r2=1132231&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.proto (original)
+++ incubator/mesos/trunk/src/messaging/messages.proto Sun Jun  5 09:04:47 2011
@@ -2,17 +2,26 @@ import "mesos.proto";
 
 package mesos.internal;
 
-
+// TODO(benh): It would be great if this could just be a
+// TaskDescription wherever it gets used! One performance reason why
+// we don't do that now is because storing whatever data is coupled
+// with a TaskDescription could be large and unnecessary.
 message Task {
   required string name = 1;
   required TaskID task_id = 2;
   required FrameworkID framework_id = 3;
   required SlaveID slave_id = 4;
-  required Resources resources = 5;
+  repeated Resource resources = 5;
   required TaskState state = 6;
 }
 
 
+message ResourceOffer {
+  required SlaveInfo slave = 1;
+  repeated Resource resources = 2;
+}
+
+
 message FrameworkMessageMessage { // :(
   required FrameworkID framework_id = 1;
   required FrameworkMessage message = 2;
@@ -71,7 +80,6 @@ message RunTaskMessage {
   required FrameworkID framework_id = 2;
   required string pid = 3;
   required TaskDescription task = 4;
-  required Resources resources = 5;
 }