Posted to commits@mesos.apache.org by bm...@apache.org on 2013/12/04 01:25:20 UTC

[1/4] git commit: Fixed MESOS-858: Ignore launch / kill requests from non-leading masters.

Updated Branches:
  refs/heads/master 4173ec935 -> d0cb03f95


Fixed MESOS-858: Ignore launch / kill requests from non-leading masters.

Review: https://reviews.apache.org/r/15952
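
The check added here mirrors the guard already used in Slave::shutdown, registered and reregistered (see the [2/4] commit below): compare the sender's UPID against the currently detected leading master and drop the message otherwise. A self-contained sketch of that idea, using stout's Option<T> as found in the Mesos tree (the fromLeadingMaster helper and the string-typed addresses are illustrative, not actual Mesos code):

#include <iostream>
#include <string>

#include <stout/option.hpp>  // Option<T>, as used throughout Mesos.

// Illustrative helper (not the actual Slave code): accept a message only
// if its sender matches the currently detected leading master; otherwise
// log and ignore it, as Slave::runTask/killTask now do.
static bool fromLeadingMaster(
    const Option<std::string>& master,  // Detected leading master, if any.
    const std::string& from)            // Sender of the message.
{
  if (master.isNone() || from != master.get()) {
    std::cerr << "Ignoring message from " << from
              << " because it is not the expected master: "
              << (master.isSome() ? master.get() : "None") << std::endl;
    return false;
  }
  return true;
}

int main()
{
  Option<std::string> master = std::string("master@10.0.0.1:5050");

  fromLeadingMaster(master, "master@10.0.0.2:5050");  // Ignored.
  fromLeadingMaster(master, "master@10.0.0.1:5050");  // Accepted.

  return 0;
}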


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/52c286d0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/52c286d0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/52c286d0

Branch: refs/heads/master
Commit: 52c286d01d3a1e719ab8265664fadc27207bf3b2
Parents: b9d604d
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Dec 2 16:15:48 2013 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Dec 3 14:20:08 2013 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp | 27 +++++++++++++++++++--------
 src/slave/slave.hpp |  6 +++++-
 2 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/52c286d0/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8b69da1..75d9e5d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -711,20 +711,22 @@ Future<bool> Slave::unschedule(const string& path)
 // TODO(vinod): Instead of crashing the slave on checkpoint errors,
 // send TASK_LOST to the framework.
 void Slave::runTask(
+    const UPID& from,
     const FrameworkInfo& frameworkInfo,
     const FrameworkID& frameworkId,
     const string& pid,
     const TaskInfo& task)
 {
-  // TODO(bmahler): Consider ignoring requests not originating from the
-  // expected master.
+  if (!master.isSome() || from != master.get()) {
+    LOG(WARNING) << "Ignoring run task message from " << from
+                 << " because it is not the expected master: "
+                 << (master.isSome() ? master.get() : "None");
+    return;
+  }
 
   LOG(INFO) << "Got assigned task " << task.task_id()
             << " for framework " << frameworkId;
 
-  // TODO(vinod): These ignored tasks should be consolidated by
-  // the master when the slave re-registers.
-
   if (!(task.slave_id() == info.id())) {
     LOG(WARNING) << "Slave " << info.id() << " ignoring task " << task.task_id()
                  << " because it was intended for old slave " << task.slave_id();
@@ -735,6 +737,7 @@ void Slave::runTask(
         state == RUNNING || state == TERMINATING)
     << state;
 
+  // TODO(bmahler): Also ignore if we're DISCONNECTED.
   if (state == RECOVERING || state == TERMINATING) {
     LOG(WARNING) << "Ignoring task " << task.task_id()
                  << " because the slave is " << state;
@@ -986,10 +989,17 @@ void Slave::_runTask(
 }
 
 
-void Slave::killTask(const FrameworkID& frameworkId, const TaskID& taskId)
+void Slave::killTask(
+    const UPID& from,
+    const FrameworkID& frameworkId,
+    const TaskID& taskId)
 {
-  // TODO(bmahler): Consider ignoring requests not originating from the
-  // expected master.
+  if (!master.isSome() || from != master.get()) {
+    LOG(WARNING) << "Ignoring kill task message from " << from
+                 << " because it is not the expected master: "
+                 << (master.isSome() ? master.get() : "None");
+    return;
+  }
 
   LOG(INFO) << "Asked to kill task " << taskId
             << " of framework " << frameworkId;
@@ -998,6 +1008,7 @@ void Slave::killTask(const FrameworkID& frameworkId, const TaskID& taskId)
         state == RUNNING || state == TERMINATING)
     << state;
 
+  // TODO(bmahler): Also ignore if we're DISCONNECTED.
   if (state == RECOVERING || state == TERMINATING) {
     LOG(WARNING) << "Cannot kill task " << taskId
                  << " of framework " << frameworkId

http://git-wip-us.apache.org/repos/asf/mesos/blob/52c286d0/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 2d093a3..71fa4f0 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -95,6 +95,7 @@ public:
   void doReliableRegistration();
 
   void runTask(
+      const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
       const std::string& pid,
@@ -109,7 +110,10 @@ public:
 
   Future<bool> unschedule(const std::string& path);
 
-  void killTask(const FrameworkID& frameworkId, const TaskID& taskId);
+  void killTask(
+      const process::UPID& from,
+      const FrameworkID& frameworkId,
+      const TaskID& taskId);
 
   void shutdownFramework(
       const process::UPID& from,


[4/4] git commit: Fixed cgroups code to use processes rather than threads.

Posted by bm...@apache.org.
Fixed cgroups code to use processes rather than threads.

The cgroup 'tasks' file lists the threads in a cgroup.
We should use the 'cgroup.procs' file which lists processes.

See: https://issues.apache.org/jira/browse/MESOS-859

From: Ian Downes <ia...@gmail.com>
Review: https://reviews.apache.org/r/15954
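
For context, 'tasks' lists thread IDs while 'cgroup.procs' lists thread group IDs, i.e. processes. A minimal standalone sketch of what the renamed cgroups::processes() does after this change, with the Try<>-based error handling and the real path helpers omitted (the paths in main() are only an example):

#include <sys/types.h>  // pid_t

#include <fstream>
#include <iostream>
#include <set>
#include <string>

// Illustrative sketch: read 'cgroup.procs' (process IDs) instead of
// 'tasks' (thread IDs) and collect the values into a std::set, which
// de-duplicates and sorts them, since the kernel guarantees neither.
std::set<pid_t> processes(const std::string& hierarchy,
                          const std::string& cgroup)
{
  std::set<pid_t> pids;

  // e.g. /sys/fs/cgroup/freezer/<cgroup>/cgroup.procs
  std::ifstream file(hierarchy + "/" + cgroup + "/cgroup.procs");

  pid_t pid;
  while (file >> pid) {
    pids.insert(pid);
  }

  return pids;
}

int main()
{
  // Example hierarchy and cgroup; adjust to the mounts on the host.
  for (pid_t pid : processes("/sys/fs/cgroup/freezer", "mesos")) {
    std::cout << pid << std::endl;
  }
  return 0;
}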


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0cb03f9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0cb03f9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0cb03f9

Branch: refs/heads/master
Commit: d0cb03f9535ae1b6997f60f4de50ca96f6b19e70
Parents: fb9edfd
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Dec 3 14:24:15 2013 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Dec 3 14:24:15 2013 -0800

----------------------------------------------------------------------
 src/linux/cgroups.cpp       | 25 +++++++++++++++----------
 src/linux/cgroups.hpp       |  2 +-
 src/tests/cgroups_tests.cpp |  4 ++--
 3 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d0cb03f9/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index 5a95e75..19ab1f3 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -829,9 +829,9 @@ Try<Nothing> kill(
     return error.get();
   }
 
-  Try<set<pid_t> > pids = tasks(hierarchy, cgroup);
+  Try<set<pid_t> > pids = processes(hierarchy, cgroup);
   if (pids.isError()) {
-    return Error("Failed to get tasks of cgroup: " + pids.error());
+    return Error("Failed to get processes of cgroup: " + pids.error());
   }
 
   foreach (pid_t pid, pids.get()) {
@@ -894,14 +894,19 @@ Try<bool> exists(
 }
 
 
-Try<set<pid_t> > tasks(const string& hierarchy, const string& cgroup)
+Try<set<pid_t> > processes(const string& hierarchy, const string& cgroup)
 {
-  Try<string> value = cgroups::read(hierarchy, cgroup, "tasks");
+  // Note: (from cgroups/cgroups.txt documentation)
+  // cgroup.procs: list of thread group IDs in the cgroup. This list is not
+  // guaranteed to be sorted or free of duplicate TGIDs, and userspace should
+  // sort/uniquify the list if this property is required.
+  Try<string> value = cgroups::read(hierarchy, cgroup, "cgroup.procs");
   if (value.isError()) {
-    return Error("Failed to read cgroups control 'tasks': " + value.error());
+    return Error("Failed to read cgroups control 'cgroup.procs': " + value.error());
   }
 
-  // Parse the value read from the control file.
+  // Parse the values read from the control file and insert into a set. This
+  // ensures they are unique (and also sorted).
   set<pid_t> pids;
   std::istringstream ss(value.get());
   ss >> std::dec;
@@ -1255,9 +1260,9 @@ private:
       // make sure that the freezer can finish.
       // TODO(jieyu): This code can be removed in the future as the newer
       // version of the kernel solves this problem (e.g. Linux-3.2.0).
-      Try<set<pid_t> > pids = tasks(hierarchy, cgroup);
+      Try<set<pid_t> > pids = processes(hierarchy, cgroup);
       if (pids.isError()) {
-        promise.fail("Failed to get tasks of cgroup: " + pids.error());
+        promise.fail("Failed to get processes of cgroup: " + pids.error());
         terminate(self());
         return;
       }
@@ -1453,9 +1458,9 @@ protected:
 private:
   void check(unsigned int attempt = 0)
   {
-    Try<set<pid_t> > pids = tasks(hierarchy, cgroup);
+    Try<set<pid_t> > pids = processes(hierarchy, cgroup);
     if (pids.isError()) {
-      promise.fail("Failed to get tasks of cgroup: " + pids.error());
+      promise.fail("Failed to get processes of cgroup: " + pids.error());
       terminate(self());
       return;
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0cb03f9/src/linux/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.hpp b/src/linux/cgroups.hpp
index 6f60c43..cefa476 100644
--- a/src/linux/cgroups.hpp
+++ b/src/linux/cgroups.hpp
@@ -256,7 +256,7 @@ Try<bool> exists(
 // @param   hierarchy   Path to the hierarchy root.
 // @param   cgroup      Path to the cgroup relative to the hierarchy root.
 // @return  The set of process ids.
-Try<std::set<pid_t> > tasks(
+Try<std::set<pid_t> > processes(
     const std::string& hierarchy,
     const std::string& cgroup);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0cb03f9/src/tests/cgroups_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cgroups_tests.cpp b/src/tests/cgroups_tests.cpp
index bc5f961..0e9316d 100644
--- a/src/tests/cgroups_tests.cpp
+++ b/src/tests/cgroups_tests.cpp
@@ -338,7 +338,7 @@ TEST_F(CgroupsAnyHierarchyTest, ROOT_CGROUPS_NestedCgroups)
 
 TEST_F(CgroupsAnyHierarchyTest, ROOT_CGROUPS_Tasks)
 {
-  Try<std::set<pid_t> > pids = cgroups::tasks(hierarchy, "/");
+  Try<std::set<pid_t> > pids = cgroups::processes(hierarchy, "/");
   ASSERT_SOME(pids);
   EXPECT_NE(0u, pids.get().count(1));
   EXPECT_NE(0u, pids.get().count(::getpid()));
@@ -370,7 +370,7 @@ TEST_F(CgroupsAnyHierarchyTest, ROOT_CGROUPS_Write)
     ASSERT_SOME(
         cgroups::write(hierarchy, TEST_CGROUPS_ROOT, "tasks", stringify(pid)));
 
-    Try<std::set<pid_t> > pids = cgroups::tasks(hierarchy, TEST_CGROUPS_ROOT);
+    Try<std::set<pid_t> > pids = cgroups::processes(hierarchy, TEST_CGROUPS_ROOT);
     ASSERT_SOME(pids);
 
     EXPECT_NE(0u, pids.get().count(pid));


[3/4] git commit: Fixed Group to retry if authentication fails due to retryable errors.

Posted by bm...@apache.org.
Fixed Group to retry if authentication fails due to retryable errors.

See summary.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15706
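
The heart of the change is a common calling convention: authenticate(), create(), cache() and sync() each return Try<bool>, where true means success, false means a retryable ZooKeeper error, and an Error means a non-retryable failure that aborts the group. Retries are scheduled with a delay that doubles up to a 60 second cap. A self-contained sketch of that convention using stout's Try and Duration (attemptStep is illustrative, not a Mesos function):

#include <algorithm>
#include <iostream>

#include <stout/duration.hpp>  // Duration, Seconds.
#include <stout/try.hpp>       // Try<T>, Error.

// Illustrative step following the Try<bool> convention: true on success,
// false on a retryable failure, Error on a non-retryable one (the Error
// case is not exercised in this sketch).
Try<bool> attemptStep(int attempt)
{
  if (attempt < 2) {
    return false;  // Retryable: try again later.
  }
  return true;     // Success.
}

int main()
{
  Duration wait = Seconds(1);

  for (int attempt = 0; ; attempt++) {
    Try<bool> result = attemptStep(attempt);

    if (result.isError()) {
      std::cerr << "Aborting: " << result.error() << std::endl;  // Non-retryable.
      return 1;
    } else if (result.get()) {
      std::cout << "Succeeded after " << attempt << " retries" << std::endl;
      return 0;
    }

    // Retryable: back off, doubling the delay but capping it at 60 seconds,
    // as GroupProcess::retry() does (no actual sleep here; the real code
    // re-dispatches itself via process::delay).
    wait = std::min(wait * 2, Duration(Seconds(60)));
    std::cout << "Retrying in " << wait.secs() << " seconds" << std::endl;
  }
}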


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fb9edfd0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fb9edfd0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fb9edfd0

Branch: refs/heads/master
Commit: fb9edfd04ca4dfae5c916c78dedbce86215b0a74
Parents: 52c286d
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Dec 3 14:21:22 2013 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Dec 3 14:21:22 2013 -0800

----------------------------------------------------------------------
 src/tests/group_tests.cpp |  60 ++++++++
 src/zookeeper/group.cpp   | 304 ++++++++++++++++++++++++-----------------
 src/zookeeper/group.hpp   |  38 ++++--
 3 files changed, 262 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fb9edfd0/src/tests/group_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/group_tests.cpp b/src/tests/group_tests.cpp
index fb4c2f1..957256e 100644
--- a/src/tests/group_tests.cpp
+++ b/src/tests/group_tests.cpp
@@ -23,10 +23,12 @@
 #include <string>
 
 #include <process/future.hpp>
+#include <process/gmock.hpp>
 #include <process/gtest.hpp>
 
 #include <stout/gtest.hpp>
 #include <stout/option.hpp>
+#include <stout/os.hpp>
 
 #include "tests/zookeeper.hpp"
 
@@ -37,9 +39,11 @@ using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
 using zookeeper::Group;
+using zookeeper::GroupProcess;
 
 using process::Future;
 
+using testing::_;
 
 class GroupTest : public ZooKeeperTest {};
 
@@ -310,3 +314,59 @@ TEST_F(GroupTest, GroupPathWithRestrictivePerms)
 
   AWAIT_READY(successGroup.join("succeed"));
 }
+
+
+// Verifies that the Group operations can recover from retryable
+// errors caused by session expiration. This test does not guarantee
+// that retries occur, but rather attempts to trigger them
+// probabilistically with repeated session expirations.
+TEST_F(GroupTest, RetryableErrors)
+{
+  Future<Nothing> connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+
+  zookeeper::Authentication auth("digest", "creator:creator");
+  Group group(server->connectString(), NO_TIMEOUT, "/test/", auth);
+
+  // Wait for Group to connect to get hold of the session.
+  AWAIT_READY(connected);
+  Future<Option<int64_t> > session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+
+  // We repeatedly expire the session while Group operations are
+  // on-going. This causes retries of authenticate() and group
+  // create().
+  connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+  server->expireSession(session.get().get());
+
+  Future<Group::Membership> membership = group.join("hello world");
+
+  // Wait for Group to connect to get hold of the session.
+  AWAIT_READY(connected);
+  session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+  connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+  server->expireSession(session.get().get());
+
+  AWAIT_READY(membership);
+
+  // Wait for Group to connect to get hold of the session.
+  AWAIT_READY(connected);
+  session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+  connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+  server->expireSession(session.get().get());
+
+  Future<bool> cancellation = group.cancel(membership.get());
+
+  // Wait for Group to connect to get hold of the session.
+  AWAIT_READY(connected);
+  session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+  server->expireSession(session.get().get());
+
+  AWAIT_READY(cancellation);
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/fb9edfd0/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 5218286..2ddc65e 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -68,7 +68,7 @@ GroupProcess::GroupProcess(
         : ZOO_OPEN_ACL_UNSAFE),
     watcher(NULL),
     zk(NULL),
-    state(DISCONNECTED),
+    state(CONNECTING),
     retrying(false)
 {}
 
@@ -87,7 +87,7 @@ GroupProcess::GroupProcess(
         : ZOO_OPEN_ACL_UNSAFE),
     watcher(NULL),
     zk(NULL),
-    state(DISCONNECTED),
+    state(CONNECTING),
     retrying(false)
 {}
 
@@ -116,9 +116,7 @@ void GroupProcess::initialize()
 
 Future<Group::Membership> GroupProcess::join(const string& data)
 {
-  if (error.isSome()) {
-    return Failure(error.get());
-  } else if (state != CONNECTED) {
+  if (state != READY) {
     Join* join = new Join(data);
     pending.joins.push(join);
     return join->promise.future();
@@ -152,9 +150,7 @@ Future<Group::Membership> GroupProcess::join(const string& data)
 
 Future<bool> GroupProcess::cancel(const Group::Membership& membership)
 {
-  if (error.isSome()) {
-    return Failure(error.get());
-  } else if (owned.count(membership.id()) == 0) {
+  if (owned.count(membership.id()) == 0) {
     // TODO(benh): Should this be an error? Right now a user can't
     // differentiate when 'false' means they can't cancel because it's
     // not owned or because it's already been cancelled (explicitly by
@@ -163,7 +159,7 @@ Future<bool> GroupProcess::cancel(const Group::Membership& membership)
     return false;
   }
 
-  if (state != CONNECTED) {
+  if (state != READY) {
     Cancel* cancel = new Cancel(membership);
     pending.cancels.push(cancel);
     return cancel->promise.future();
@@ -193,9 +189,7 @@ Future<bool> GroupProcess::cancel(const Group::Membership& membership)
 
 Future<string> GroupProcess::data(const Group::Membership& membership)
 {
-  if (error.isSome()) {
-    return Failure(error.get());
-  } else if (state != CONNECTED) {
+  if (state != READY) {
     Data* data = new Data(membership);
     pending.datas.push(data);
     return data->promise.future();
@@ -222,9 +216,7 @@ Future<string> GroupProcess::data(const Group::Membership& membership)
 Future<set<Group::Membership> > GroupProcess::watch(
     const set<Group::Membership>& expected)
 {
-  if (error.isSome()) {
-    return Failure(error.get());
-  } else if (state != CONNECTED) {
+  if (state != READY) {
     Watch* watch = new Watch(expected);
     pending.watches.push(watch);
     return watch->promise.future();
@@ -241,17 +233,29 @@ Future<set<Group::Membership> > GroupProcess::watch(
   // membership "roll call" for each watch in order to make sure all
   // causal relationships are satisfied.
 
-  memberships.isSome() || cache();
-
-  if (memberships.isNone()) { // Try again later.
-    if (!retrying) {
-      delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
-      retrying = true;
+  if (memberships.isNone()) {
+    Try<bool> cached = cache();
+
+    if (cached.isError()) {
+      // Non-retryable error.
+      return Failure(cached.error());
+    } else if (!cached.get()) {
+      CHECK(memberships.isNone());
+
+      // Try again later.
+      if (!retrying) {
+        delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
+        retrying = true;
+      }
+      Watch* watch = new Watch(expected);
+      pending.watches.push(watch);
+      return watch->promise.future();
     }
-    Watch* watch = new Watch(expected);
-    pending.watches.push(watch);
-    return watch->promise.future();
-  } else if (memberships.get() == expected) { // Just wait for updates.
+  }
+
+  CHECK_SOME(memberships);
+
+  if (memberships.get() == expected) { // Just wait for updates.
     Watch* watch = new Watch(expected);
     pending.watches.push(watch);
     return watch->promise.future();
@@ -263,11 +267,7 @@ Future<set<Group::Membership> > GroupProcess::watch(
 
 Future<Option<int64_t> > GroupProcess::session()
 {
-  if (error.isSome()) {
-    Promise<Option<int64_t> > promise;
-    promise.fail(error.get());
-    return promise.future();
-  } else if (state != CONNECTED) {
+  if (state == CONNECTING) {
     return None();
   }
 
@@ -277,64 +277,87 @@ Future<Option<int64_t> > GroupProcess::session()
 
 void GroupProcess::connected(bool reconnect)
 {
-  if (!reconnect) {
-    // Authenticate if necessary (and we are connected for the first
-    // time, or after a session expiration).
-    if (auth.isSome()) {
-      LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
+  LOG(INFO) << "Group process (" << self() << ") "
+            << (reconnect ? "reconnected" : "connected") << " to ZooKeeper";
 
-      int code = zk->authenticate(auth.get().scheme, auth.get().credentials);
+  state = CONNECTED;
 
-      if (code != ZOK) { // TODO(benh): Authentication retries?
-        error = "Failed to authenticate with ZooKeeper: " + zk->message(code);
-        abort(); // Cancels everything pending.
-        return;
-      }
-    }
+  // Cancel and cleanup the reconnect timer (if necessary).
+  if (timer.isSome()) {
+    Timer::cancel(timer.get());
+    timer = None();
+  }
 
-    // Create znode path (including intermediate znodes) as necessary.
-    CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
-
-    LOG(INFO) << "Trying to create path '" << znode << "' in ZooKeeper";
-
-    int code = zk->create(znode, "", acl, 0, NULL, true);
-
-    // We fail all non-OK return codes except ZNONODEEXISTS (since
-    // that means the path we were trying to create exists) and
-    // ZNOAUTH (since it's possible that the ACLs on 'dirname(znode)'
-    // don't allow us to create a child znode but we are allowed to
-    // create children of 'znode' itself, which will be determined
-    // when we first do a Group::join). Note that it's also possible
-    // we got back a ZNONODE because we could not create one of the
-    // intermediate znodes (in which case we'll abort in the 'else'
-    // below since ZNONODE is non-retryable). TODO(benh): Need to
-    // check that we also can put a watch on the children of 'znode'.
-    if (code == ZINVALIDSTATE ||
-        (code != ZOK &&
-         code != ZNODEEXISTS &&
-         code != ZNOAUTH &&
-         zk->retryable(code))) {
-      CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
-      return; // Try again later.
-    } else if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
-      error =
-        "Failed to create '" + znode + "' in ZooKeeper: " + zk->message(code);
-      abort(); // Cancels everything pending.
-      return;
+  // Sync group operations (and set up the group on ZK).
+  Try<bool> synced = sync();
+
+  if (synced.isError()) {
+    // Non-retryable error. Abort.
+    abort(synced.error());
+  } else if (!synced.get()) {
+    // Retryable error.
+    if (!retrying) {
+      delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
+      retrying = true;
     }
-  } else {
-    LOG(INFO) << "Group process (" << self() << ")  reconnected to Zookeeper";
+  }
+}
+
+
+Try<bool> GroupProcess::authenticate()
+{
+  CHECK_EQ(state, CONNECTED);
+
+  // Authenticate if necessary.
+  if (auth.isSome()) {
+    LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
+
+    int code = zk->authenticate(auth.get().scheme, auth.get().credentials);
 
-    // Cancel and cleanup the reconnect timer (if necessary).
-    if (timer.isSome()) {
-      Timer::cancel(timer.get());
-      timer = None();
+    if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+      return false;
+    } else if (code != ZOK) {
+      return Error(
+          "Failed to authenticate with ZooKeeper: " + zk->message(code));
     }
   }
 
-  state = CONNECTED;
+  state = AUTHENTICATED;
+  return true;
+}
+
+
+Try<bool> GroupProcess::create()
+{
+  CHECK_EQ(state, AUTHENTICATED);
+
+  // Create znode path (including intermediate znodes) as necessary.
+  CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
+
+  LOG(INFO) << "Trying to create path '" << znode << "' in ZooKeeper";
+
+  int code = zk->create(znode, "", acl, 0, NULL, true);
 
-  sync(); // Handle pending (and cache memberships).
+  // We fail all non-retryable return codes except ZNONODEEXISTS (
+  // since that means the path we were trying to create exists) and
+  // ZNOAUTH (since it's possible that the ACLs on 'dirname(znode)'
+  // don't allow us to create a child znode but we are allowed to
+  // create children of 'znode' itself, which will be determined
+  // when we first do a Group::join). Note that it's also possible
+  // we got back a ZNONODE because we could not create one of the
+  // intermediate znodes (in which case we'll abort in the 'else'
+  // below since ZNONODE is non-retryable). TODO(benh): Need to
+  // check that we also can put a watch on the children of 'znode'.
+  if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
+    return false;
+  } else if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
+    return Error(
+        "Failed to create '" + znode + "' in ZooKeeper: " + zk->message(code));
+  }
+
+  state = READY;
+  return true;
 }
 
 
@@ -369,8 +392,7 @@ void GroupProcess::timedout(const int64_t& sessionId)
     std::ostringstream error_;
     error_ << "Timed out waiting to reconnect to ZooKeeper (sessionId="
            << std::hex << sessionId << ")";
-    error = error_.str();
-    abort();
+    abort(error_.str());
   }
 }
 
@@ -400,24 +422,27 @@ void GroupProcess::expired()
   // then. We could imagine doing this for owned memberships too, but
   // for now we proactively cancel them above.
 
-  state = DISCONNECTED;
+  state = CONNECTING;
 
   delete CHECK_NOTNULL(zk);
   delete CHECK_NOTNULL(watcher);
   watcher = new ProcessWatcher<GroupProcess>(self());
   zk = new ZooKeeper(servers, timeout, watcher);
-
-  state = CONNECTING;
 }
 
 
 void GroupProcess::updated(const string& path)
 {
-  CHECK(znode == path);
+  CHECK_EQ(znode, path);
+
+  Try<bool> cached = cache(); // Update cache (will invalidate first).
 
-  cache(); // Update cache (will invalidate first).
+  if (cached.isError()) {
+    abort(cached.error()); // Cancel everything pending.
+  } else if (!cached.get()) {
+    CHECK(memberships.isNone());
 
-  if (memberships.isNone()) { // Something changed so we must try again later.
+    // Try again later.
     if (!retrying) {
       delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
       retrying = true;
@@ -442,8 +467,7 @@ void GroupProcess::deleted(const string& path)
 
 Result<Group::Membership> GroupProcess::doJoin(const string& data)
 {
-  CHECK(error.isNone()) << ": " << error.get();
-  CHECK(state == CONNECTED);
+  CHECK_EQ(state, READY);
 
   // Create a new ephemeral node to represent a new member and use the
   // the specified data as it's contents.
@@ -453,7 +477,7 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
                         ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
-    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return None();
   } else if (code != ZOK) {
     return Error(
@@ -484,8 +508,7 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
 
 Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
 {
-  CHECK(error.isNone()) << ": " << error.get();
-  CHECK(state == CONNECTED);
+  CHECK_EQ(state, READY);
 
   Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
 
@@ -499,7 +522,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
   int code = zk->remove(path, -1);
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
-    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return None();
   } else if (code == ZNONODE) {
     // This can happen because the membership could have expired but
@@ -528,8 +551,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
 
 Result<string> GroupProcess::doData(const Group::Membership& membership)
 {
-  CHECK(error.isNone()) << ": " << error.get();
-  CHECK(state == CONNECTED);
+  CHECK_EQ(state, READY);
 
   Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
 
@@ -545,7 +567,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
   int code = zk->get(path, false, &result, NULL);
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
-    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return None();
   } else if (code != ZOK) {
     return Error(
@@ -557,7 +579,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
 }
 
 
-bool GroupProcess::cache()
+Try<bool> GroupProcess::cache()
 {
   // Invalidate first (if it's not already).
   memberships = None();
@@ -568,14 +590,11 @@ bool GroupProcess::cache()
   int code = zk->getChildren(znode, true, &results); // Sets the watch!
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
-    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return false;
   } else if (code != ZOK) {
-    error =
-      "Non-retryable error attempting to get children of '" + znode + "'"
-      " in ZooKeeper: " + zk->message(code);
-    abort(); // Cancels everything pending.
-    return false;
+    return Error("Non-retryable error attempting to get children of '" + znode +
+                 "' in ZooKeeper: " + zk->message(code));
   }
 
   // Convert results to sequence numbers.
@@ -651,10 +670,30 @@ void GroupProcess::update()
 }
 
 
-bool GroupProcess::sync()
+Try<bool> GroupProcess::sync()
 {
-  CHECK(error.isNone()) << ": " << error.get();
-  CHECK(state == CONNECTED);
+  LOG(INFO)
+    << "Syncing group operations: queue size (joins, cancels, datas) = ("
+    << pending.joins.size() << ", " << pending.cancels.size() << ", "
+    << pending.datas.size() << ")";
+
+  CHECK_NE(state, CONNECTING);
+
+  // Authenticate with ZK if not already created.
+  if (state == CONNECTED) {
+    Try<bool> authenticated = authenticate();
+    if (authenticated.isError() || !authenticated.get()) {
+      return authenticated;
+    }
+  }
+
+  // Create group base path if not already created.
+  if (state == AUTHENTICATED) {
+    Try<bool> created = create();
+    if (created.isError() || !created.get()) {
+      return created;
+    }
+  }
 
   // Do joins.
   while (!pending.joins.empty()) {
@@ -709,8 +748,10 @@ bool GroupProcess::sync()
   // cancels first through any explicit futures for them rather than
   // watches.
   if (memberships.isNone()) {
-    if (!cache()) {
-      return false; // Try again later (if no error).
+    Try<bool> cached = cache();
+    if (cached.isError() || !cached.get()) {
+      CHECK(memberships.isNone());
+      return cached;
     } else {
       update(); // Update any pending watches.
     }
@@ -722,31 +763,38 @@ bool GroupProcess::sync()
 
 void GroupProcess::retry(const Duration& duration)
 {
-  if (error.isSome() || state != CONNECTED) {
-    retrying = false; // Stop retrying, we'll sync at reconnect (if no error).
-  } else if (error.isNone() && state == CONNECTED) {
-    bool synced = sync(); // Might get another retryable error.
-    if (!synced && error.isNone()) {
-      // Backoff.
-      Seconds seconds = std::min(duration * 2, Duration(Seconds(60)));
-      delay(seconds, self(), &GroupProcess::retry, seconds);
-    } else {
-      retrying = false;
-    }
+  CHECK(retrying);
+
+  // Will reset it to true if another retry is necessary.
+  retrying = false;
+
+  if (state == CONNECTING) {
+    // The delayed retry can be invoked while group is trying to
+    // (re)connect to ZK. In this case we directly return and we will
+    // sync when group is connected to ZK.
+    return;
   }
-}
 
+  Try<bool> synced = sync();
 
-void GroupProcess::abort()
-{
-  CHECK_SOME(error);
+  if (synced.isError()) {
+    // Non-retryable error. Abort.
+    abort(synced.error());
+  } else if (!synced.get()) {
+    // Backoff and keep retrying.
+    retrying = true;
+    Seconds seconds = std::min(duration * 2, Duration(Seconds(60)));
+    delay(seconds, self(), &GroupProcess::retry, seconds);
+  }
+}
 
-  fail(&pending.joins, error.get());
-  fail(&pending.cancels, error.get());
-  fail(&pending.datas, error.get());
-  fail(&pending.watches, error.get());
 
-  error = None();
+void GroupProcess::abort(const string& error)
+{
+  fail(&pending.joins, error);
+  fail(&pending.cancels, error);
+  fail(&pending.datas, error);
+  fail(&pending.watches, error);
 
   // If we decide to abort, make sure we expire the session
   // (cleaning up any ephemeral ZNodes as necessary). We also

http://git-wip-us.apache.org/repos/asf/mesos/blob/fb9edfd0/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 04068e3..27b2ee9 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -171,15 +171,28 @@ private:
   Result<bool> doCancel(const Group::Membership& membership);
   Result<std::string> doData(const Group::Membership& membership);
 
-  // Attempts to cache the current set of memberships.
-  bool cache();
+  // Returns true if authentication is successful, false if the
+  // failure is retryable and Error otherwise.
+  Try<bool> authenticate();
 
-  // Updates any pending watches.
-  void update();
+  // Creates the group (which means creating its base path) on ZK.
+  // Returns true if successful, false if the failure is retryable
+  // and Error otherwise.
+  Try<bool> create();
+
+  // Attempts to cache the current set of memberships.
+  // Returns true if successful, false if the failure is retryable
+  // and Error otherwise.
+  Try<bool> cache();
 
   // Synchronizes pending operations with ZooKeeper and also attempts
   // to cache the current set of memberships if necessary.
-  bool sync();
+  // Returns true if successful, false if the failure is retryable
+  // and Error otherwise.
+  Try<bool> sync();
+
+  // Updates any pending watches.
+  void update();
 
   // Generic retry method. This mechanism is "generic" in the sense
   // that it is not specific to any particular operation, but rather
@@ -188,12 +201,10 @@ private:
   void retry(const Duration& duration);
 
   // Fails all pending operations.
-  void abort();
+  void abort(const std::string& error);
 
   void timedout(const int64_t& sessionId);
 
-  Option<std::string> error; // Potential non-retryable error.
-
   const std::string servers;
   const Duration timeout;
   const std::string znode;
@@ -205,10 +216,12 @@ private:
   Watcher* watcher;
   ZooKeeper* zk;
 
-  enum State { // ZooKeeper connection state.
-    DISCONNECTED,
-    CONNECTING,
-    CONNECTED,
+  enum State {     // Group connection state.
+    CONNECTING,    // ZooKeeper connecting.
+    CONNECTED,     // ZooKeeper connected but before group setup.
+    AUTHENTICATED, // ZooKeeper connected and authenticated.
+    READY,         // ZooKeeper connected, session authenticated and
+                   // base path for the group created.
   } state;
 
   struct Join
@@ -249,6 +262,7 @@ private:
     std::queue<Watch*> watches;
   } pending;
 
+  // Indicates there is a pending delayed retry.
   bool retrying;
 
   // Expected ZooKeeper sequence numbers (either owned/created by this


[2/4] git commit: Cleaned up some log messages in the slave.

Posted by bm...@apache.org.
Cleaned up some log messages in the slave.

Review: https://reviews.apache.org/r/15951


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b9d604df
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b9d604df
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b9d604df

Branch: refs/heads/master
Commit: b9d604dfe986de1193911869d55382c1c264ebdc
Parents: 4173ec9
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Dec 2 16:18:00 2013 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Dec 3 14:20:08 2013 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b9d604df/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 91afe03..8b69da1 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -432,7 +432,7 @@ void Slave::shutdown(const UPID& from)
   if (from && (!master.isSome() || from != master.get())) {
     LOG(WARNING) << "Ignoring shutdown message from " << from
                  << " because it is not from the registered master: "
-                 << (master.isSome() ? master.get() : "None/Error");
+                 << (master.isSome() ? master.get() : "None");
     return;
   }
 
@@ -532,7 +532,7 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
   if (!master.isSome() || from != master.get()) {
     LOG(WARNING) << "Ignoring registration message from " << from
                  << " because it is not the expected master: "
-                 << (master.isSome() ? master.get() : "NONE/ERROR");
+                 << (master.isSome() ? master.get() : "None");
     return;
   }
 
@@ -582,7 +582,7 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
   if (!master.isSome() || from != master.get()) {
     LOG(WARNING) << "Ignoring re-registration message from " << from
                  << " because it is not the expected master: "
-                 << (master.isSome() ? master.get() : "NONE/ERROR");
+                 << (master.isSome() ? master.get() : "None");
     return;
   }
 
@@ -1115,7 +1115,7 @@ void Slave::shutdownFramework(
     LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
                  << " from " << from
                  << " because it is not from the registered master ("
-                 << (master.isSome() ? master.get() : "NONE/ERROR") << ")";
+                 << (master.isSome() ? master.get() : "None") << ")";
     return;
   }