You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/07/05 22:06:14 UTC

[1/4] mesos git commit: Added test for sending framework information after master failover.

Repository: mesos
Updated Branches:
  refs/heads/master c5e2aae13 -> 6fd4e9a53


Added test for sending framework information after master failover.

We test that the FrameworkInfos for running frameworks are sent
from the agent to the master when an agent reregisters.

Review: https://reviews.apache.org/r/49639/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6fd4e9a5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6fd4e9a5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6fd4e9a5

Branch: refs/heads/master
Commit: 6fd4e9a53a808ac199c331c5d193983814fd2229
Parents: 0cb4533
Author: Joerg Schad <jo...@mesosphere.io>
Authored: Tue Jul 5 17:05:43 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jul 5 17:06:05 2016 -0500

----------------------------------------------------------------------
 src/tests/master_slave_reconciliation_tests.cpp | 81 ++++++++++++++++++++
 1 file changed, 81 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6fd4e9a5/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index fc02ca6..701d9a2 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -545,6 +545,87 @@ TEST_F(MasterSlaveReconciliationTest, SlaveReregisterTerminalTask)
   driver.join();
 }
 
+
+// This test verifies that when the slave re-registers, we correctly
+// send the information about actively running frameworks.
+TEST_F(MasterSlaveReconciliationTest, SlaveReregisterActiveFrameworks)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("test task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  // Send an update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore retried update due to update framework.
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(_statusUpdate);
+
+  Future<ReregisterSlaveMessage> reregisterSlave =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  detector.appoint(master.get()->pid);
+
+  // Expect to receive the 'ReregisterSlaveMessage' containing the
+  // active frameworks.
+  AWAIT_READY(reregisterSlave);
+
+  EXPECT_EQ(1u, reregisterSlave.get().active_frameworks().size());
+
+  Clock::pause();
+  Clock::settle();
+
+  AWAIT_READY(status);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[2/4] mesos git commit: Added support for recovered frameworks.

Posted by vi...@apache.org.
Added support for recovered frameworks.

Previously, after a master failover and while agents were
reregistering, we might not have had access to all FrameworkInfos
(which would only become available once the framework
reregistered). This patch adds support for keeping track of
recovered FrameworkInfos, as reported by reregistering agents,
until the framework itself reregisters.

Review: https://reviews.apache.org/r/49607/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c96ca49
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c96ca49
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c96ca49

Branch: refs/heads/master
Commit: 3c96ca497ba2b0a733756f1af5e2dd4b4695d77a
Parents: 90bcedc
Author: Joerg Schad <jo...@mesosphere.io>
Authored: Tue Jul 5 17:05:28 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jul 5 17:06:05 2016 -0500

----------------------------------------------------------------------
 src/master/master.cpp       | 54 +++++++++++++++++++++++++++++++++++++---
 src/master/master.hpp       | 10 +++++++-
 src/messages/messages.proto |  1 +
 src/slave/slave.cpp         |  3 ++-
 4 files changed, 63 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96ca49/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 78a8889..007f78e 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -919,6 +919,7 @@ void Master::initialize()
       &ReregisterSlaveMessage::checkpointed_resources,
       &ReregisterSlaveMessage::executor_infos,
       &ReregisterSlaveMessage::tasks,
+      &ReregisterSlaveMessage::frameworks,
       &ReregisterSlaveMessage::completed_frameworks,
       &ReregisterSlaveMessage::version);
 
@@ -4753,6 +4754,7 @@ void Master::reregisterSlave(
     const vector<Resource>& checkpointedResources,
     const vector<ExecutorInfo>& executorInfos,
     const vector<Task>& tasks,
+    const vector<FrameworkInfo>& frameworks,
     const vector<Archive::Framework>& completedFrameworks,
     const string& version)
 {
@@ -4770,6 +4772,7 @@ void Master::reregisterSlave(
                      checkpointedResources,
                      executorInfos,
                      tasks,
+                     frameworks,
                      completedFrameworks,
                      version));
     return;
@@ -4888,7 +4891,7 @@ void Master::reregisterSlave(
 
     // Inform the agent of the master's version of its checkpointed
     // resources and the new framework pids for its tasks.
-    __reregisterSlave(slave, tasks);
+    __reregisterSlave(slave, tasks, frameworks);
 
     return;
   }
@@ -4923,6 +4926,7 @@ void Master::reregisterSlave(
                  checkpointedResources,
                  executorInfos,
                  tasks,
+                 frameworks,
                  completedFrameworks,
                  version,
                  lambda::_1));
@@ -4935,6 +4939,7 @@ void Master::_reregisterSlave(
     const vector<Resource>& checkpointedResources,
     const vector<ExecutorInfo>& executorInfos,
     const vector<Task>& tasks,
+    const vector<FrameworkInfo>& frameworks,
     const vector<Archive::Framework>& completedFrameworks,
     const string& version,
     const Future<bool>& readmit)
@@ -4993,18 +4998,26 @@ void Master::_reregisterSlave(
     LOG(INFO) << "Re-registered agent " << *slave
               << " with " << slave->info.resources();
 
-    __reregisterSlave(slave, tasks);
+    __reregisterSlave(slave, tasks, frameworks);
   }
 }
 
 
-void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks)
+void Master::__reregisterSlave(
+    Slave* slave,
+    const std::vector<Task>& tasks,
+    const vector<FrameworkInfo>& frameworks)
 {
   CHECK_NOTNULL(slave);
 
   // Send the latest framework pids to the slave.
   hashset<FrameworkID> ids;
 
+  // TODO(joerg84): Remove this after a deprecation cycle starting
+  // with the 1.0 release. It is only required if an older
+  // (pre 1.0 agent) reregisters with a newer master.
+  // In that case the agent does not have the 'frameworks' message
+  // set which is used below to retrieve the framework information.
   foreach (const Task& task, tasks) {
     Framework* framework = getFramework(task.framework_id());
 
@@ -5023,6 +5036,31 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks)
     }
   }
 
+  foreach (const FrameworkInfo& frameworkInfo, frameworks) {
+    CHECK(frameworkInfo.has_id());
+    Framework* framework = getFramework(frameworkInfo.id());
+
+    if (framework != nullptr && !ids.contains(framework->id())) {
+      UpdateFrameworkMessage message;
+      message.mutable_framework_id()->MergeFrom(framework->id());
+
+      // TODO(anand): We set 'pid' to UPID() for http frameworks
+      // as 'pid' was made optional in 0.24.0. In 0.25.0, we
+      // no longer have to set pid here for http frameworks.
+      message.set_pid(framework->pid.getOrElse(UPID()));
+
+      send(slave->pid, message);
+
+      ids.insert(framework->id());
+    } else {
+      // The framework hasn't yet reregistered with the master,
+      // hence we store the 'FrameworkInfo' in the recovered list.
+      // TODO(joerg84): Consider recovering this information from
+      // registrar instead of from agents.
+      this->frameworks.recovered[frameworkInfo.id()] = frameworkInfo;
+    }
+  }
+
   // NOTE: Here we always send the message. Slaves whose version are
   // less than 0.22.0 will drop it silently which is OK.
   LOG(INFO) << "Sending updated checkpointed resources "
@@ -6179,6 +6217,11 @@ void Master::addFramework(Framework* framework)
 
   frameworks.registered[framework->id()] = framework;
 
+  // Remove from 'frameworks.recovered' if necessary.
+  if (frameworks.recovered.contains(framework->id())) {
+    frameworks.recovered.erase(framework->id());
+  }
+
   if (framework->pid.isSome()) {
     link(framework->pid.get());
   } else {
@@ -6512,6 +6555,11 @@ void Master::removeFramework(Framework* framework)
   frameworks.registered.erase(framework->id());
   allocator->removeFramework(framework->id());
 
+  // Remove from 'frameworks.recovered' if necessary.
+  if (frameworks.recovered.contains(framework->id())) {
+    frameworks.recovered.erase(framework->id());
+  }
+
   // The completedFramework buffer now owns the framework pointer.
   frameworks.completed.push_back(shared_ptr<Framework>(framework));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96ca49/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0d054d0..996ff42 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -480,6 +480,7 @@ public:
       const std::vector<Resource>& checkpointedResources,
       const std::vector<ExecutorInfo>& executorInfos,
       const std::vector<Task>& tasks,
+      const std::vector<FrameworkInfo>& frameworks,
       const std::vector<Archive::Framework>& completedFrameworks,
       const std::string& version);
 
@@ -557,6 +558,7 @@ public:
       const std::vector<Resource>& checkpointedResources,
       const std::vector<ExecutorInfo>& executorInfos,
       const std::vector<Task>& tasks,
+      const std::vector<FrameworkInfo>& frameworks,
       const std::vector<Archive::Framework>& completedFrameworks,
       const std::string& version,
       const process::Future<bool>& readmit);
@@ -612,7 +614,8 @@ protected:
 
   void __reregisterSlave(
       Slave* slave,
-      const std::vector<Task>& tasks);
+      const std::vector<Task>& tasks,
+      const std::vector<FrameworkInfo>& frameworks);
 
   // 'authenticate' is the future returned by the authenticator.
   void _authenticate(
@@ -1674,6 +1677,11 @@ private:
       : completed(masterFlags.max_completed_frameworks) {}
 
     hashmap<FrameworkID, Framework*> registered;
+
+    // 'Recovered' contains 'FrameworkInfo's for frameworks which
+    // would otherwise be unknown during recovery after master
+    // failover.
+    hashmap<FrameworkID, FrameworkInfo> recovered;
     boost::circular_buffer<std::shared_ptr<Framework>> completed;
 
     // Principals of frameworks keyed by PID.

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96ca49/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 6d7eccc..f2177ae 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -418,6 +418,7 @@ message ReregisterSlaveMessage {
 
   repeated ExecutorInfo executor_infos = 4;
   repeated Task tasks = 3;
+  repeated FrameworkInfo frameworks = 8;
   repeated Archive.Framework completed_frameworks = 5;
 
   // NOTE: This is a hack for the master to detect the agent's

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96ca49/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index de4cb5a..066c0ee 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1408,10 +1408,11 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_slave()->CopyFrom(info);
 
     foreachvalue (Framework* framework, frameworks) {
+      message.add_frameworks()->CopyFrom(framework->info);
+
       // TODO(bmahler): We need to send the executors for these
       // pending tasks, and we need to send exited events if they
       // cannot be launched: MESOS-1715 MESOS-1720.
-
       typedef hashmap<TaskID, TaskInfo> TaskMap;
       foreachvalue (const TaskMap& tasks, framework->pending) {
         foreachvalue (const TaskInfo& task, tasks) {


Re: [3/4] mesos git commit: Added filtering for orphaned tasks in /state endpoint.

Posted by haosdent <ha...@gmail.com>.
Hi @neilc, thank you for bringing this up. As we discussed in Slack, @Joerg is
going to remove this check and add it back after the deprecation cycle.

On Wed, Jul 6, 2016 at 4:24 PM, Neil Conway <ne...@gmail.com> wrote:

> On Wed, Jul 6, 2016 at 12:06 AM,  <vi...@apache.org> wrote:
> > diff --git a/src/master/http.cpp b/src/master/http.cpp
> > index 6b4f85b..debedd4 100644
> > --- a/src/master/http.cpp
> > +++ b/src/master/http.cpp
> > @@ -2498,11 +2498,8 @@ Future<Response> Master::Http::state(
> >              });
> >
> >          // Model all of the orphan tasks.
> > -        // TODO(vinod): Need to filter these tasks based on authorization! This
> > -        // is currently not possible because we don't have `FrameworkInfo` for
> > -        // these tasks. We need to either store `FrameworkInfo` for orphan
> > -        // tasks or persist FrameworkInfo of all frameworks in the registry.
> > -        writer->field("orphan_tasks", [this](JSON::ArrayWriter* writer) {
> > +        writer->field("orphan_tasks", [this, &tasksApprover](
> > +            JSON::ArrayWriter* writer) {
> >            // Find those orphan tasks.
> >            foreachvalue (const Slave* slave, master->slaves.registered) {
> >              typedef hashmap<TaskID, Task*> TaskMap;
> > @@ -2511,6 +2508,16 @@ Future<Response> Master::Http::state(
> >                  CHECK_NOTNULL(task);
> >                  if (!master->frameworks.registered.contains(
> >                      task->framework_id())) {
> > +                  CHECK(master->frameworks.recovered.contains(
> > +                      task->framework_id()));
>
> This CHECK seems dubious: what if the orphaned task was running on an
> old version of the agent? i.e., a mixed cluster in which the master
> has been updated but the agent has not been.
>
> Neil
>



-- 
Best Regards,
Haosdent Huang

Re: [3/4] mesos git commit: Added filtering for orphaned tasks in /state endpoint.

Posted by Neil Conway <ne...@gmail.com>.
On Wed, Jul 6, 2016 at 12:06 AM,  <vi...@apache.org> wrote:
> diff --git a/src/master/http.cpp b/src/master/http.cpp
> index 6b4f85b..debedd4 100644
> --- a/src/master/http.cpp
> +++ b/src/master/http.cpp
> @@ -2498,11 +2498,8 @@ Future<Response> Master::Http::state(
>              });
>
>          // Model all of the orphan tasks.
> -        // TODO(vinod): Need to filter these tasks based on authorization! This
> -        // is currently not possible because we don't have `FrameworkInfo` for
> -        // these tasks. We need to either store `FrameworkInfo` for orphan
> -        // tasks or persist FrameworkInfo of all frameworks in the registry.
> -        writer->field("orphan_tasks", [this](JSON::ArrayWriter* writer) {
> +        writer->field("orphan_tasks", [this, &tasksApprover](
> +            JSON::ArrayWriter* writer) {
>            // Find those orphan tasks.
>            foreachvalue (const Slave* slave, master->slaves.registered) {
>              typedef hashmap<TaskID, Task*> TaskMap;
> @@ -2511,6 +2508,16 @@ Future<Response> Master::Http::state(
>                  CHECK_NOTNULL(task);
>                  if (!master->frameworks.registered.contains(
>                      task->framework_id())) {
> +                  CHECK(master->frameworks.recovered.contains(
> +                      task->framework_id()));

This CHECK seems dubious: what if the orphaned task was running on an
old version of the agent? i.e., a mixed cluster in which the master
has been updated but the agent has not been.

Neil

[3/4] mesos git commit: Added filtering for orphaned tasks in /state endpoint.

Posted by vi...@apache.org.
Added filtering for orphaned tasks in /state endpoint.

Review: https://reviews.apache.org/r/49609/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0cb4533b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0cb4533b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0cb4533b

Branch: refs/heads/master
Commit: 0cb4533b9b31e1b2a3fa52bc421757bac70d9d9c
Parents: 3c96ca4
Author: Joerg Schad <jo...@mesosphere.io>
Authored: Tue Jul 5 17:05:36 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jul 5 17:06:05 2016 -0500

----------------------------------------------------------------------
 src/master/http.cpp | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0cb4533b/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 6b4f85b..debedd4 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -2498,11 +2498,8 @@ Future<Response> Master::Http::state(
             });
 
         // Model all of the orphan tasks.
-        // TODO(vinod): Need to filter these tasks based on authorization! This
-        // is currently not possible because we don't have `FrameworkInfo` for
-        // these tasks. We need to either store `FrameworkInfo` for orphan
-        // tasks or persist FrameworkInfo of all frameworks in the registry.
-        writer->field("orphan_tasks", [this](JSON::ArrayWriter* writer) {
+        writer->field("orphan_tasks", [this, &tasksApprover](
+            JSON::ArrayWriter* writer) {
           // Find those orphan tasks.
           foreachvalue (const Slave* slave, master->slaves.registered) {
             typedef hashmap<TaskID, Task*> TaskMap;
@@ -2511,6 +2508,16 @@ Future<Response> Master::Http::state(
                 CHECK_NOTNULL(task);
                 if (!master->frameworks.registered.contains(
                     task->framework_id())) {
+                  CHECK(master->frameworks.recovered.contains(
+                      task->framework_id()));
+
+                  if (!approveViewTask(
+                      tasksApprover,
+                      *task,
+                      master->frameworks.recovered[task->framework_id()])) {
+                    continue;
+                  }
+
                   writer->element(*task);
                 }
               }


[4/4] mesos git commit: Extended install() to support 7 arguments.

Posted by vi...@apache.org.
Extended install() to support 7 arguments.

Review: https://reviews.apache.org/r/49606/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90bcedce
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90bcedce
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90bcedce

Branch: refs/heads/master
Commit: 90bcedcef22c960a08cc7056253d2b05d9a4ad81
Parents: c5e2aae
Author: Joerg Schad <jo...@mesosphere.io>
Authored: Tue Jul 5 17:05:21 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jul 5 17:06:05 2016 -0500

----------------------------------------------------------------------
 .../libprocess/include/process/protobuf.hpp     | 132 +++++++++++++++++++
 1 file changed, 132 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/90bcedce/3rdparty/libprocess/include/process/protobuf.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/protobuf.hpp b/3rdparty/libprocess/include/process/protobuf.hpp
index d478cc0..78ff40e 100644
--- a/3rdparty/libprocess/include/process/protobuf.hpp
+++ b/3rdparty/libprocess/include/process/protobuf.hpp
@@ -273,6 +273,35 @@ protected:
     delete m;
   }
 
+  template <typename M,
+            typename P1, typename P1C,
+            typename P2, typename P2C,
+            typename P3, typename P3C,
+            typename P4, typename P4C,
+            typename P5, typename P5C,
+            typename P6, typename P6C,
+            typename P7, typename P7C>
+  void install(
+      void (T::*method)(const process::UPID&, P1C, P2C, P3C,
+                                              P4C, P5C, P6C, P7C),
+      P1 (M::*p1)() const,
+      P2 (M::*p2)() const,
+      P3 (M::*p3)() const,
+      P4 (M::*p4)() const,
+      P5 (M::*p5)() const,
+      P6 (M::*p6)() const,
+      P7 (M::*p7)() const)
+  {
+    google::protobuf::Message* m = new M();
+    T* t = static_cast<T*>(this);
+    protobufHandlers[m->GetTypeName()] =
+      lambda::bind(&handler7<M, P1, P1C, P2, P2C, P3, P3C,
+                                P4, P4C, P5, P5C, P6, P6C, P7, P7C>,
+                   t, method, p1, p2, p3, p4, p5, p6, p7,
+                   lambda::_1, lambda::_2);
+    delete m;
+  }
+
   // Installs that do not take the sender.
   template <typename M>
   void install(void (T::*method)(const M&))
@@ -419,6 +448,34 @@ protected:
     delete m;
   }
 
+  template <typename M,
+            typename P1, typename P1C,
+            typename P2, typename P2C,
+            typename P3, typename P3C,
+            typename P4, typename P4C,
+            typename P5, typename P5C,
+            typename P6, typename P6C,
+            typename P7, typename P7C>
+  void install(
+      void (T::*method)(P1C, P2C, P3C, P4C, P5C, P6C, P7C),
+      P1 (M::*p1)() const,
+      P2 (M::*p2)() const,
+      P3 (M::*p3)() const,
+      P4 (M::*p4)() const,
+      P5 (M::*p5)() const,
+      P6 (M::*p6)() const,
+      P7 (M::*p7)() const)
+  {
+    google::protobuf::Message* m = new M();
+    T* t = static_cast<T*>(this);
+    protobufHandlers[m->GetTypeName()] =
+      lambda::bind(&_handler7<M, P1, P1C, P2, P2C, P3, P3C,
+                                 P4, P4C, P5, P5C, P6, P6C, P7, P7C>,
+                   t, method, p1, p2, p3, p4, p5, p6, p7,
+                   lambda::_1, lambda::_2);
+    delete m;
+  }
+
   using process::Process<T>::install;
 
 private:
@@ -613,6 +670,44 @@ private:
     }
   }
 
+  template <typename M,
+            typename P1, typename P1C,
+            typename P2, typename P2C,
+            typename P3, typename P3C,
+            typename P4, typename P4C,
+            typename P5, typename P5C,
+            typename P6, typename P6C,
+            typename P7, typename P7C>
+  static void handler7(
+      T* t,
+      void (T::*method)(
+          const process::UPID&, P1C, P2C, P3C, P4C, P5C, P6C, P7C),
+      P1 (M::*p1)() const,
+      P2 (M::*p2)() const,
+      P3 (M::*p3)() const,
+      P4 (M::*p4)() const,
+      P5 (M::*p5)() const,
+      P6 (M::*p6)() const,
+      P7 (M::*p7)() const,
+      const process::UPID& sender,
+      const std::string& data)
+  {
+    M m;
+    m.ParseFromString(data);
+    if (m.IsInitialized()) {
+      (t->*method)(sender,
+                   google::protobuf::convert((&m->*p1)()),
+                   google::protobuf::convert((&m->*p2)()),
+                   google::protobuf::convert((&m->*p3)()),
+                   google::protobuf::convert((&m->*p4)()),
+                   google::protobuf::convert((&m->*p5)()),
+                   google::protobuf::convert((&m->*p6)()),
+                   google::protobuf::convert((&m->*p7)()));
+    } else {
+      LOG(WARNING) << "Initialization errors: "
+                   << m.InitializationErrorString();
+    }
+  }
 
   // Handlers that ignore the sender.
   template <typename M>
@@ -800,6 +895,43 @@ private:
     }
   }
 
+  template <typename M,
+            typename P1, typename P1C,
+            typename P2, typename P2C,
+            typename P3, typename P3C,
+            typename P4, typename P4C,
+            typename P5, typename P5C,
+            typename P6, typename P6C,
+            typename P7, typename P7C>
+  static void _handler7(
+      T* t,
+      void (T::*method)(P1C, P2C, P3C, P4C, P5C, P6C, P7C),
+      P1 (M::*p1)() const,
+      P2 (M::*p2)() const,
+      P3 (M::*p3)() const,
+      P4 (M::*p4)() const,
+      P5 (M::*p5)() const,
+      P6 (M::*p6)() const,
+      P7 (M::*p7)() const,
+      const process::UPID&,
+      const std::string& data)
+  {
+    M m;
+    m.ParseFromString(data);
+    if (m.IsInitialized()) {
+      (t->*method)(google::protobuf::convert((&m->*p1)()),
+                   google::protobuf::convert((&m->*p2)()),
+                   google::protobuf::convert((&m->*p3)()),
+                   google::protobuf::convert((&m->*p4)()),
+                   google::protobuf::convert((&m->*p5)()),
+                   google::protobuf::convert((&m->*p6)()),
+                   google::protobuf::convert((&m->*p7)()));
+    } else {
+      LOG(WARNING) << "Initialization errors: "
+                   << m.InitializationErrorString();
+    }
+  }
+
   typedef lambda::function<
       void(const process::UPID&, const std::string&)> handler;
   hashmap<std::string, handler> protobufHandlers;