You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/01/11 23:28:44 UTC
svn commit: r1230289 [2/3] - in /incubator/mesos/trunk: ./ include/mesos/
src/ src/common/ src/detector/ src/examples/ src/examples/java/
src/examples/python/ src/exec/ src/java/jni/ src/java/src/org/apache/mesos/
src/log/ src/master/ src/messages/ src...
Modified: incubator/mesos/trunk/src/log/coordinator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.cpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.cpp Wed Jan 11 22:28:41 2012
@@ -1,8 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <algorithm>
#include <process/dispatch.hpp>
#include <process/future.hpp>
-#include <process/timeout.hpp>
#include "common/foreach.hpp"
#include "common/option.hpp"
@@ -34,9 +51,15 @@ Coordinator::Coordinator(int _quorum,
Coordinator::~Coordinator() {}
-Result<uint64_t> Coordinator::elect()
+Result<uint64_t> Coordinator::elect(const Timeout& timeout)
{
- CHECK(!elected);
+ LOG(INFO) << "Coordinator attempting to get elected within "
+ << timeout.remaining() << " seconds";
+
+ if (elected) {
+ // TODO(benh): No-op instead of error?
+ return Result<uint64_t>::error("Coordinator already elected");
+ }
// Get the highest known promise from our local replica.
Future<uint64_t> promise = replica->promised();
@@ -61,8 +84,6 @@ Result<uint64_t> Coordinator::elect()
Option<Future<PromiseResponse> > option;
int okays = 0;
- Timeout timeout = 1.0; // TODO(benh): Have timeout get passed in!
-
do {
option = select(futures, timeout.remaining());
if (option.isSome()) {
@@ -87,7 +108,7 @@ Result<uint64_t> Coordinator::elect()
// Either we have a quorum or we timed out.
if (okays >= quorum) {
- LOG(INFO) << "Coordinator elected!";
+ LOG(INFO) << "Coordinator elected, attempting to fill missing positions";
elected = true;
// Need to "catchup" local replica (i.e., fill in any unlearned
@@ -99,19 +120,22 @@ Result<uint64_t> Coordinator::elect()
Future<set<uint64_t> > positions = replica->missing(index);
- positions.await(); // TODO(benh): Have timeout get passed in!
+ positions.await(timeout.remaining());
if (positions.isFailed()) {
+ elected = false;
return Result<uint64_t>::error(positions.failure());
}
CHECK(positions.isReady()) << "Not expecting a discarded future!";
foreach (uint64_t position, positions.get()) {
- Result<Action> result = fill(position);
+ Result<Action> result = fill(position, timeout);
if (result.isError()) {
+ elected = false;
return Result<uint64_t>::error(result.error());
} else if (result.isNone()) {
+ elected = false;
return Result<uint64_t>::none();
} else {
CHECK(result.isSome());
@@ -136,7 +160,9 @@ Result<uint64_t> Coordinator::demote()
}
-Result<uint64_t> Coordinator::append(const string& bytes)
+Result<uint64_t> Coordinator::append(
+ const string& bytes,
+ const Timeout& timeout)
{
if (!elected) {
return Result<uint64_t>::error("Coordinator not elected");
@@ -150,7 +176,7 @@ Result<uint64_t> Coordinator::append(con
Action::Append* append = action.mutable_append();
append->set_bytes(bytes);
- Result<uint64_t> result = write(action);
+ Result<uint64_t> result = write(action, Timeout(timeout));
if (result.isSome()) {
CHECK(result.get() == index);
@@ -161,7 +187,9 @@ Result<uint64_t> Coordinator::append(con
}
-Result<uint64_t> Coordinator::truncate(uint64_t to)
+Result<uint64_t> Coordinator::truncate(
+ uint64_t to,
+ const Timeout& timeout)
{
if (!elected) {
return Result<uint64_t>::error("Coordinator not elected");
@@ -175,7 +203,7 @@ Result<uint64_t> Coordinator::truncate(u
Action::Truncate* truncate = action.mutable_truncate();
truncate->set_to(to);
- Result<uint64_t> result = write(action);
+ Result<uint64_t> result = write(action, timeout);
if (result.isSome()) {
CHECK(result.get() == index);
@@ -186,11 +214,14 @@ Result<uint64_t> Coordinator::truncate(u
}
-Result<uint64_t> Coordinator::write(const Action& action)
+Result<uint64_t> Coordinator::write(
+ const Action& action,
+ const Timeout& timeout)
{
LOG(INFO) << "Coordinator attempting to write "
<< Action::Type_Name(action.type())
- << " action at position " << action.position();
+ << " action at position " << action.position()
+ << " within " << timeout.remaining() << " seconds";
CHECK(elected);
@@ -238,8 +269,6 @@ Result<uint64_t> Coordinator::write(cons
Option<Future<WriteResponse> > option;
int okays = 0;
- Timeout timeout = 1.0; // TODO(benh): Have timeout get passed in!
-
do {
option = select(futures, timeout.remaining());
if (option.isSome()) {
@@ -307,16 +336,20 @@ Result<uint64_t> Coordinator::commit(con
LOG(FATAL) << "Unknown Action::Type!";
}
+ // TODO(benh): Add a non-message based way to do this write.
+ Future<WriteResponse> future = protocol::write(replica->pid(), request);
+
// We send a write request to the *local* replica just as the
// others: asynchronously via messages. However, rather than add the
// complications of dealing with timeouts for local operations
// (especially since we are trying to commit something), we make
// things simpler and block on the response from the local replica.
- // TODO(benh): Add a non-message based way to do this write.
+ // Maybe we can let it timeout, but consider it a failure? This
+ // might be sound because we don't send the learned messages ... so
+ // this should be the same as if we just failed before we even do
+ // the write ... a client should just retry this write later.
- Future<WriteResponse> future = protocol::write(replica->pid(), request);
-
- future.await(); // TODO(benh): Let it timeout, but consider it a failure.
+ future.await(); // TODO(benh): Don't wait forever, see comment above.
if (future.isFailed()) {
return Result<uint64_t>::error(future.failure());
@@ -349,7 +382,7 @@ Result<uint64_t> Coordinator::commit(con
}
-Result<Action> Coordinator::fill(uint64_t position)
+Result<Action> Coordinator::fill(uint64_t position, const Timeout& timeout)
{
LOG(INFO) << "Coordinator attempting to fill position "
<< position << " in the log";
@@ -367,8 +400,6 @@ Result<Action> Coordinator::fill(uint64_
Option<Future<PromiseResponse> > option;
list<PromiseResponse> responses;
- Timeout timeout = 1.0; // TODO(benh): Have timeout get passed in!
-
do {
option = select(futures, timeout.remaining());
if (option.isSome()) {
@@ -433,7 +464,7 @@ Result<Action> Coordinator::fill(uint64_
action.set_performed(id);
}
- Result<uint64_t> result = write(action);
+ Result<uint64_t> result = write(action, timeout);
if (result.isError()) {
return Result<Action>::error(result.error());
Modified: incubator/mesos/trunk/src/log/coordinator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/coordinator.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/coordinator.hpp (original)
+++ incubator/mesos/trunk/src/log/coordinator.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __LOG_COORDINATOR_HPP__
#define __LOG_COORDINATOR_HPP__
@@ -5,8 +23,10 @@
#include <vector>
#include <process/process.hpp>
+#include <process/timeout.hpp>
#include "common/result.hpp"
+#include "common/seconds.hpp"
#include "log/network.hpp"
#include "log/replica.hpp"
@@ -20,9 +40,6 @@ namespace log {
using namespace process;
-// TODO(benh): Pass timeouts into the coordinator functions rather
-// than have hard coded timeouts within.
-
class Coordinator
{
public:
@@ -36,32 +53,32 @@ public:
// coordinator failed to achieve a quorum (e.g., due to timeout) but
// can be retried. A some result returns the last committed log
// position.
- Result<uint64_t> elect();
+ Result<uint64_t> elect(const Timeout& timeout);
Result<uint64_t> demote();
// Returns the result of trying to append the specified bytes. A
// result of none means the append failed (e.g., due to timeout),
// but can be retried.
- Result<uint64_t> append(const std::string& bytes);
+ Result<uint64_t> append(const std::string& bytes, const Timeout& timeout);
// Returns the result of trying to truncate the log (from the
// beginning to the specified position exclusive). A result of
// none means the truncate failed (e.g., due to timeout), but can be
// retried.
- Result<uint64_t> truncate(uint64_t to);
+ Result<uint64_t> truncate(uint64_t to, const Timeout& timeout);
private:
// Helper that tries to achieve consensus of the specified action. A
// result of none means the write failed (e.g., due to timeout), but
// can be retried.
- Result<uint64_t> write(const Action& action);
+ Result<uint64_t> write(const Action& action, const Timeout& timeout);
// Helper that handles committing an action (i.e., writing to the
// local replica and then sending out learned messages).
Result<uint64_t> commit(const Action& action);
// Helper that tries to fill a position in the log.
- Result<Action> fill(uint64_t position);
+ Result<Action> fill(uint64_t position, const Timeout& timeout);
// Helper that uses the specified protocol to broadcast a request to
// our group and return a set of futures.
Modified: incubator/mesos/trunk/src/log/log.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.cpp (original)
+++ incubator/mesos/trunk/src/log/log.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
// TODO(benh): Optimize LearnedMessage (and the "commit" stage in
// general) by figuring out a way to not send the entire action
// contents a second time (should cut bandwidth used in half).
@@ -603,7 +621,7 @@ int main(int argc, char** argv)
}
args = argv;
-
+
int quorum = atoi(argv[1]);
string file = argv[2];
string servers = argv[3];
@@ -619,7 +637,7 @@ int main(int argc, char** argv)
int at = atoi(argv[i]);
int to = atoi(argv[i + 1]);
-
+
truncations[at] = to;
}
Modified: incubator/mesos/trunk/src/log/log.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/log.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/log.hpp (original)
+++ incubator/mesos/trunk/src/log/log.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __LOG_HPP__
#define __LOG_HPP__
@@ -6,9 +24,11 @@
#include <string>
#include <process/process.hpp>
+#include <process/timeout.hpp>
#include "common/foreach.hpp"
#include "common/result.hpp"
+#include "common/seconds.hpp"
#include "common/try.hpp"
#include "log/coordinator.hpp"
@@ -103,7 +123,9 @@ public:
// Returns all entries between the specified positions, unless
// those positions are invalid, in which case returns an error.
- Result<std::list<Entry> > read(const Position& from, const Position& to);
+ Result<std::list<Entry> > read(const Position& from,
+ const Position& to,
+ const seconds& timeout);
// Returns the beginning position of the log from the perspective
// of the local replica (which may be out of date if the log has
@@ -127,20 +149,20 @@ public:
// one writer (local and remote) is valid at a time. A writer
// becomes invalid if any operation returns an error, and a new
// writer must be created in order to perform subsequent operations.
- Writer(Log* log, int retries = 3);
+ Writer(Log* log, const seconds& timeout, int retries = 3);
~Writer();
// Attempts to append the specified data to the log. A none result
// means the operation timed out, otherwise the new ending
// position of the log is returned or an error. Upon error a new
// Writer must be created.
- Result<Position> append(const std::string& data);
+ Result<Position> append(const std::string& data, const seconds& timeout);
// Attempts to truncate the log up to but not including the
// specified position. A none result means the operation timed
// out, otherwise the new ending position of the log is returned
// or an error. Upon error a new Writer must be created.
- Result<Position> truncate(const Position& to);
+ Result<Position> truncate(const Position& to, const seconds& timeout);
private:
Option<std::string> error;
@@ -258,19 +280,19 @@ Log::Reader::~Reader() {}
Result<std::list<Log::Entry> > Log::Reader::read(
const Log::Position& from,
- const Log::Position& to)
+ const Log::Position& to,
+ const seconds& timeout)
{
process::Future<std::list<Action> > actions =
replica->read(from.value, to.value);
- // TODO(benh): Take a timeout!
- actions.await();
-
- if (actions.isFailed()) {
+ if (!actions.await(timeout.value)) {
+ return Result<std::list<Log::Entry> >::none();
+ } else if (actions.isFailed()) {
return Result<std::list<Log::Entry> >::error(actions.failure());
}
- CHECK(actions.isReady()) << "Not expecting discarded future!";
+ CHECK(actions.isReady()) << "Not expecting discarded future!";
std::list<Log::Entry> entries;
@@ -319,12 +341,14 @@ Log::Position Log::Reader::ending()
}
-Log::Writer::Writer(Log* log, int retries)
+Log::Writer::Writer(Log* log, const seconds& timeout, int retries)
: coordinator(log->quorum, log->replica, log->network),
error(Option<std::string>::none())
{
+ LOG(INFO) << "Number of retries: " << retries;
+
do {
- Result<uint64_t> result = coordinator.elect();
+ Result<uint64_t> result = coordinator.elect(Timeout(timeout.value));
if (result.isNone()) {
retries--;
} else if (result.isSome()) {
@@ -343,7 +367,9 @@ Log::Writer::~Writer()
}
-Result<Log::Position> Log::Writer::append(const std::string& data)
+Result<Log::Position> Log::Writer::append(
+ const std::string& data,
+ const seconds& timeout)
{
if (error.isSome()) {
return Result<Log::Position>::error(error.get());
@@ -351,7 +377,7 @@ Result<Log::Position> Log::Writer::appen
LOG(INFO) << "Attempting to append " << data.size() << " bytes to the log";
- Result<uint64_t> result = coordinator.append(data);
+ Result<uint64_t> result = coordinator.append(data, Timeout(timeout.value));
if (result.isError()) {
error = result.error();
@@ -366,7 +392,9 @@ Result<Log::Position> Log::Writer::appen
}
-Result<Log::Position> Log::Writer::truncate(const Log::Position& to)
+Result<Log::Position> Log::Writer::truncate(
+ const Log::Position& to,
+ const seconds& timeout)
{
if (error.isSome()) {
return Result<Log::Position>::error(error.get());
@@ -374,7 +402,8 @@ Result<Log::Position> Log::Writer::trunc
LOG(INFO) << "Attempting to truncate the log to " << to.value;
- Result<uint64_t> result = coordinator.truncate(to.value);
+ Result<uint64_t> result =
+ coordinator.truncate(to.value, Timeout(timeout.value));
if (result.isError()) {
error = result.error();
Modified: incubator/mesos/trunk/src/log/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/main.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/main.cpp (original)
+++ incubator/mesos/trunk/src/log/main.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <iostream>
#include <list>
#include <string>
Modified: incubator/mesos/trunk/src/log/network.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/network.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/network.hpp (original)
+++ incubator/mesos/trunk/src/log/network.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __NETWORK_HPP__
#define __NETWORK_HPP__
Modified: incubator/mesos/trunk/src/log/replica.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/replica.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/replica.cpp (original)
+++ incubator/mesos/trunk/src/log/replica.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <leveldb/comparator.h>
#include <leveldb/db.h>
#include <leveldb/write_batch.h>
@@ -7,6 +25,7 @@
#include <process/dispatch.hpp>
#include <process/protobuf.hpp>
+#include "common/timer.hpp"
#include "common/utils.hpp"
#include "log/replica.hpp"
@@ -108,48 +127,55 @@ private:
}
};
- static leveldb::Slice slice(uint64_t position)
+ // Returns a string representing the specified position. Note that
+ // we adjust the actual position by incrementing it by 1 because we
+ // reserve 0 for storing the promise record (Record::Promise).
+ static string encode(uint64_t position, bool adjust = true)
{
- // TODO(benh): Use varint comparator.
- LOG(FATAL) << "Unimplemented";
+ // Adjusted stringified representation is plus 1 of actual position.
+ position = adjust ? position + 1 : position;
+
+ // TODO(benh): Use varint encoding for VarInt64Comparator!
// string s;
// google::protobuf::io::StringOutputStream _stream(&s);
// google::protobuf::io::CodedOutputStream stream(&_stream);
+ // position = adjust ? position + 1 : position;
// stream.WriteVarint64(position);
// return s;
- }
- static string stringify(uint64_t position)
- {
- // TODO(benh): Eliminate this once we can use custom comparators!
Try<string> s = strings::format("%.*d", 10, position);
CHECK(s.isSome());
return s.get();
}
- static uint64_t position(const leveldb::Slice& s)
+ // Returns the position as represented in the specified slice
+ // (performing a decrement as necessary to determine the actual
+ // position represented).
+ static uint64_t decode(const leveldb::Slice& s)
{
- // TODO(benh): Use varint comparator.
+ // TODO(benh): Use varint decoding for VarInt64Comparator!
// uint64_t position;
// google::protobuf::io::ArrayInputStream _stream(s.data(), s.size());
// google::protobuf::io::CodedInputStream stream(&_stream);
// bool success = stream.ReadVarint64(&position);
// CHECK(success);
- // return position;
+ // return position - 1; // Actual position is less 1 of stringified.
Try<uint64_t> position =
utils::numify<uint64_t>(string(s.data(), s.size()));
CHECK(position.isSome());
- return position.get();
+ return position.get() - 1; // Actual position is less 1 of stringified.
}
// Varint64Comparator comparator; // TODO(benh): Use varint comparator.
leveldb::DB* db;
+
+ uint64_t first; // First position still in leveldb, used during truncation.
};
LevelDBStorage::LevelDBStorage()
- : db(NULL)
+ : db(NULL), first(0)
{
// Nothing to see here.
}
@@ -173,9 +199,9 @@ Try<State> LevelDBStorage::recover(const
// string produces a stable ordering. Checks below.
// options.comparator = &comparator;
- const string& one = stringify(1);
- const string& two = stringify(2);
- const string& ten = stringify(10);
+ const string& one = encode(1);
+ const string& two = encode(2);
+ const string& ten = encode(10);
CHECK(leveldb::BytewiseComparator()->Compare(one, two) < 0);
CHECK(leveldb::BytewiseComparator()->Compare(two, one) > 0);
@@ -195,6 +221,11 @@ Try<State> LevelDBStorage::recover(const
state.begin = 0;
state.end = 0;
+ // TODO(benh): Consider just reading the "promise" record (e.g.,
+ // 'encode(0, false)') and then iterating over the rest of the
+ // records and confirming that they are all indeed of type
+ // Record::Action.
+
leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
iterator->SeekToFirst();
@@ -243,6 +274,17 @@ Try<State> LevelDBStorage::recover(const
iterator->Next();
}
+ // Determine the first position still in leveldb so during a
+ // truncation we can attempt to delete all positions from the first
+ // position up to the truncate position. Note that this is not the
+ // beginning position of the log, but rather the first position that
+ // remains (i.e., hasn't been deleted) in leveldb.
+ iterator->Seek(encode(0));
+
+ if (iterator->Valid()) {
+ first = decode(iterator->key());
+ }
+
delete iterator;
return state;
@@ -251,6 +293,9 @@ Try<State> LevelDBStorage::recover(const
Try<void> LevelDBStorage::persist(const Promise& promise)
{
+ Timer timer;
+ timer.start();
+
leveldb::WriteOptions options;
options.sync = true;
@@ -264,45 +309,24 @@ Try<void> LevelDBStorage::persist(const
return Try<void>::error("Failed to serialize record");
}
- leveldb::Status status = db->Put(options, stringify(0), value);
+ leveldb::Status status = db->Put(options, encode(0, false), value);
if (!status.ok()) {
return Try<void>::error(status.ToString());
}
+ LOG(INFO) << "Persisting promise (" << value.size()
+ << " bytes) to leveldb took "
+ << timer.elapsed().millis() << " milliseconds";
+
return Try<void>::some();
}
Try<void> LevelDBStorage::persist(const Action& action)
{
- leveldb::WriteBatch batch;
-
- // Delete positions only if a truncate action has been *learned*.
- // TODO(benh): Consider doing this asynchronously (but will require
- // synchronization on the underlying DB).
- if (action.has_learned() && action.learned() &&
- action.has_type() && action.type() == Action::TRUNCATE) {
- CHECK(action.has_truncate());
-
- leveldb::Iterator* iterator = db->NewIterator(leveldb::ReadOptions());
-
- iterator->Seek(stringify(1)); // The actual "beginning" of the log.
-
- const string& to = stringify(action.truncate().to() + 1);
-
- while (iterator->Valid()) {
- // Only iterate as far as (but excluding) the truncate position.
- // TODO(benh): Use varint comparator.
- if (leveldb::BytewiseComparator()->Compare(iterator->key(), to) >= 0) {
- break;
- }
- batch.Delete(iterator->key());
- iterator->Next();
- }
-
- delete iterator;
- }
+ Timer timer;
+ timer.start();
Record record;
record.set_type(Record::ACTION);
@@ -314,28 +338,84 @@ Try<void> LevelDBStorage::persist(const
return Try<void>::error("Failed to serialize record");
}
- batch.Put(stringify(action.position() + 1), value);
-
leveldb::WriteOptions options;
options.sync = true;
- leveldb::Status status = db->Write(options, &batch);
+ leveldb::Status status = db->Put(options, encode(action.position()), value);
if (!status.ok()) {
return Try<void>::error(status.ToString());
}
+ LOG(INFO) << "Persisting action (" << value.size()
+ << " bytes) to leveldb took "
+ << timer.elapsed().millis() << " milliseconds";
+
+ // Delete positions if a truncate action has been *learned*. Note
+ // that we do this in a best-effort fashion (i.e., we ignore any
+ // failures to the database since we can always try again).
+ if (action.has_type() && action.type() == Action::TRUNCATE &&
+ action.has_learned() && action.learned()) {
+ CHECK(action.has_truncate());
+
+ timer.start(); // Restart the timer.
+
+ // To actually perform the truncation in leveldb we need to remove
+ // all the keys that represent positions no longer in the log. We
+ // do this by attempting to delete all keys that represent the
+ // first position we know is still in leveldb up to (but
+ // excluding) the truncate position. Note that this works because
+ // the semantics of WriteBatch are such that even if the position
+ // doesn't exist (which is possible because this replica has some
+ // holes), we can attempt to delete the key that represents it and
+ // it will just ignore that key. This is *much* cheaper than
+ // actually iterating through the entire database instead (which
+ // was, for posterity, the original implementation). In addition,
+ // caching the "first" position we know is in the database is
+ // cheaper than using an iterator to determine the first position
+ // (which was, for posterity, the second implementation).
+
+ leveldb::WriteBatch batch;
+
+ // Add positions up to (but excluding) the truncate position to
+ // the batch starting at the first position still in leveldb.
+ uint64_t index = 0;
+ while ((first + index) < action.truncate().to()) {
+ batch.Delete(encode(first + index));
+ index++;
+ }
+
+ // If we added any positions, attempt to delete them!
+ if (index > 0) {
+ // We do this write asynchronously (e.g., using default options).
+ leveldb::Status status = db->Write(leveldb::WriteOptions(), &batch);
+
+ if (!status.ok()) {
+ LOG(WARNING) << "Ignoring leveldb batch delete failure: "
+ << status.ToString();
+ } else {
+ first = action.truncate().to(); // Save the new first position!
+
+ LOG(INFO) << "Deleting ~" << index << " keys from leveldb took "
+ << timer.elapsed().millis() << " milliseconds";
+ }
+ }
+ }
+
return Try<void>::some();
}
Try<Action> LevelDBStorage::read(uint64_t position)
{
+ Timer timer;
+ timer.start();
+
string value;
leveldb::ReadOptions options;
- leveldb::Status status = db->Get(options, stringify(position + 1), &value);
+ leveldb::Status status = db->Get(options, encode(position), &value);
if (!status.ok()) {
return Try<Action>::error(status.ToString());
@@ -353,6 +433,9 @@ Try<Action> LevelDBStorage::read(uint64_
return Try<Action>::error("Bad record");
}
+ LOG(INFO) << "Reading position from leveldb took "
+ << timer.elapsed().millis() << " milliseconds";
+
return record.action();
}
@@ -661,6 +744,8 @@ void ReplicaProcess::promise(const Promi
void ReplicaProcess::write(const WriteRequest& request)
{
+ LOG(INFO) << "Replica received write request for position " << request.position();
+
Result<Action> result = read(request.position());
if (result.isError()) {
@@ -759,6 +844,8 @@ void ReplicaProcess::write(const WriteRe
void ReplicaProcess::learned(const Action& action)
{
+ LOG(INFO) << "Replica received learned notice for position " << action.position();
+
CHECK(action.learned());
if (persist(action)) {
@@ -771,6 +858,8 @@ void ReplicaProcess::learned(const Actio
void ReplicaProcess::learn(uint64_t position)
{
+ LOG(INFO) << "Replica received learn request for position " << position;
+
Result<Action> result = read(position);
if (result.isError()) {
Modified: incubator/mesos/trunk/src/log/replica.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/replica.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/replica.hpp (original)
+++ incubator/mesos/trunk/src/log/replica.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __LOG_REPLICA_HPP__
#define __LOG_REPLICA_HPP__
Modified: incubator/mesos/trunk/src/master/constants.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/constants.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/constants.hpp (original)
+++ incubator/mesos/trunk/src/master/constants.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __MASTER_CONSTANTS_HPP__
#define __MASTER_CONSTANTS_HPP__
@@ -32,15 +50,15 @@ const double SLAVE_PONG_TIMEOUT = 15.0;
// Maximum number of timeouts until slave is considered failed.
const int MAX_SLAVE_TIMEOUTS = 5;
-// Default time to wait for a framework scheduler to failover.
-const int DEFAULT_FAILOVER_TIMEOUT = 1;
+// Time to wait for a framework to failover.
+const double FRAMEWORK_FAILOVER_TIMEOUT = 1.0;
// Maximum number of completed frameworks to store in the cache.
-// TODO(thomasm): make configurable
+// TODO(thomasm): Make configurable.
const int MAX_COMPLETED_FRAMEWORKS = 100;
-// Maximum number of completed tasks per framework to store in the cache.
-// TODO(thomasm): make configurable
+// Maximum number of completed tasks per framework to store in the
+// cache. TODO(thomasm): Make configurable.
const int MAX_COMPLETED_TASKS_PER_FRAMEWORK = 500;
} // namespace mesos {
Modified: incubator/mesos/trunk/src/master/frameworks_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/frameworks_manager.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/frameworks_manager.cpp (original)
+++ incubator/mesos/trunk/src/master/frameworks_manager.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <glog/logging.h>
#include <process/timer.hpp>
Modified: incubator/mesos/trunk/src/master/frameworks_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/frameworks_manager.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/frameworks_manager.hpp (original)
+++ incubator/mesos/trunk/src/master/frameworks_manager.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __MASTER_FRAMEWORKS_MANAGER_HPP__
#define __MASTER_FRAMEWORKS_MANAGER_HPP__
Modified: incubator/mesos/trunk/src/master/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.cpp (original)
+++ incubator/mesos/trunk/src/master/http.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <iomanip>
#include <sstream>
#include <string>
@@ -31,9 +49,9 @@ namespace master {
JSON::Object model(const Resources& resources)
{
// TODO(benh): Add all of the resources.
- Resource::Scalar none;
- Resource::Scalar cpus = resources.get("cpus", none);
- Resource::Scalar mem = resources.get("mem", none);
+ Value::Scalar none;
+ Value::Scalar cpus = resources.get("cpus", none);
+ Value::Scalar mem = resources.get("mem", none);
JSON::Object object;
object.values["cpus"] = cpus.value();
@@ -93,6 +111,7 @@ JSON::Object model(const Framework& fram
object.values["tasks"] = array;
}
+ // Model all of the completed tasks of a framework.
{
JSON::Array array;
foreach (const Task& task, framework.completedTasks) {
@@ -196,7 +215,7 @@ Promise<HttpResponse> stats(
}
foreach (const Resource& resource, totalResources) {
- if (resource.type() == Resource::SCALAR) {
+ if (resource.type() == Value::SCALAR) {
CHECK(resource.has_scalar());
double total = resource.scalar().value();
object.values[resource.name() + "_total"] = total;
@@ -214,7 +233,7 @@ Promise<HttpResponse> stats(
JSON::render(out, object);
HttpOKResponse response;
- response.headers["Content-Type"] = "text/x-json";
+ response.headers["Content-Type"] = "application/json";
response.headers["Content-Length"] = utils::stringify(out.str().size());
response.body = out.str().data();
return response;
@@ -270,7 +289,7 @@ Promise<HttpResponse> state(
JSON::render(out, object);
HttpOKResponse response;
- response.headers["Content-Type"] = "text/x-json";
+ response.headers["Content-Type"] = "application/json";
response.headers["Content-Length"] = utils::stringify(out.str().size());
response.body = out.str().data();
return response;
Modified: incubator/mesos/trunk/src/master/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/http.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/http.hpp (original)
+++ incubator/mesos/trunk/src/master/http.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __MASTER_HTTP_HPP__
#define __MASTER_HTTP_HPP__
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Wed Jan 11 22:28:41 2012
@@ -229,7 +229,7 @@ void Master::registerOptions(Configurato
configurator->addOption<int>(
"failover_timeout",
"Framework failover timeout in seconds",
- DEFAULT_FAILOVER_TIMEOUT);
+ FRAMEWORK_FAILOVER_TIMEOUT);
}
@@ -311,7 +311,7 @@ void Master::initialize()
nextSlaveId = 0;
nextOfferId = 0;
- failoverTimeout = conf.get<int>("failover_timeout", DEFAULT_FAILOVER_TIMEOUT);
+ failoverTimeout = conf.get<int>("failover_timeout", FRAMEWORK_FAILOVER_TIMEOUT);
// Start all the statistics at 0.
CHECK(TASK_STARTING == TaskState_MIN);
@@ -694,13 +694,15 @@ void Master::launchTasks(const Framework
} else {
// The offer is gone (possibly rescinded, lost slave, re-reply
// to same offer, etc). Report all tasks in it as failed.
+ // TODO: Consider adding a new task state TASK_INVALID for
+ // situations like these.
foreach (const TaskDescription& task, tasks) {
StatusUpdateMessage message;
StatusUpdate* update = message.mutable_update();
update->mutable_framework_id()->MergeFrom(frameworkId);
TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(task.task_id());
- status->set_state(TASK_FAILED);
+ status->set_state(TASK_LOST);
status->set_message("Task launched with invalid offer");
update->set_timestamp(elapsedTime());
update->set_uuid(UUID::random().toBytes());
@@ -759,6 +761,7 @@ void Master::killTask(const FrameworkID&
TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(taskId);
status->set_state(TASK_LOST);
+ status->set_message("Task not found");
update->set_timestamp(elapsedTime());
update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
@@ -1017,7 +1020,7 @@ void Master::exitedExecutor(const SlaveI
// Tell the framework which tasks have been lost.
foreachvalue (Task* task, utils::copy(framework->tasks)) {
if (task->slave_id() == slave->id &&
- task->executor_id() == executorId) {
+ task->executor_id() == executorId) {
StatusUpdateMessage message;
StatusUpdate* update = message.mutable_update();
update->mutable_framework_id()->MergeFrom(task->framework_id());
@@ -1026,6 +1029,7 @@ void Master::exitedExecutor(const SlaveI
TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(task->task_id());
status->set_state(TASK_LOST);
+ status->set_message("Lost executor");
update->set_timestamp(elapsedTime());
update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
@@ -1040,7 +1044,7 @@ void Master::exitedExecutor(const SlaveI
}
}
- // Remove executor from slave.
+ // Remove executor from slave and framework.
slave->removeExecutor(frameworkId, executorId);
framework->removeExecutor(slave->id, executorId);
@@ -1161,6 +1165,7 @@ void Master::makeOffers(Framework* frame
offer->mutable_slave_id()->MergeFrom(slave->id);
offer->set_hostname(slave->info.hostname());
offer->mutable_resources()->MergeFrom(resources);
+ offer->mutable_attributes()->MergeFrom(slave->info.attributes());
// Add all framework's executors running on this slave.
if (slave->executors.contains(framework->id)) {
@@ -1306,7 +1311,12 @@ struct ResourceUsageChecker : TaskDescri
if (!slave->hasExecutor(framework->id, executorInfo.executor_id())) {
taskResources += executorInfo.resources();
if (!((usedResources + taskResources) <= offer->resources())) {
- return TaskDescriptionError::some(
+ LOG(WARNING) << "Task " << task.task_id() << " attempted to use "
+ << taskResources << " combined with already used "
+ << usedResources << " is greater than offered "
+ << offer->resources();
+
+ return TaskDescriptionError::some(
"Task + executor uses more resources than offered");
}
}
@@ -1363,7 +1373,7 @@ void Master::processTasks(Offer* offer,
update->mutable_framework_id()->MergeFrom(framework->id);
TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(task.task_id());
- status->set_state(TASK_FAILED);
+ status->set_state(TASK_LOST);
status->set_message(error.get());
update->set_timestamp(elapsedTime());
update->set_uuid(UUID::random().toBytes());
@@ -1451,6 +1461,7 @@ Resources Master::launchTask(const TaskD
resources += task.resources();
LOG(INFO) << "Launching task " << task.task_id()
+ << " with resources " << task.resources()
<< " on slave " << slave->id;
RunTaskMessage message;
@@ -1492,15 +1503,6 @@ void Master::failoverFramework(Framework
{
const UPID& oldPid = framework->pid;
- // Remove the framework's offers (if they weren't removed before).
- // TODO(benh): Consider just reoffering these to the new framework.
- foreach (Offer* offer, utils::copy(framework->offers)) {
- allocator->resourcesRecovered(offer->framework_id(),
- offer->slave_id(),
- offer->resources());
- removeOffer(offer);
- }
-
{
FrameworkErrorMessage message;
message.set_code(1);
@@ -1518,9 +1520,23 @@ void Master::failoverFramework(Framework
framework->reregisteredTime = elapsedTime();
- FrameworkRegisteredMessage message;
- message.mutable_framework_id()->MergeFrom(framework->id);
- send(newPid, message);
+ {
+ FrameworkRegisteredMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ send(newPid, message);
+ }
+
+ // Remove the framework's offers (if they weren't removed before).
+ // We do this after we have updated the pid and sent the framework
+ // registered message so that the allocator can immediately re-offer
+ // these resources to this framework if it wants.
+ // TODO(benh): Consider just reoffering these to the new framework.
+ foreach (Offer* offer, utils::copy(framework->offers)) {
+ allocator->resourcesRecovered(offer->framework_id(),
+ offer->slave_id(),
+ offer->resources());
+ removeOffer(offer);
+ }
}
@@ -1631,9 +1647,9 @@ void Master::readdSlave(Slave* slave,
// Find the executor running this task and add it to the slave.
foreach (const ExecutorInfo& executorInfo, executorInfos) {
if (executorInfo.executor_id() == task.executor_id()) {
- if (!slave->hasExecutor(task.framework_id(), task.executor_id())) {
- slave->addExecutor(task.framework_id(), executorInfo);
- }
+ if (!slave->hasExecutor(task.framework_id(), task.executor_id())) {
+ slave->addExecutor(task.framework_id(), executorInfo);
+ }
// Also add it to the framework if it has re-registered with us.
Framework* framework = getFramework(task.framework_id());
@@ -1641,7 +1657,7 @@ void Master::readdSlave(Slave* slave,
CHECK(!framework->hasExecutor(slave->id, task.executor_id()));
framework->addExecutor(slave->id, executorInfo);
}
- break;
+ break;
}
}
@@ -1700,6 +1716,7 @@ void Master::removeSlave(Slave* slave)
TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(task->task_id());
status->set_state(TASK_LOST);
+ status->set_message("Slave removed");
update->set_timestamp(elapsedTime());
update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
@@ -1722,7 +1739,7 @@ void Master::removeSlave(Slave* slave)
}
}
}
-
+
// Remove slave from any filters.
foreachvalue (Framework* framework, frameworks) {
framework->slaveFilter.erase(slave);
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Wed Jan 11 22:28:41 2012
@@ -216,6 +216,8 @@ private:
std::list<Framework> completedFrameworks;
+ double failoverTimeout; // Failover timeout for frameworks, in seconds.
+
int64_t nextFrameworkId; // Used to give each framework a unique ID.
int64_t nextOfferId; // Used to give each slot offer a unique ID.
int64_t nextSlaveId; // Used to give each slave a unique ID.
@@ -229,11 +231,7 @@ private:
uint64_t invalidFrameworkMessages;
} stats;
- // Start time used to calculate uptime.
- double startTime;
-
- // Failover timeout for frameworks, in seconds.
- int failoverTimeout;
+ double startTime; // Start time used to calculate uptime.
};
@@ -271,6 +269,8 @@ struct Slave
std::make_pair(task->framework_id(), task->task_id());
CHECK(tasks.count(key) == 0);
tasks[key] = task;
+ VLOG(1) << "Adding task with resources " << task->resources()
+ << " on slave " << id;
resourcesInUse += task->resources();
}
@@ -280,6 +280,8 @@ struct Slave
std::make_pair(task->framework_id(), task->task_id());
CHECK(tasks.count(key) > 0);
tasks.erase(key);
+ VLOG(1) << "Removing task with resources " << task->resources()
+ << " on slave " << id;
resourcesInUse -= task->resources();
}
@@ -287,6 +289,8 @@ struct Slave
{
CHECK(!offers.contains(offer));
offers.insert(offer);
+ VLOG(1) << "Adding offer with resources " << offer->resources()
+ << " on slave " << id;
resourcesOffered += offer->resources();
}
@@ -294,6 +298,8 @@ struct Slave
{
CHECK(offers.contains(offer));
offers.erase(offer);
+ VLOG(1) << "Removing offer with resources " << offer->resources()
+ << " on slave " << id;
resourcesOffered -= offer->resources();
}
@@ -330,7 +336,13 @@ struct Slave
Resources resourcesFree()
{
- return info.resources() - (resourcesOffered + resourcesInUse);
+ Resources resources = info.resources() - (resourcesOffered + resourcesInUse);
+ VLOG(1) << "Calculating resources free on slave " << id << std::endl
+ << " Resources: " << info.resources() << std::endl
+ << " Resources Offered: " << resourcesOffered << std::endl
+ << " Resources In Use: " << resourcesInUse << std::endl
+ << " Resources Free: " << resources << std::endl;
+ return resources;
}
const SlaveID id;
@@ -419,14 +431,14 @@ struct Framework
}
bool hasExecutor(const SlaveID& slaveId,
- const ExecutorID& executorId)
+ const ExecutorID& executorId)
{
return executors.contains(slaveId) &&
executors[slaveId].contains(executorId);
}
void addExecutor(const SlaveID& slaveId,
- const ExecutorInfo& executorInfo)
+ const ExecutorInfo& executorInfo)
{
CHECK(!hasExecutor(slaveId, executorInfo.executor_id()));
executors[slaveId][executorInfo.executor_id()] = executorInfo;
@@ -436,7 +448,7 @@ struct Framework
}
void removeExecutor(const SlaveID& slaveId,
- const ExecutorID& executorId)
+ const ExecutorID& executorId)
{
if (hasExecutor(slaveId, executorId)) {
// Update our resources to reflect removing this executor.
@@ -444,7 +456,7 @@ struct Framework
executors[slaveId].erase(executorId);
if (executors[slaveId].size() == 0) {
- executors.erase(slaveId);
+ executors.erase(slaveId);
}
}
}
@@ -481,6 +493,7 @@ struct Framework
hashset<Offer*> offers; // Active offers for framework.
Resources resources; // Total resources (tasks + offers + executors).
+
hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo> > executors;
// Contains a time of unfiltering for each slave we've filtered,
Modified: incubator/mesos/trunk/src/master/simple_allocator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/simple_allocator.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/simple_allocator.cpp (original)
+++ incubator/mesos/trunk/src/master/simple_allocator.cpp Wed Jan 11 22:28:41 2012
@@ -16,8 +16,6 @@
* limitations under the License.
*/
-#include <algorithm>
-
#include <glog/logging.h>
#include <algorithm>
@@ -172,14 +170,14 @@ struct DominantShareComparator
// scalars.
foreach (const Resource& resource, resources) {
- if (resource.type() == Resource::SCALAR) {
+ if (resource.type() == Value::SCALAR) {
double total = resource.scalar().value();
if (total > 0) {
- Resource::Scalar none;
- const Resource::Scalar& scalar1 =
+ Value::Scalar none;
+ const Value::Scalar& scalar1 =
framework1->resources.get(resource.name(), none);
- const Resource::Scalar& scalar2 =
+ const Value::Scalar& scalar2 =
framework2->resources.get(resource.name(), none);
share1 = max(share1, scalar1.value() / total);
share2 = max(share2, scalar2.value() / total);
@@ -257,9 +255,9 @@ void SimpleAllocator::makeNewOffers(cons
// resources, rather than the master pushing resources out to
// frameworks.
- Resource::Scalar none;
- Resource::Scalar cpus = resources.get("cpus", none);
- Resource::Scalar mem = resources.get("mem", none);
+ Value::Scalar none;
+ Value::Scalar cpus = resources.get("cpus", none);
+ Value::Scalar mem = resources.get("mem", none);
if (cpus.value() >= MIN_CPUS && mem.value() > MIN_MEM) {
VLOG(1) << "Found available resources: " << resources
Modified: incubator/mesos/trunk/src/messages/log.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/log.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/log.hpp (original)
+++ incubator/mesos/trunk/src/messages/log.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __MESSAGES_LOG_HPP__
#define __MESSAGES_LOG_HPP__
Modified: incubator/mesos/trunk/src/messages/log.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/log.proto?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/log.proto (original)
+++ incubator/mesos/trunk/src/messages/log.proto Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package mesos.internal.log;
Modified: incubator/mesos/trunk/src/messages/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/messages.proto?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/messages.proto (original)
+++ incubator/mesos/trunk/src/messages/messages.proto Wed Jan 11 22:28:41 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Modified: incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp Wed Jan 11 22:28:41 2012
@@ -91,7 +91,7 @@ PyTypeObject MesosExecutorDriverImplType
PyMethodDef MesosExecutorDriverImpl_methods[] = {
{"start", (PyCFunction) MesosExecutorDriverImpl_start, METH_NOARGS,
"Start the driver to connect to Mesos"},
- {"stop", (PyCFunction) MesosExecutorDriverImpl_stop, METH_VARARGS,
+ {"stop", (PyCFunction) MesosExecutorDriverImpl_stop, METH_NOARGS,
"Stop the driver, disconnecting from Mesos"},
{"abort", (PyCFunction) MesosExecutorDriverImpl_abort, METH_NOARGS,
"Abort the driver, disallowing calls from and to the driver"},
@@ -223,21 +223,14 @@ PyObject* MesosExecutorDriverImpl_start(
}
-PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self,
- PyObject* args)
+PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self)
{
if (self->driver == NULL) {
PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL");
return NULL;
}
- bool failover = false;
-
- if (!PyArg_ParseTuple(args, "b", &failover)) {
- return NULL;
- }
-
- Status status = self->driver->stop(failover);
+ Status status = self->driver->stop();
return PyInt_FromLong(status); // Sets an exception if creating the int fails
}
Modified: incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp Wed Jan 11 22:28:41 2012
@@ -83,8 +83,7 @@ int MesosExecutorDriverImpl_clear(MesosE
// MesosExecutorDriverImpl methods
PyObject* MesosExecutorDriverImpl_start(MesosExecutorDriverImpl* self);
-PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self,
- PyObject* args);
+PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self);
PyObject* MesosExecutorDriverImpl_abort(MesosExecutorDriverImpl* self);
Modified: incubator/mesos/trunk/src/python/src/mesos.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/src/mesos.py?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/src/mesos.py (original)
+++ incubator/mesos/trunk/src/python/src/mesos.py Wed Jan 11 22:28:41 2012
@@ -38,12 +38,12 @@ class Scheduler:
# in mock objects for tests.
class SchedulerDriver:
def start(self): pass
- def stop(self, failover = False): pass
+ def stop(self, failover=False): pass
def abort(self) : pass
def join(self): pass
def run(self): pass
def requestResources(self, requests): pass
- def launchTasks(self, offerId, tasks, filters = None): pass
+ def launchTasks(self, offerId, tasks, filters=None): pass
def killTask(self, taskId): pass
def reviveOffers(self): pass
def sendFrameworkMessage(self, slaveId, executorId, data): pass
@@ -68,7 +68,7 @@ class Executor:
# in mock objects for tests.
class ExecutorDriver:
def start(self): pass
- def stop(self, failover = False): pass
+ def stop(self): pass
def abort(self): pass
def join(self): pass
def run(self): pass
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Wed Jan 11 22:28:41 2012
@@ -281,7 +281,8 @@ protected:
invoke(bind(&Scheduler::statusUpdate, sched, driver, status));
- if (pid) {
+ // Send a status update acknowledgement ONLY if not aborted!
+ if (!aborted && pid) {
// Acknowledge the message (we do this last, after we invoked
// the scheduler, if we did at all, in case it causes a crash,
// since this way the message might get resent/routed after the
@@ -409,14 +410,14 @@ protected:
if (!connected) {
VLOG(1) << "Ignoring launch tasks message as master is disconnected";
// NOTE: Reply to the framework with TASK_LOST messages for each
- // task. This is a hack for now, to not let the scheduler believe the
- // tasks are forever in PENDING state, when actually the master
- // never received the launchTask message. Also, realize that this
- // hack doesn't capture the case when the scheduler process sends it
- // but the master never receives it (message lost, master failover etc).
- // In the future, this should be solved by the replicated log and timeouts.
+ // task. This is a hack for now, to not let the scheduler
+ // believe the tasks are forever in PENDING state, when actually
+ // the master never received the launchTask message. Also,
+ // realize that this hack doesn't capture the case when the
+ // scheduler process sends it but the master never receives it
+ // (message lost, master failover etc). In the future, this
+ // should be solved by the replicated log and timeouts.
foreach (const TaskDescription& task, tasks) {
- VLOG(1) << "Sending TASK_LOST update for task" << task.task_id().value();
StatusUpdate update;
update.mutable_framework_id()->MergeFrom(frameworkId);
TaskStatus* status = update.mutable_status();
@@ -437,10 +438,18 @@ protected:
message.mutable_filters()->MergeFrom(filters);
foreach (const TaskDescription& task, tasks) {
- VLOG(1) << "Launching task id: " << task.task_id().value() << " name: " << task.name();
// Keep only the slave PIDs where we run tasks so we can send
// framework messages directly.
- savedSlavePids[task.slave_id()] = savedOffers[offerId][task.slave_id()];
+ if (savedOffers.count(offerId) > 0) {
+ if (savedOffers[offerId].count(task.slave_id()) > 0) {
+ savedSlavePids[task.slave_id()] =
+ savedOffers[offerId][task.slave_id()];
+ } else {
+ VLOG(1) << "Attempting to launch a task with the wrong slave id";
+ }
+ } else {
+ VLOG(1) << "Attempting to launch a task with an unknown offer";
+ }
message.add_tasks()->MergeFrom(task);
}
@@ -464,8 +473,8 @@ protected:
}
void sendFrameworkMessage(const SlaveID& slaveId,
- const ExecutorID& executorId,
- const string& data)
+ const ExecutorID& executorId,
+ const string& data)
{
if (!connected) {
VLOG(1) << "Ignoring send framework message as master is disconnected";
Modified: incubator/mesos/trunk/src/slave/constants.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/constants.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/constants.hpp (original)
+++ incubator/mesos/trunk/src/slave/constants.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __SLAVE_CONSTANTS_HPP__
#define __SLAVE_CONSTANTS_HPP__
Modified: incubator/mesos/trunk/src/slave/http.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.cpp (original)
+++ incubator/mesos/trunk/src/slave/http.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <iomanip>
#include <sstream>
#include <string>
@@ -31,9 +49,9 @@ namespace slave {
JSON::Object model(const Resources& resources)
{
// TODO(benh): Add all of the resources.
- Resource::Scalar none;
- Resource::Scalar cpus = resources.get("cpus", none);
- Resource::Scalar mem = resources.get("mem", none);
+ Value::Scalar none;
+ Value::Scalar cpus = resources.get("cpus", none);
+ Value::Scalar mem = resources.get("mem", none);
JSON::Object object;
object.values["cpus"] = cpus.value();
@@ -150,7 +168,7 @@ Promise<HttpResponse> stats(
JSON::render(out, object);
HttpOKResponse response;
- response.headers["Content-Type"] = "text/x-json";
+ response.headers["Content-Type"] = "application/json";
response.headers["Content-Length"] = utils::stringify(out.str().size());
response.body = out.str().data();
return response;
@@ -185,7 +203,7 @@ Promise<HttpResponse> state(
JSON::render(out, object);
HttpOKResponse response;
- response.headers["Content-Type"] = "text/x-json";
+ response.headers["Content-Type"] = "application/json";
response.headers["Content-Length"] = utils::stringify(out.str().size());
response.body = out.str().data();
return response;
Modified: incubator/mesos/trunk/src/slave/http.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/http.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/http.hpp (original)
+++ incubator/mesos/trunk/src/slave/http.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __SLAVE_HTTP_HPP__
#define __SLAVE_HTTP_HPP__
Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp Wed Jan 11 22:28:41 2012
@@ -294,7 +294,7 @@ void LxcIsolationModule::resourcesChange
string property;
uint64_t value;
- double cpu = resources.get("cpu", Resource::Scalar()).value();
+ double cpu = resources.get("cpu", Value::Scalar()).value();
int32_t cpu_shares = max(CPU_SHARES_PER_CPU * (int32_t) cpu, MIN_CPU_SHARES);
property = "cpu.shares";
@@ -306,7 +306,7 @@ void LxcIsolationModule::resourcesChange
return;
}
- double mem = resources.get("mem", Resource::Scalar()).value();
+ double mem = resources.get("mem", Value::Scalar()).value();
int64_t limit_in_bytes = max((int64_t) mem, MIN_MEMORY_MB) * 1024LL * 1024LL;
property = "memory.limit_in_bytes";
@@ -377,7 +377,7 @@ vector<string> LxcIsolationModule::getCo
std::ostringstream out;
- double cpu = resources.get("cpu", Resource::Scalar()).value();
+ double cpu = resources.get("cpu", Value::Scalar()).value();
int32_t cpu_shares = max(CPU_SHARES_PER_CPU * (int32_t) cpu, MIN_CPU_SHARES);
options.push_back("-s");
@@ -386,7 +386,7 @@ vector<string> LxcIsolationModule::getCo
out.str("");
- double mem = resources.get("mem", Resource::Scalar()).value();
+ double mem = resources.get("mem", Value::Scalar()).value();
int64_t limit_in_bytes = max((int64_t) mem, MIN_MEMORY_MB) * 1024LL * 1024LL;
options.push_back("-s");
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Jan 11 22:28:41 2012
@@ -92,6 +92,9 @@ Slave::Slave(const Configuration& _conf,
resources =
Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
+ attributes =
+ Attributes::parse(conf.get<string>("attributes", ""));
+
initialize();
}
@@ -277,10 +280,11 @@ void Slave::operator () ()
}
// Initialize slave info.
+ info.set_hostname(hostname);
info.set_webui_hostname(webui_hostname);
info.set_webui_port(conf.get<int>("webui_port", 8081));
- info.set_hostname(hostname);
info.mutable_resources()->MergeFrom(resources);
+ info.mutable_attributes()->MergeFrom(attributes);
// Spawn and initialize the isolation module.
// TODO(benh): Seems like the isolation module should really be
@@ -780,6 +784,7 @@ void Slave::registerExecutor(const Frame
foreachvalue (const TaskDescription& task, executor->queuedTasks) {
stats.tasks[TASK_STARTING]++;
+
RunTaskMessage message;
message.mutable_framework_id()->MergeFrom(framework->id);
message.mutable_framework()->MergeFrom(framework->info);
@@ -1388,9 +1393,12 @@ string Slave::createUniqueWorkDirectory(
LOG(INFO) << "Generating a unique work directory for executor '"
<< executorId << "' of framework " << frameworkId;
- string workDir = "work"; // No relevant conf options set.
+ string workDir = "work"; // Default work directory.
+
+ // Now look for a configured work directory.
Option<string> option = conf.get("work_dir");
- if (!option.isSome()) {
+ if (option.isNone()) {
+ // Okay, then look for a home directory instead.
option = conf.get("home");
if (option.isSome()) {
workDir = option.get() + "/work";
@@ -1399,7 +1407,6 @@ string Slave::createUniqueWorkDirectory(
workDir = option.get();
}
-
std::ostringstream out(std::ios_base::app | std::ios_base::out);
out << workDir << "/slaves/" << id
<< "/frameworks/" << frameworkId
@@ -1415,11 +1422,11 @@ string Slave::createUniqueWorkDirectory(
for (int i = 0; i < INT_MAX; i++) {
out << i;
- if (opendir(out.str().c_str()) == NULL && errno == ENOENT)
+ DIR* d = opendir(out.str().c_str());
+ if (d == NULL && errno == ENOENT) {
break;
-
- // TODO(benh): Does one need to do any sort of closedir?
-
+ }
+ closedir(d);
out.str(dir);
}
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Jan 11 22:28:41 2012
@@ -26,6 +26,7 @@
#include "slave/http.hpp"
#include "slave/isolation_module.hpp"
+#include "common/attributes.hpp"
#include "common/resources.hpp"
#include "common/hashmap.hpp"
#include "common/type_utils.hpp"
@@ -162,6 +163,7 @@ private:
UPID master;
Resources resources;
+ Attributes attributes;
hashmap<FrameworkID, Framework*> frameworks;
Modified: incubator/mesos/trunk/src/tests/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/Makefile.in?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/Makefile.in (original)
+++ incubator/mesos/trunk/src/tests/Makefile.in Wed Jan 11 22:28:41 2012
@@ -148,7 +148,8 @@ TESTS_OBJ = main.o utils.o master_tests.
protobuf_io_tests.o lxc_isolation_tests.o utils_tests.o \
jvm.o zookeeper_server.o base_zookeeper_test.o \
zookeeper_server_tests.o zookeeper_tests.o \
- url_processor_tests.o killtree_tests.o exception_tests.o
+ url_processor_tests.o killtree_tests.o exception_tests.o \
+ values_tests.o attributes_test.o
ALLTESTS_EXE = $(BINDIR)/tests/all-tests
Modified: incubator/mesos/trunk/src/tests/base_zookeeper_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/base_zookeeper_test.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/base_zookeeper_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/base_zookeeper_test.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <signal.h>
#include <queue>
Modified: incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp (original)
+++ incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __BASE_ZOOKEEPER_TEST_HPP__
#define __BASE_ZOOKEEPER_TEST_HPP__
Modified: incubator/mesos/trunk/src/tests/exception_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/exception_tests.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/exception_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/exception_tests.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
Modified: incubator/mesos/trunk/src/tests/jvm.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/jvm.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/jvm.cpp (original)
+++ incubator/mesos/trunk/src/tests/jvm.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <jni.h>
#include <stdarg.h>
Modified: incubator/mesos/trunk/src/tests/jvm.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/jvm.hpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/jvm.hpp (original)
+++ incubator/mesos/trunk/src/tests/jvm.hpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef __TESTING_JVM_HPP__
#define __TESTING_JVM_HPP__
Modified: incubator/mesos/trunk/src/tests/killtree_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/killtree_tests.cpp?rev=1230289&r1=1230288&r2=1230289&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/killtree_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/killtree_tests.cpp Wed Jan 11 22:28:41 2012
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include <gtest/gtest.h>
#include "tests/external_test.hpp"