You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/04/23 21:09:00 UTC

[01/13] mesos git commit: Fixed handling of operations in `master::recoverFramework()`.

Repository: mesos
Updated Branches:
  refs/heads/master 7a3c63fe1 -> d58e6351b


Fixed handling of operations in `master::recoverFramework()`.

`Master::recoverFramework()` only recovers operations affecting agent
default resources. This patch makes it also recover operations affecting
resources managed by resource providers.

It also fixes a bug in which all operations affecting agent default
resources, rather than only those belonging to the framework being
recovered, were added to the framework.

Review: https://reviews.apache.org/r/66458/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/470476c4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/470476c4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/470476c4

Branch: refs/heads/master
Commit: 470476c4e2574d5711bab2b5bd10e851e2314516
Parents: 7a3c63f
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:17 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:44:19 2018 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/470476c4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 767ad8c..5946c7b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -9491,7 +9491,7 @@ void Master::recoverFramework(
 
   Framework* framework = new Framework(this, flags, info);
 
-  // Add active tasks and executors to the framework.
+  // Add active operations, tasks, and executors to the framework.
   foreachvalue (Slave* slave, slaves.registered) {
     if (slave->tasks.contains(framework->id())) {
       foreachvalue (Task* task, slave->tasks.at(framework->id())) {
@@ -9507,7 +9507,20 @@ void Master::recoverFramework(
     }
 
     foreachvalue (Operation* operation, slave->operations) {
-      framework->addOperation(operation);
+      if (operation->has_framework_id() &&
+          operation->framework_id() == framework->id()) {
+        framework->addOperation(operation);
+      }
+    }
+
+    foreachvalue (const Slave::ResourceProvider& resourceProvider,
+                  slave->resourceProviders) {
+      foreachvalue (Operation* operation, resourceProvider.operations) {
+        if (operation->has_framework_id() &&
+            operation->framework_id() == framework->id()) {
+          framework->addOperation(operation);
+        }
+      }
     }
   }
 


[06/13] mesos git commit: Added a master metric for operation reconciliation messages.

Posted by gr...@apache.org.
Added a master metric for operation reconciliation messages.

Review: https://reviews.apache.org/r/66463/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c7c38483
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c7c38483
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c7c38483

Branch: refs/heads/master
Commit: c7c384832905945aa3ef8cf02ed6ad20256290e7
Parents: 918f99e
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:34 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:48:49 2018 -0700

----------------------------------------------------------------------
 docs/monitoring.md         | 7 +++++++
 docs/operator-http-api.md  | 4 ++++
 src/master/metrics.cpp     | 4 ++++
 src/master/metrics.hpp     | 1 +
 src/tests/master_tests.cpp | 1 +
 5 files changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c7c38483/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 12e2103..34cfd05 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -665,6 +665,13 @@ messages may indicate that there is a problem with the network.
 </tr>
 <tr>
   <td>
+  <code>master/messages_reconcile_operations</code>
+  </td>
+  <td>Number of reconcile operations messages</td>
+  <td>Counter</td>
+</tr>
+<tr>
+  <td>
   <code>master/messages_reconcile_tasks</code>
   </td>
   <td>Number of reconcile task messages</td>

http://git-wip-us.apache.org/repos/asf/mesos/blob/c7c38483/docs/operator-http-api.md
----------------------------------------------------------------------
diff --git a/docs/operator-http-api.md b/docs/operator-http-api.md
index 10dcac8..9be1e2d 100644
--- a/docs/operator-http-api.md
+++ b/docs/operator-http-api.md
@@ -677,6 +677,10 @@ Content-Type: application/json
         "value": 0.0
       },
       {
+        "name": "master/messages_reconcile_operations",
+        "value": 0.0
+      },
+      {
         "name": "master/messages_reconcile_tasks",
         "value": 0.0
       },

http://git-wip-us.apache.org/repos/asf/mesos/blob/c7c38483/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 894c041..4cc96a1 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -129,6 +129,8 @@ Metrics::Metrics(const Master& master)
         "master/messages_revive_offers"),
     messages_suppress_offers(
         "master/messages_suppress_offers"),
+    messages_reconcile_operations(
+        "master/messages_reconcile_operations"),
     messages_reconcile_tasks(
         "master/messages_reconcile_tasks"),
     messages_framework_to_executor(
@@ -253,6 +255,7 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(messages_decline_offers);
   process::metrics::add(messages_revive_offers);
   process::metrics::add(messages_suppress_offers);
+  process::metrics::add(messages_reconcile_operations);
   process::metrics::add(messages_reconcile_tasks);
   process::metrics::add(messages_framework_to_executor);
   process::metrics::add(messages_executor_to_framework);
@@ -403,6 +406,7 @@ Metrics::~Metrics()
   process::metrics::remove(messages_decline_offers);
   process::metrics::remove(messages_revive_offers);
   process::metrics::remove(messages_suppress_offers);
+  process::metrics::remove(messages_reconcile_operations);
   process::metrics::remove(messages_reconcile_tasks);
   process::metrics::remove(messages_framework_to_executor);
   process::metrics::remove(messages_executor_to_framework);

http://git-wip-us.apache.org/repos/asf/mesos/blob/c7c38483/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index 5699c64..5414c47 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -135,6 +135,7 @@ struct Metrics
   process::metrics::Counter messages_decline_offers;
   process::metrics::Counter messages_revive_offers;
   process::metrics::Counter messages_suppress_offers;
+  process::metrics::Counter messages_reconcile_operations;
   process::metrics::Counter messages_reconcile_tasks;
   process::metrics::Counter messages_framework_to_executor;
   process::metrics::Counter messages_operation_status_update_acknowledgement;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c7c38483/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index be7a3cc..d5ce52c 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2268,6 +2268,7 @@ TEST_F(MasterTest, MetricsInMetricsEndpoint)
   EXPECT_EQ(1u, snapshot.values.count("master/messages_decline_offers"));
   EXPECT_EQ(1u, snapshot.values.count("master/messages_revive_offers"));
   EXPECT_EQ(1u, snapshot.values.count("master/messages_suppress_offers"));
+  EXPECT_EQ(1u, snapshot.values.count("master/messages_reconcile_operations"));
   EXPECT_EQ(1u, snapshot.values.count("master/messages_reconcile_tasks"));
   EXPECT_EQ(1u, snapshot.values.count("master/messages_framework_to_executor"));
 


[10/13] mesos git commit: Added a test helper for creating `RECONCILE_OPERATIONS` v1 calls.

Posted by gr...@apache.org.
Added a test helper for creating `RECONCILE_OPERATIONS` v1 calls.

Review: https://reviews.apache.org/r/66467/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/949b44e2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/949b44e2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/949b44e2

Branch: refs/heads/master
Commit: 949b44e239aa218aefceb5fadf3cee7c46972ca9
Parents: 28b93d5
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:54 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:49:54 2018 -0700

----------------------------------------------------------------------
 src/tests/mesos.hpp | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/949b44e2/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 7a292f3..dcbfb95 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -2192,6 +2192,30 @@ inline mesos::v1::scheduler::Call createCallKill(
 }
 
 
+inline mesos::v1::scheduler::Call createCallReconcileOperations(
+    const mesos::v1::FrameworkID& frameworkId,
+    const std::vector<
+        mesos::v1::scheduler::Call::ReconcileOperations::Operation>&
+      operations = {})
+{
+  mesos::v1::scheduler::Call call;
+  call.set_type(mesos::v1::scheduler::Call::RECONCILE_OPERATIONS);
+  call.mutable_framework_id()->CopyFrom(frameworkId);
+
+  mesos::v1::scheduler::Call::ReconcileOperations* reconcile =
+    call.mutable_reconcile_operations();
+
+  foreach (
+      const mesos::v1::scheduler::Call::ReconcileOperations::Operation&
+        operation,
+      operations) {
+    reconcile->add_operations()->CopyFrom(operation);
+  }
+
+  return call;
+}
+
+
 inline mesos::v1::scheduler::Call createCallSubscribe(
   const mesos::v1::FrameworkInfo& frameworkInfo,
   const Option<mesos::v1::FrameworkID>& frameworkId = None())


[08/13] mesos git commit: Updated `using` statements in `tests/mesos.hpp`.

Posted by gr...@apache.org.
Updated `using` statements in `tests/mesos.hpp`.

Review: https://reviews.apache.org/r/66465/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/327a4a96
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/327a4a96
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/327a4a96

Branch: refs/heads/master
Commit: 327a4a965b3d39b00b0d09d91b4475398adb6068
Parents: d261690
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:49 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:49:30 2018 -0700

----------------------------------------------------------------------
 src/tests/mesos.hpp | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/327a4a96/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 7356523..ac6684d 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -413,6 +413,16 @@ namespace maintenance = mesos::v1::maintenance;
 namespace master = mesos::v1::master;
 namespace quota = mesos::v1::quota;
 
+using mesos::v1::OPERATION_PENDING;
+using mesos::v1::OPERATION_FINISHED;
+using mesos::v1::OPERATION_FAILED;
+using mesos::v1::OPERATION_ERROR;
+using mesos::v1::OPERATION_DROPPED;
+using mesos::v1::OPERATION_UNREACHABLE;
+using mesos::v1::OPERATION_GONE_BY_OPERATOR;
+using mesos::v1::OPERATION_RECOVERING;
+using mesos::v1::OPERATION_UNKNOWN;
+
 using mesos::v1::TASK_STAGING;
 using mesos::v1::TASK_STARTING;
 using mesos::v1::TASK_RUNNING;
@@ -444,7 +454,11 @@ using mesos::v1::InverseOffer;
 using mesos::v1::MachineID;
 using mesos::v1::Metric;
 using mesos::v1::Offer;
+using mesos::v1::OperationID;
+using mesos::v1::OperationState;
+using mesos::v1::OperationStatus;
 using mesos::v1::Resource;
+using mesos::v1::ResourceProviderID;
 using mesos::v1::ResourceProviderInfo;
 using mesos::v1::Resources;
 using mesos::v1::TaskID;
@@ -2555,6 +2569,7 @@ namespace scheduler {
 using Call = mesos::v1::scheduler::Call;
 using Event = mesos::v1::scheduler::Event;
 using Mesos = mesos::v1::scheduler::Mesos;
+using Response = mesos::v1::scheduler::Response;
 
 
 using TestMesos = tests::scheduler::TestMesos<


[09/13] mesos git commit: Updated `RESERVE()` helper to allow specifying an operation ID.

Posted by gr...@apache.org.
Updated `RESERVE()` helper to allow specifying an operation ID.

Review: https://reviews.apache.org/r/66466/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/28b93d53
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/28b93d53
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/28b93d53

Branch: refs/heads/master
Commit: 28b93d53e2e5bfa7fd9a808aa203b178e2d8c75e
Parents: 327a4a9
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:51 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:49:45 2018 -0700

----------------------------------------------------------------------
 src/tests/mesos.hpp | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/28b93d53/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ac6684d..7a292f3 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1296,11 +1296,18 @@ inline TDomainInfo createDomainInfo(
 
 // Helpers for creating operations.
 template <typename TResources, typename TOffer>
-inline typename TOffer::Operation RESERVE(const TResources& resources)
+inline typename TOffer::Operation RESERVE(
+    const TResources& resources,
+    const Option<std::string> operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::RESERVE);
   operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
+
+  if (operationId.isSome()) {
+    operation.mutable_id()->set_value(operationId.get());
+  }
+
   return operation;
 }
 


[02/13] mesos git commit: Fixed a bug in `Master::updateSlave()`.

Posted by gr...@apache.org.
Fixed a bug in `Master::updateSlave()`.

One code path in `Master::updateSlave()` doesn't account for operations
created via the operator API, which have no framework ID; this patch
fixes that.

Review: https://reviews.apache.org/r/66459/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e55884b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e55884b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e55884b

Branch: refs/heads/master
Commit: 4e55884bf78f65cb62398df713426d4606dc9bc9
Parents: 470476c
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:21 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:47:37 2018 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4e55884b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5946c7b..67baa6b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7775,7 +7775,8 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
 
         addOperation(framework, slave, new Operation(operation));
 
-        if (!protobuf::isTerminalState(operation.latest_status().state())) {
+        if (!protobuf::isTerminalState(operation.latest_status().state()) &&
+            operation.has_framework_id()) {
           // If we do not yet know the `FrameworkInfo` of the framework the
           // operation originated from, we cannot properly track the operation
           // at this point.


[12/13] mesos git commit: Added tests for operation status reconciliation.

Posted by gr...@apache.org.
Added tests for operation status reconciliation.

Review: https://reviews.apache.org/r/66468/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b8cca436
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b8cca436
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b8cca436

Branch: refs/heads/master
Commit: b8cca436fb5646adad8174124efdb549f6988422
Parents: c39ef69
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:58 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:50:43 2018 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   1 +
 src/tests/CMakeLists.txt                        |   1 +
 src/tests/mesos.hpp                             |   1 +
 src/tests/operation_reconciliation_tests.cpp    | 843 +++++++++++++++++++
 .../storage_local_resource_provider_tests.cpp   | 181 ++++
 5 files changed, 1027 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index e50e43b..7e91681 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2552,6 +2552,7 @@ mesos_tests_SOURCES =						\
   tests/mock_registrar.cpp					\
   tests/module.cpp						\
   tests/module_tests.cpp					\
+  tests/operation_reconciliation_tests.cpp			\
   tests/operation_status_update_manager_tests.cpp		\
   tests/oversubscription_tests.cpp				\
   tests/partition_tests.cpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 4eb8e23..1fef060 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -108,6 +108,7 @@ set(MESOS_TESTS_SRC
   http_fault_tolerance_tests.cpp
   master_maintenance_tests.cpp
   master_slave_reconciliation_tests.cpp
+  operation_reconciliation_tests.cpp
   operation_status_update_manager_tests.cpp
   partition_tests.cpp
   paths_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index dcbfb95..756a521 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -2597,6 +2597,7 @@ public:
 namespace v1 {
 namespace scheduler {
 
+using APIResult = mesos::v1::scheduler::APIResult;
 using Call = mesos::v1::scheduler::Call;
 using Event = mesos::v1::scheduler::Event;
 using Mesos = mesos::v1::scheduler::Mesos;

http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/tests/operation_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp
new file mode 100644
index 0000000..76c1695
--- /dev/null
+++ b/src/tests/operation_reconciliation_tests.cpp
@@ -0,0 +1,843 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gmock/gmock.h>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/v1/mesos.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/http.hpp>
+#include <process/message.hpp>
+
+#include <stout/gtest.hpp>
+
+#include "master/master.hpp"
+
+#include "master/detector/standalone.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+
+using mesos::master::detector::MasterDetector;
+using mesos::master::detector::StandaloneMasterDetector;
+
+using process::Clock;
+using process::Future;
+using process::Message;
+using process::Owned;
+
+using testing::Eq;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+namespace v1 {
+
+class OperationReconciliationTest
+  : public MesosTest,
+    public WithParamInterface<ContentType> {};
+
+
+// These tests are parameterized by the content type of the HTTP request.
+INSTANTIATE_TEST_CASE_P(
+    ContentType,
+    OperationReconciliationTest,
+    ::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
+
+
+// This test ensures that the master responds with `OPERATION_PENDING` for
+// operations that are pending at the master.
+TEST_P(OperationReconciliationTest, PendingOperation)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(frameworkInfo));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(scheduler::DeclineOffers()); // Decline subsequent offers.
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const Offer& offer = offers->offers(0);
+  const AgentID& agentId = offer.agent_id();
+
+  OperationID operationId;
+  operationId.set_value("operation");
+
+  const Resources reservedResources =
+    Resources(offer.resources())
+      .pushReservation(createDynamicReservationInfo(
+          frameworkInfo.roles(0), frameworkInfo.principal()));
+
+  // We'll drop the `ApplyOperationMessage` from the master to the agent.
+  Future<ApplyOperationMessage> applyOperationMessage =
+    DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _);
+
+  mesos.send(createCallAccept(
+      frameworkId,
+      offer,
+      {RESERVE(reservedResources, operationId.value())}));
+
+  AWAIT_READY(applyOperationMessage);
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_PENDING, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an unknown operation that belongs
+// to an agent that has been recovered from the registry after master failover
+// but has not yet registered, results in `OPERATION_RECOVERING`.
+//
+// TODO(gkleiman): Enable this test on Windows once Windows supports the
+// replicated log.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(
+    OperationReconciliationTest, UnknownOperationRecoveredAgent)
+{
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry = "replicated_log";
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register and get the agent ID.
+  AWAIT_READY(slaveRegisteredMessage);
+  const AgentID agentId = evolve(slaveRegisteredMessage->slave_id());
+
+  // Stop the master.
+  master->reset();
+
+  // Stop the slave.
+  slave.get()->terminate();
+  slave->reset();
+
+  // Restart the master.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  OperationID operationId;
+  operationId.set_value("operation");
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_RECOVERING, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an unknown operation that belongs
+// to a known agent results in `OPERATION_UNKNOWN`.
+TEST_P(OperationReconciliationTest, UnknownOperationKnownAgent)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register and get the agent ID.
+  AWAIT_READY(slaveRegisteredMessage);
+  const AgentID agentId = evolve(slaveRegisteredMessage->slave_id());
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  OperationID operationId;
+  operationId.set_value("operation");
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_UNKNOWN, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an unknown operation that belongs
+// to an unreachable agent results in `OPERATION_UNREACHABLE`.
+TEST_P(OperationReconciliationTest, UnknownOperationUnreachableAgent)
+{
+  Clock::pause();
+
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Allow the master to PING the agent, but drop all PONG messages
+  // from the agent. Note that we don't match on the master / agent
+  // PIDs because it's actually the `SlaveObserver` process that sends
+  // the pings.
+  Future<Message> ping =
+    FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register and get the agent ID.
+  AWAIT_READY(slaveRegisteredMessage);
+  const AgentID agentId = evolve(slaveRegisteredMessage->slave_id());
+
+  // Induce a partition: observe `max_agent_ping_timeouts` PINGs (all PONGs
+  // are dropped), advancing the clock one ping timeout between them.
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+  // Let the final PING time out too, then settle before subscribing.
+  Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+  // Reconcile an operation ID that was never submitted to the master.
+  OperationID operationId;
+  operationId.set_value("operation");
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+  // The operation should be reported as unreachable, without a status UUID.
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_UNREACHABLE, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an unknown operation that belongs
+// to an agent marked gone results in `OPERATION_GONE_BY_OPERATOR`.
+TEST_P(OperationReconciliationTest, UnknownOperationAgentMarkedGone)
+{
+  Clock::pause();
+
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register and get the agent ID.
+  AWAIT_READY(slaveRegisteredMessage);
+  const AgentID agentId = evolve(slaveRegisteredMessage->slave_id());
+
+  ContentType contentType = GetParam();
+  // Mark the agent gone through the v1 operator API (`MARK_AGENT_GONE`).
+  {
+    master::Call call;
+    call.set_type(master::Call::MARK_AGENT_GONE);
+
+    call.mutable_mark_agent_gone()->mutable_agent_id()->CopyFrom(agentId);
+
+    Future<process::http::Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(contentType, call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+  }
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, contentType, scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+  // Reconcile an operation ID that was never submitted to the master.
+  OperationID operationId;
+  operationId.set_value("operation");
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+  // The operation should be reported as gone, without a status UUID.
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_GONE_BY_OPERATOR, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that reconciliation of an unknown operation that belongs
+// to an unknown agent results in `OPERATION_UNKNOWN`.
+TEST_P(OperationReconciliationTest, UnknownOperationUnknownAgent)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(DEFAULT_FRAMEWORK_INFO));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline all offers.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+  // No agent was started, so this agent ID is unknown to the master.
+  AgentID agentId;
+  agentId.set_value("agent");
+
+  OperationID operationId;
+  operationId.set_value("operation");
+
+  scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<scheduler::APIResult> result =
+    mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const scheduler::Response response = result->response();
+  ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+  // The operation should be reported as unknown, without a status UUID.
+  const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  EXPECT_EQ(operationId, operationStatus.operation_id());
+  EXPECT_EQ(OPERATION_UNKNOWN, operationStatus.state());
+  EXPECT_FALSE(operationStatus.has_uuid());
+}
+
+
+// This test verifies that, after a master failover, reconciliation of an
+// operation that is still pending on an agent results in `OPERATION_PENDING`.
+TEST_P(OperationReconciliationTest, AgentPendingOperationAfterMasterFailover)
+{
+  Clock::pause();
+
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  // Shared by the agent and the scheduler so both can follow a failover.
+  auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
+
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register.
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start and register a resource provider.
+
+  ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  Resource disk =
+    createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
+
+  Owned<MockResourceProvider> resourceProvider(
+      new MockResourceProvider(
+          resourceProviderInfo,
+          Resources(disk)));
+
+  // We override the mock resource provider's default action, so the operation
+  // will stay in `OPERATION_PENDING`.
+  Future<resource_provider::Event::ApplyOperation> applyOperation;
+  EXPECT_CALL(*resourceProvider, applyOperation(_))
+    .WillOnce(FutureArg<0>(&applyOperation));
+
+  Owned<EndpointDetector> endpointDetector(
+      mesos::internal::tests::resource_provider::createEndpointDetector(
+          slave.get()->pid));
+  // Expect another `UpdateSlaveMessage` carrying the provider's resources.
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // fully register.
+  Clock::resume();
+
+  ContentType contentType = GetParam();
+
+  resourceProvider->start(endpointDetector, contentType, DEFAULT_CREDENTIAL);
+
+  // Wait until the agent's resources have been updated to include the
+  // resource provider resources.
+  AWAIT_READY(updateSlaveMessage);
+  ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // Start a v1 framework.
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(frameworkInfo));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  // Decline offers that do not contain wanted resources.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(scheduler::DeclineOffers());
+
+  Future<scheduler::Event::Offers> offers;
+
+  auto isRaw = [](const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+  };
+  // Newer expectations take precedence, so RAW disk offers are caught here.
+  EXPECT_CALL(*scheduler, offers(_, scheduler::OffersHaveAnyResource(
+      std::bind(isRaw, lambda::_1))))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(scheduler::DeclineOffers()); // Decline successive offers.
+
+  scheduler::TestMesos mesos(
+      master.get()->pid, contentType, scheduler, detector);
+
+  AWAIT_READY(subscribed);
+  FrameworkID frameworkId(subscribed->framework_id());
+
+  // NOTE: If the framework has not declined an unwanted offer yet when
+  // the master updates the agent with the RAW disk resource, the new
+  // allocation triggered by this update won't generate an allocatable
+  // offer due to no CPU and memory resources. So here we first settle
+  // the clock to ensure that the unwanted offer has been declined, then
+  // advance the clock to trigger another allocation.
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const Offer& offer = offers->offers(0);
+  const AgentID& agentId = offer.agent_id();
+  // Find the RAW disk to convert and check it has a resource provider ID.
+  Option<Resource> source;
+  Option<ResourceProviderID> resourceProviderId;
+  foreach (const Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      source = resource;
+
+      ASSERT_TRUE(resource.has_provider_id());
+      resourceProviderId = resource.provider_id();
+
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+  ASSERT_SOME(resourceProviderId);
+
+  OperationID operationId;
+  operationId.set_value("operation");
+  // Submit `CREATE_VOLUME`; the mocked `applyOperation` leaves it pending.
+  mesos.send(createCallAccept(
+      frameworkId,
+      offer,
+      {CREATE_VOLUME(
+          source.get(),
+          Resource::DiskInfo::Source::MOUNT,
+          operationId.value())}));
+
+  AWAIT_READY(applyOperation);
+
+  // Simulate master failover.
+  // First, clear the leading master on the shared detector.
+  detector->appoint(None());
+
+  master->reset();
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Settle the clock to ensure the master finishes recovering the registry.
+  Clock::settle();
+
+  Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
+      SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
+  // The scheduler resubscribes with its existing framework ID.
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(frameworkInfo, frameworkId));
+
+  Future<scheduler::Event::Subscribed> frameworkResubscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&frameworkResubscribed));
+
+  // Simulate a new master detected event to the agent and the scheduler.
+  detector->appoint(master.get()->pid);
+
+  // Advance the clock, so that the agent re-registers.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the framework and agent to re-register.
+  AWAIT_READY(slaveReregistered);
+  AWAIT_READY(frameworkResubscribed);
+
+  // Test explicit reconciliation of the pending operation on this agent.
+  {
+    scheduler::Call::ReconcileOperations::Operation operation;
+    operation.mutable_operation_id()->CopyFrom(operationId);
+    operation.mutable_agent_id()->CopyFrom(agentId);
+
+    const Future<scheduler::APIResult> result =
+      mesos.call({createCallReconcileOperations(frameworkId, {operation})});
+
+    AWAIT_READY(result);
+
+    // The master should respond with '200 OK' and with a `scheduler::Response`.
+    ASSERT_EQ(process::http::Status::OK, result->status_code());
+    ASSERT_TRUE(result->has_response());
+
+    const scheduler::Response response = result->response();
+    ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+    ASSERT_TRUE(response.has_reconcile_operations());
+
+    const scheduler::Response::ReconcileOperations& reconcile =
+      response.reconcile_operations();
+    ASSERT_EQ(1, reconcile.operation_statuses_size());
+    // The operation is still pending, and its status carries no UUID.
+    const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+    EXPECT_EQ(operationId, operationStatus.operation_id());
+    EXPECT_EQ(OPERATION_PENDING, operationStatus.state());
+    EXPECT_FALSE(operationStatus.has_uuid());
+  }
+
+  // Test implicit reconciliation: no operations are listed in the call.
+  {
+    const Future<scheduler::APIResult> result =
+      mesos.call({createCallReconcileOperations(frameworkId, {})});
+
+    AWAIT_READY(result);
+
+    // The master should respond with '200 OK' and with a `scheduler::Response`.
+    ASSERT_EQ(process::http::Status::OK, result->status_code());
+    ASSERT_TRUE(result->has_response());
+
+    const scheduler::Response response = result->response();
+    ASSERT_EQ(scheduler::Response::RECONCILE_OPERATIONS, response.type());
+    ASSERT_TRUE(response.has_reconcile_operations());
+
+    const scheduler::Response::ReconcileOperations& reconcile =
+      response.reconcile_operations();
+    ASSERT_EQ(1, reconcile.operation_statuses_size());
+    // The only status reported is the one for the pending operation.
+    const OperationStatus& operationStatus = reconcile.operation_statuses(0);
+    EXPECT_EQ(operationId, operationStatus.operation_id());
+    EXPECT_EQ(OPERATION_PENDING, operationStatus.state());
+    EXPECT_FALSE(operationStatus.has_uuid());
+  }
+}
+
+} // namespace v1 {
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b8cca436/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 2872f1a..ccb114a 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -3305,6 +3305,187 @@ TEST_F(
   Clock::settle();
 }
 
+
+// This test ensures that the master responds with the latest state
+// for operations that are terminal at the master, but have not been
+// acknowledged by the framework.
+TEST_F(
+    StorageLocalResourceProviderTest,
+    ROOT_ReconcileUnacknowledgedTerminalOperation)
+{
+  Clock::pause();
+
+  loadUriDiskProfileAdaptorModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileMapping();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(flags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // Register a framework to exercise an operation.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "storage");
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Decline offers that do not contain wanted resources.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  Future<v1::scheduler::Event::Offers> offers;
+
+  auto isRaw = [](const v1::Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == v1::Resource::DiskInfo::Source::RAW;
+  };
+  // Capture the first offer containing a RAW disk that has a profile.
+  EXPECT_CALL(*scheduler, offers(_, v1::scheduler::OffersHaveAnyResource(
+      std::bind(isRaw, lambda::_1))))
+    .WillOnce(FutureArg<1>(&offers));
+  // NOTE(review): a second RAW offer would exceed this WillOnce; confirm OK.
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // NOTE: If the framework has not declined an unwanted offer yet when
+  // the master updates the agent with the RAW disk resource, the new
+  // allocation triggered by this update won't generate an allocatable
+  // offer due to no CPU and memory resources. So here we first settle
+  // the clock to ensure that the unwanted offer has been declined, then
+  // advance the clock to trigger another allocation.
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+  // Capture the single operation status update; it is never acknowledged.
+  Future<v1::scheduler::Event::UpdateOperationStatus> update;
+  EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+    .WillOnce(FutureArg<1>(&update));
+  // Locate the RAW disk (with a profile) offered by the resource provider.
+  Option<v1::Resource> source;
+  Option<mesos::v1::ResourceProviderID> resourceProviderId;
+  foreach (const v1::Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      source = resource;
+
+      ASSERT_TRUE(resource.has_provider_id());
+      resourceProviderId = resource.provider_id();
+
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+  ASSERT_SOME(resourceProviderId);
+  // Convert the RAW disk into a MOUNT disk via `CREATE_VOLUME`.
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::CREATE_VOLUME(
+          source.get(),
+          v1::Resource::DiskInfo::Source::MOUNT,
+          operationId.value())}));
+
+  AWAIT_READY(update);
+
+  ASSERT_EQ(operationId, update->status().operation_id());
+  ASSERT_EQ(v1::OperationState::OPERATION_FINISHED, update->status().state());
+  ASSERT_TRUE(update->status().has_uuid());
+  // Explicitly reconcile the finished-but-unacknowledged operation.
+  v1::scheduler::Call::ReconcileOperations::Operation operation;
+  operation.mutable_operation_id()->CopyFrom(operationId);
+  operation.mutable_agent_id()->CopyFrom(agentId);
+
+  const Future<v1::scheduler::APIResult> result =
+    mesos.call({v1::createCallReconcileOperations(frameworkId, {operation})});
+
+  AWAIT_READY(result);
+
+  // The master should respond with '200 OK' and with a `scheduler::Response`.
+  ASSERT_EQ(process::http::Status::OK, result->status_code());
+  ASSERT_TRUE(result->has_response());
+
+  const v1::scheduler::Response response = result->response();
+  ASSERT_EQ(v1::scheduler::Response::RECONCILE_OPERATIONS, response.type());
+  ASSERT_TRUE(response.has_reconcile_operations());
+
+  const v1::scheduler::Response::ReconcileOperations& reconcile =
+    response.reconcile_operations();
+  ASSERT_EQ(1, reconcile.operation_statuses_size());
+  // The master should report the latest terminal status, including its UUID.
+  const v1::OperationStatus& operationStatus = reconcile.operation_statuses(0);
+  ASSERT_EQ(operationId, operationStatus.operation_id());
+  ASSERT_EQ(v1::OPERATION_FINISHED, operationStatus.state());
+  ASSERT_TRUE(operationStatus.has_uuid());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[05/13] mesos git commit: Added new operation states to be used for status reconciliation.

Posted by gr...@apache.org.
Added new operation states to be used for status reconciliation.

Review: https://reviews.apache.org/r/66462/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/918f99e6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/918f99e6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/918f99e6

Branch: refs/heads/master
Commit: 918f99e6558db465f4c6aca75563e0c49b0203d1
Parents: c265ae6
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:28 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:48:36 2018 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto     | 29 +++++++++++++++++++++++++++++
 include/mesos/v1/mesos.proto  | 29 +++++++++++++++++++++++++++++
 src/common/protobuf_utils.cpp |  6 +++++-
 src/master/master.cpp         |  8 ++++++--
 src/slave/slave.cpp           |  8 ++++++--
 5 files changed, 75 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/918f99e6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 9e24d3e..5bc4a80 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -2309,6 +2309,35 @@ enum OperationState {
 
   // TERMINAL: The operation was dropped due to a transient error.
   OPERATION_DROPPED = 5;
+
+  // The operation affects an agent that has lost contact with the master,
+  // typically due to a network failure or partition. The operation may or may
+  // not still be pending.
+  OPERATION_UNREACHABLE = 6;
+
+  // The operation affected an agent that the master cannot contact;
+  // the operator has asserted that the agent has been shutdown, but this has
+  // not been directly confirmed by the master.
+  //
+  // If the operator is correct, the operation is not pending and this is a
+  // terminal state; if the operator is mistaken, the operation may still be
+  // pending and might return to a different state in the future.
+  OPERATION_GONE_BY_OPERATOR = 7;
+
+  // The operation affects an agent that the master recovered from its
+  // state, but that agent has not yet re-registered.
+  //
+  // The operation can transition to `OPERATION_UNREACHABLE` if the
+  // corresponding agent is marked as unreachable, and will transition to
+  // another status if the agent re-registers.
+  OPERATION_RECOVERING = 8;
+
+  // The master has no knowledge of the operation. This is typically
+  // because either (a) the master never had knowledge of the operation, or
+  // (b) the master forgot about the operation because it garbage collected
+  // its metadata about the operation. The operation may or may not still be
+  // pending.
+  OPERATION_UNKNOWN = 9;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/918f99e6/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 0f3fd8a..5a4e733 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -2301,6 +2301,35 @@ enum OperationState {
 
   // TERMINAL: The operation was dropped due to a transient error.
   OPERATION_DROPPED = 5;
+
+  // The operation affects an agent that has lost contact with the master,
+  // typically due to a network failure or partition. The operation may or may
+  // not still be pending.
+  OPERATION_UNREACHABLE = 6;
+
+  // The operation affected an agent that the master cannot contact;
+  // the operator has asserted that the agent has been shutdown, but this has
+  // not been directly confirmed by the master.
+  //
+  // If the operator is correct, the operation is not pending and this is a
+  // terminal state; if the operator is mistaken, the operation may still be
+  // pending and might return to a different state in the future.
+  OPERATION_GONE_BY_OPERATOR = 7;
+
+  // The operation affects an agent that the master recovered from its
+  // state, but that agent has not yet re-registered.
+  //
+  // The operation can transition to `OPERATION_UNREACHABLE` if the
+  // corresponding agent is marked as unreachable, and will transition to
+  // another status if the agent re-registers.
+  OPERATION_RECOVERING = 8;
+
+  // The master has no knowledge of the operation. This is typically
+  // because either (a) the master never had knowledge of the operation, or
+  // (b) the master forgot about the operation because it garbage collected
+  // its metadata about the operation. The operation may or may not still be
+  // pending.
+  OPERATION_UNKNOWN = 9;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/918f99e6/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 141a444..78bffd8 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -408,8 +408,12 @@ bool isTerminalState(const OperationState& state)
     case OPERATION_ERROR:
     case OPERATION_DROPPED:
       return true;
-    case OPERATION_PENDING:
     case OPERATION_UNSUPPORTED:
+    case OPERATION_PENDING:
+    case OPERATION_UNREACHABLE:
+    case OPERATION_GONE_BY_OPERATOR:
+    case OPERATION_RECOVERING:
+    case OPERATION_UNKNOWN:
       return false;
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/918f99e6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 67baa6b..ada7709 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -10743,9 +10743,13 @@ void Master::updateOperation(
       break;
     }
 
-    // Non-terminal. This shouldn't happen.
+    // Non-terminal or not expected from an agent. This shouldn't happen.
+    case OPERATION_UNSUPPORTED:
     case OPERATION_PENDING:
-    case OPERATION_UNSUPPORTED: {
+    case OPERATION_UNREACHABLE:
+    case OPERATION_GONE_BY_OPERATOR:
+    case OPERATION_RECOVERING:
+    case OPERATION_UNKNOWN: {
       LOG(FATAL) << "Unexpected operation state "
                  << operation->latest_status().state();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/918f99e6/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2b8c6e0..d0ff5f8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7975,9 +7975,13 @@ void Slave::updateOperation(
       break;
     }
 
-    // Non-terminal. This shouldn't happen.
+    // Non-terminal or not sent by resource providers. This shouldn't happen.
+    case OPERATION_UNSUPPORTED:
     case OPERATION_PENDING:
-    case OPERATION_UNSUPPORTED: {
+    case OPERATION_UNREACHABLE:
+    case OPERATION_GONE_BY_OPERATOR:
+    case OPERATION_RECOVERING:
+    case OPERATION_UNKNOWN: {
       LOG(FATAL) << "Unexpected operation state "
                  << operation->latest_status().state();
     }


[13/13] mesos git commit: Updated documentation for operation feedback.

Posted by gr...@apache.org.
Updated documentation for operation feedback.

Review: https://reviews.apache.org/r/66696/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d58e6351
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d58e6351
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d58e6351

Branch: refs/heads/master
Commit: d58e6351b1377c53cf5122b5f0755c4a7d09a44f
Parents: b8cca43
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:44:01 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:50:53 2018 -0700

----------------------------------------------------------------------
 docs/csi.md                | 26 +++----------
 docs/scheduler-http-api.md | 81 ++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 85 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d58e6351/docs/csi.md
----------------------------------------------------------------------
diff --git a/docs/csi.md b/docs/csi.md
index 2d96ee7..77a5edb 100644
--- a/docs/csi.md
+++ b/docs/csi.md
@@ -345,27 +345,11 @@ It is important for the frameworks to get the results of the above offer
 operations so that they know if the dynamic disk provisioning is successful or
 not.
 
-Unfortunately, the current scheduler API does not provide a way to give explicit
-offer operation feedback. Frameworks have to infer the result of the operation
-by looking at various sources of information that are available to them. Here are
-the tips to get offer operation results:
-
-* Leverage [reservation labels](reservation.md#reservation-labels). Reservation
-  labels can be used to uniquely identify a resource. By looking at the
-  reservation labels of an offered resource, the framework can infer if an
-  operation is successful or not.
-* Use [operator API](operator-http-api.md) to get the current set of resources.
-
-##### Explicit Operation Feedback
-
-Even if there are tips to infer offer operation results, it is far from ideal.
-The biggest issue is that it is impossible to get the failure reason if an offer
-operation fails. For instance, a CSI plugin might return a failure when creating
-a volume, and it is important for the framework to know about that and surface
-that information to the end user.
-
-As a result, we need a way to get explicit operation feedback just like task
-status updates. This feature is [coming soon](https://issues.apache.org/jira/browse/MESOS-8054).
+Starting with Mesos 1.6.0, it is possible to opt in to receiving status updates
+for operations that affect resources managed by a resource provider. To do so,
+the framework has to set the `id` field in the operation. Support for
+operations affecting the agent default resources is [coming
+soon](https://issues.apache.org/jira/browse/MESOS-8194).
 
 ## Profiles
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d58e6351/docs/scheduler-http-api.md
----------------------------------------------------------------------
diff --git a/docs/scheduler-http-api.md b/docs/scheduler-http-api.md
index 3929c33..bea8817 100644
--- a/docs/scheduler-http-api.md
+++ b/docs/scheduler-http-api.md
@@ -34,8 +34,11 @@ The scheduler interacts with Mesos via the [/api/v1/scheduler](endpoints/master/
 
 **Schedulers are expected to keep the subscription connection open as long as possible (barring errors in network, software, hardware, etc.) and incrementally process the response.** HTTP client libraries that can only parse the response after the connection is closed cannot be used. For the encoding used, please refer to **Events** section below.
 
-All subsequent (non-`SUBSCRIBE`) requests to the "/scheduler" endpoint (see details below in **Calls** section) must be sent using a different connection than the one used for subscription. The master responds to these HTTP POST requests with "202 Accepted" status codes (or, for unsuccessful requests, with 4xx or 5xx status codes; details in later sections). The "202 Accepted" response means that a request has been accepted for processing, not that the processing of the request has been completed. The request might or might not be acted upon by Mesos (e.g., master fails during the processing of the request). Any asynchronous responses from these requests will be streamed on the long-lived subscription connection. Schedulers can submit requests using more than one different HTTP connection.
+All subsequent (non-`SUBSCRIBE`) requests to the "/scheduler" endpoint (see details below in **Calls** section) must be sent using a different connection than the one used for subscription. Schedulers can submit requests using more than one different HTTP connection.
 
+The master responds to HTTP POST requests that require asynchronous processing with status **202 Accepted** (or, for unsuccessful requests, with 4xx or 5xx status codes; details in later sections). The **202 Accepted** response means that a request has been accepted for processing, not that the processing of the request has been completed. The request might or might not be acted upon by Mesos (e.g., master fails during the processing of the request). Any asynchronous responses from these requests will be streamed on the long-lived subscription connection.
+
+The master responds to HTTP POST requests that can be answered synchronously and immediately with status **200 OK** (or, for unsuccessful requests, with 4xx or 5xx status codes; details in later sections), possibly including a response body encoded in JSON or Protobuf. The encoding depends on the **Accept** header present in the request (the default encoding is JSON).
 
 ## Calls
 
@@ -171,6 +174,10 @@ The scheduler API uses `Filters.refuse_seconds` to specify the duration for whic
 
 NOTE: Mesos will cap `Filters.refuse_seconds` at 31536000 seconds (365 days).
 
+The master will send task status updates in response to `LAUNCH` and `LAUNCH_GROUP` operations. For other types of operations, if an operation ID is specified, the master will send operation status updates in response.
+
+NOTE: For the time being, an operation ID can only be set if the operation affects resources provided by a [resource provider](csi.md#resource-providers). See [MESOS-8194](https://issues.apache.org/jira/browse/MESOS-8194) for more details.
+
 ```
 ACCEPT Request (JSON):
 POST /api/v1/scheduler  HTTP/1.1
@@ -358,6 +365,33 @@ HTTP/1.1 202 Accepted
 
 ```
 
+### ACKNOWLEDGE_OPERATION_STATUS
+Sent by the scheduler to acknowledge an operation status update. Schedulers are responsible for explicitly acknowledging the receipt of status updates that have `status.uuid` set. These status updates are retried until they are acknowledged by the scheduler. The scheduler must not acknowledge status updates that do not have `status.uuid` set, as they are not retried. The `uuid` field contains raw bytes encoded in Base64.
+
+```
+ACKNOWLEDGE_OPERATION_STATUS Request (JSON):
+POST /api/v1/scheduler  HTTP/1.1
+
+Host: masterhost:5050
+Content-Type: application/json
+Mesos-Stream-Id: 130ae4e3-6b13-4ef4-baa9-9f2e85c3e9af
+
+{
+  "framework_id": { "value": "12220-3440-12532-2345" },
+  "type": "ACKNOWLEDGE_OPERATION_STATUS",
+  "acknowledge_operation_status": {
+    "agent_id": { "value": "12220-3440-12532-S1233" },
+    "resource_provider_id": { "value": "12220-3440-12532-rp" },
+    "uuid": "jhadf73jhakdlfha723adf",
+    "operation_id": { "value": "73jhakdlfha723adf" }
+  }
+}
+
+ACKNOWLEDGE_OPERATION_STATUS Response:
+HTTP/1.1 202 Accepted
+
+```
+
 ### RECONCILE
 Sent by the scheduler to query the status of non-terminal tasks. This causes the master to send back `UPDATE` events for each task in the list. Tasks that are no longer known to Mesos will result in `TASK_LOST` updates. If the list of tasks is empty, master will send `UPDATE` events for all currently known tasks of the framework.
 
@@ -386,6 +420,51 @@ HTTP/1.1 202 Accepted
 
 ```
 
+### RECONCILE_OPERATIONS
+Sent by the scheduler to query the status of non-terminal operations. The master will respond with a `RECONCILE_OPERATIONS` response containing the status of each operation in the list. If the list of operations is empty, the master will include in the response all currently known operations of the framework.
+
+```
+RECONCILE_OPERATIONS Request (JSON):
+POST /api/v1/scheduler   HTTP/1.1
+
+Host: masterhost:5050
+Content-Type: application/json
+Accept: application/json
+Mesos-Stream-Id: 130ae4e3-6b13-4ef4-baa9-9f2e85c3e9af
+
+{
+  "framework_id": { "value": "12220-3440-12532-2345" },
+  "type": "RECONCILE_OPERATIONS",
+  "reconcile_operations": {
+    "operations": [
+      {
+        "operation_id": { "value": "312325" },
+        "agent_id": { "value": "123535" }
+      }
+    ]
+  }
+}
+
+RECONCILE_OPERATIONS Response:
+HTTP/1.1 200 OK
+
+Content-Type: application/json
+
+{
+  "type": "RECONCILE_OPERATIONS",
+  "reconcile_operations": {
+    "operation_statuses": [
+      {
+        "operation_id": { "value": "312325" },
+        "state": "OPERATION_PENDING",
+        "uuid": { "value": "adfadfadbhgvjayd23r2uahj" }
+      }
+    ]
+  }
+}
+
+```
+
 ### MESSAGE
 Sent by the scheduler to send arbitrary binary data to the executor. Mesos neither interprets this data nor makes any guarantees about the delivery of this message to the executor. `data` is raw bytes encoded in Base64.
 


[04/13] mesos git commit: Cleaned up internal evolve functions.

Posted by gr...@apache.org.
Cleaned up internal evolve functions.

This patch removes the `mesos::` prefix from declarations where
possible, uses a consistent number of empty lines in the header,
and reorders some declarations.

Review: https://reviews.apache.org/r/66489/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c265ae60
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c265ae60
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c265ae60

Branch: refs/heads/master
Commit: c265ae60b9976d16e74736704f8005029d4137a8
Parents: 50b315a
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:26 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:48:21 2018 -0700

----------------------------------------------------------------------
 src/internal/evolve.cpp | 11 +++++------
 src/internal/evolve.hpp | 18 ++++++++++--------
 2 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c265ae60/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index 6e8bf6a..5ef5838 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -217,19 +217,19 @@ v1::UUID evolve(const UUID& uuid)
 }
 
 
-v1::agent::Call evolve(const mesos::agent::Call& call)
+v1::agent::Call evolve(const agent::Call& call)
 {
   return evolve<v1::agent::Call>(call);
 }
 
 
-v1::agent::ProcessIO evolve(const mesos::agent::ProcessIO& processIO)
+v1::agent::ProcessIO evolve(const agent::ProcessIO& processIO)
 {
   return evolve<v1::agent::ProcessIO>(processIO);
 }
 
 
-v1::agent::Response evolve(const mesos::agent::Response& response)
+v1::agent::Response evolve(const agent::Response& response)
 {
   return evolve<v1::agent::Response>(response);
 }
@@ -253,14 +253,13 @@ v1::master::Response evolve(const mesos::master::Response& response)
 }
 
 
-v1::resource_provider::Call evolve(const mesos::resource_provider::Call& call)
+v1::resource_provider::Call evolve(const resource_provider::Call& call)
 {
   return evolve<v1::resource_provider::Call>(call);
 }
 
 
-v1::resource_provider::Event evolve(
-    const mesos::resource_provider::Event& event)
+v1::resource_provider::Event evolve(const resource_provider::Event& event)
 {
   return evolve<v1::resource_provider::Event>(event);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/c265ae60/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index 388c4f8..e792ff5 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -84,20 +84,19 @@ v1::TaskInfo evolve(const TaskInfo& taskInfo);
 v1::TaskStatus evolve(const TaskStatus& status);
 v1::UUID evolve(const UUID& uuid);
 
-v1::agent::Call evolve(const mesos::agent::Call& call);
-v1::agent::ProcessIO evolve(const mesos::agent::ProcessIO& processIO);
-v1::agent::Response evolve(const mesos::agent::Response& response);
+
+v1::agent::Call evolve(const agent::Call& call);
+v1::agent::ProcessIO evolve(const agent::ProcessIO& processIO);
+v1::agent::Response evolve(const agent::Response& response);
+
 
 v1::maintenance::ClusterStatus evolve(
     const maintenance::ClusterStatus& cluster);
 v1::maintenance::Schedule evolve(const maintenance::Schedule& schedule);
 
-v1::master::Response evolve(const mesos::master::Response& response);
-
 
-v1::resource_provider::Call evolve(const mesos::resource_provider::Call& call);
-v1::resource_provider::Event evolve(
-    const mesos::resource_provider::Event& event);
+v1::resource_provider::Call evolve(const resource_provider::Call& call);
+v1::resource_provider::Event evolve(const resource_provider::Event& event);
 
 
 // Helper for repeated field evolving to 'T1' from 'T2'.
@@ -135,6 +134,7 @@ v1::scheduler::Event evolve(const RescindResourceOfferMessage& message);
 v1::scheduler::Event evolve(const StatusUpdateMessage& message);
 v1::scheduler::Event evolve(const UpdateOperationStatusMessage& message);
 
+
 v1::executor::Call evolve(const executor::Call& call);
 v1::executor::Event evolve(const executor::Event& event);
 
@@ -150,6 +150,7 @@ v1::executor::Event evolve(const StatusUpdateAcknowledgementMessage& message);
 
 
 v1::master::Event evolve(const mesos::master::Event& event);
+v1::master::Response evolve(const mesos::master::Response& response);
 
 
 // Before the v1 API we had REST endpoints that returned JSON. The JSON was not
@@ -170,6 +171,7 @@ v1::master::Response evolve(const JSON::Object& object);
 template <v1::agent::Response::Type T>
 v1::agent::Response evolve(const JSON::Object& object);
 
+
 template <v1::agent::Response::Type T>
 v1::agent::Response evolve(const JSON::Array& array);
 


[11/13] mesos git commit: Added a `call()` method to the v1 scheduler library.

Posted by gr...@apache.org.
Added a `call()` method to the v1 scheduler library.

This patch adds a `call()` method to the scheduler library that allows
clients to send a `v1::scheduler::Call` to the master and receive a
`v1::scheduler::Response`.

It will be used to test operation state reconciliation.

Review: https://reviews.apache.org/r/66460/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c39ef695
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c39ef695
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c39ef695

Branch: refs/heads/master
Commit: c39ef69514e57ca7c90e764a4a617abf88cd144f
Parents: 949b44e
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:56 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:50:00 2018 -0700

----------------------------------------------------------------------
 include/mesos/v1/scheduler.hpp                  |  39 +++++++
 include/mesos/v1/scheduler/scheduler.proto      |  34 ++++++
 .../org_apache_mesos_v1_scheduler_V0Mesos.cpp   |   9 ++
 src/scheduler/scheduler.cpp                     | 111 +++++++++++++++++++
 4 files changed, 193 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/include/mesos/v1/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler.hpp b/include/mesos/v1/scheduler.hpp
index d56e088..b1dfbb1 100644
--- a/include/mesos/v1/scheduler.hpp
+++ b/include/mesos/v1/scheduler.hpp
@@ -28,6 +28,10 @@
 
 #include <mesos/v1/scheduler/scheduler.hpp>
 
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+
 namespace mesos {
 
 namespace master {
@@ -48,6 +52,7 @@ public:
   // Empty virtual destructor (necessary to instantiate subclasses).
   virtual ~MesosBase() {}
   virtual void send(const Call& call) = 0;
+  virtual process::Future<APIResult> call(const Call& callMessage) = 0;
   virtual void reconnect() = 0;
 };
 
@@ -94,6 +99,40 @@ public:
   // disconnected).
   virtual void send(const Call& call) override;
 
+  // Attempts to send a call to the master, returning the response.
+  //
+  // The scheduler should only invoke this method once it has received the
+  // 'connected' callback. Otherwise, a `Failure` will be returned.
+  //
+  // Some local validation of calls is performed, and the request will not be
+  // sent to the master if the validation fails.
+  //
+  // A `Failure` will be returned on validation failures or if an error happens
+  // when sending the request to the master, e.g., a master disconnection, or a
+  // deserialization error.
+  //
+  // If it was possible to receive a response from the server, the returned
+  // object will contain the HTTP response status code.
+  //
+  // There are three cases to consider depending on the HTTP response status
+  // code:
+  //
+  //  (1) '202 ACCEPTED': Indicates the call was accepted for processing and
+  //      neither `APIResult::response` nor `APIResult::error` will be set.
+  //
+  //  (2) '200 OK': Indicates the call completed successfully.
+  //      `APIResult::response` will be set if the `scheduler::Call::Type`
+  //      has a corresponding `scheduler::Response::Type`, `APIResult::error`
+  //      will not be set.
+  //
+  //  (3) For all other HTTP status codes, the `APIResult::response` field will
+  //      not be set and the `APIResult::error` field may be set to provide more
+  //      information.
+  //
+  // Note: This method cannot be used to send `SUBSCRIBE` calls, use `send()`
+  // instead.
+  virtual process::Future<APIResult> call(const Call& callMessage) override;
+
   // Force a reconnection with the master.
   //
   // In the case of a one-way network partition, the connection between the

http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/include/mesos/v1/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto
index b912901..fcfec5e 100644
--- a/include/mesos/v1/scheduler/scheduler.proto
+++ b/include/mesos/v1/scheduler/scheduler.proto
@@ -495,3 +495,37 @@ message Call {
   optional Request request = 11;
   optional Suppress suppress = 16;
 }
+
+/**
+ * This message is used by the C++ Scheduler HTTP API library as the return
+ * type of the `call()` method. The message includes the HTTP status code with
+ * which the master responded, and optionally a `scheduler::Response` message.
+ *
+ * There are three cases to consider depending on the HTTP response status code:
+ *
+ *  (1) '202 ACCEPTED': Indicates the call was accepted for processing and
+ *      neither `response` nor `error` will be set.
+ *
+ *  (2) '200 OK': Indicates the call completed successfully, and the `response`
+ *      field will be set if the `scheduler::Call::Type` has a corresponding
+ *      `scheduler::Response::Type`; `error` will not be set.
+ *
+ *  (3) For all other HTTP status codes, the `response` field will not be set
+ *      and the `error` field may be set to provide more information.
+ *
+ * NOTE: This message is used by the C++ Scheduler HTTP API library and is not
+ * part of the API specification.
+ */
+message APIResult {
+  // HTTP status code with which the master responded.
+  required uint32 status_code = 1;
+
+  // This field will only be set if the call completed successfully and the
+  // master responded with `200 OK` and a non-empty body.
+  optional Response response = 2;
+
+  // This field will only be set if the call did not complete successfully,
+  // i.e., the master responded with a status other than `202 Accepted` or
+  // `200 OK`. As currently implemented by the C++ library, it describes the
+  // unexpected status and includes the response body, which may be empty.
+  optional string error = 3;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
index 60b17b9..ea8d54f 100644
--- a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
+++ b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
@@ -28,6 +28,7 @@
 #include <process/clock.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
+#include <process/future.hpp>
 #include <process/id.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
@@ -134,6 +135,14 @@ public:
     UNREACHABLE();
   }
 
+  virtual process::Future<v1::scheduler::APIResult> call(
+      const v1::scheduler::Call& callMessage) override
+  {
+    // This V0 adapter does not support sending a `v1::scheduler::Call` that
+    // returns a `v1::scheduler::Response`, so this override must never be
+    // invoked; `UNREACHABLE()` flags any such call as a programming error.
+    UNREACHABLE();
+  }
+
   process::Owned<V0ToV1AdapterProcess> process;
 
 private:

http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index ecef916..c0dff53 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -114,6 +114,7 @@ using mesos::internal::recordio::Reader;
 using mesos::master::detector::MasterDetector;
 
 using process::collect;
+using process::Failure;
 using process::Owned;
 using process::wait; // Necessary on some OS's to disambiguate.
 
@@ -263,6 +264,49 @@ public:
       .onAny(defer(self(), &Self::_send, call, lambda::_1));
   }
 
+  // Sends `callMessage` to the master and returns the outcome as an
+  // `APIResult`. A `Failure` is returned without contacting the master if the
+  // call fails local validation, is a SUBSCRIBE call, or the library is not
+  // subscribed; a `Failure` is also returned if authentication fails or is
+  // discarded.
+  Future<APIResult> call(const Call& callMessage)
+  {
+    Option<Error> error =
+      validation::scheduler::call::validate(devolve(callMessage));
+
+    if (error.isSome()) {
+      return Failure(error->message);
+    }
+
+    // SUBSCRIBE calls must be sent with `send()` instead.
+    if (callMessage.type() == Call::SUBSCRIBE) {
+      return Failure("This method doesn't support SUBSCRIBE calls");
+    }
+
+    // `_call()` expects a stream ID (it `CHECK`s for one), so calls are
+    // rejected until the library is subscribed.
+    if (state != SUBSCRIBED) {
+      return Failure(
+          "Cannot perform calls until subscribed. Current state: " +
+          stringify(state));
+    }
+
+    VLOG(1) << "Sending " << callMessage.type() << " call to " << master.get();
+
+    // TODO(vinod): Add support for sending MESSAGE calls directly
+    // to the slave, instead of relaying it through the master, as
+    // the scheduler driver does.
+
+    process::http::Request request;
+    request.method = "POST";
+    request.url = master.get();
+    request.body = serialize(contentType, callMessage);
+    request.keepAlive = true;
+    // Ask for the response in the same encoding as the request; `__call()`
+    // deserializes the body using `contentType`.
+    request.headers = {{"Accept", stringify(contentType)},
+                       {"Content-Type", stringify(contentType)}};
+
+    // TODO(tillt): Add support for multi-step authentication protocols.
+    // A failed or discarded authentication becomes a `Failure`; otherwise
+    // the authenticated request is handed to `_call()` on this process.
+    return authenticatee->authenticate(request, credential)
+      .recover([](const Future<process::http::Request>& future) {
+        return Failure(
+            stringify("HTTP authenticatee ") +
+            (future.isFailed() ? "failed: " + future.failure() : "discarded"));
+      })
+      .then(defer(self(), &Self::_call, callMessage, lambda::_1));
+  }
+
   void reconnect()
   {
     // Ignore the reconnection request if we are currently disconnected
@@ -675,6 +719,68 @@ protected:
           response->body + ") for " + stringify(call.type()));
   }
 
+  // Continuation of `call()` once the request has been authenticated: attaches
+  // the stream ID and sends the request on the non-subscription connection,
+  // handing the resulting HTTP response to `__call()`.
+  Future<APIResult> _call(
+      const Call& callMessage,
+      process::http::Request request)
+  {
+    // The connection may have been lost while the request was being
+    // authenticated.
+    if (connections.isNone()) {
+      return Failure("Connection to master interrupted");
+    }
+
+    // `call()` only gets here after checking for the SUBSCRIBED state, so
+    // both IDs are expected to be set.
+    CHECK_SOME(streamId);
+    CHECK_SOME(connectionId);
+
+    // Set the stream ID associated with this connection.
+    request.headers["Mesos-Stream-Id"] = streamId->toString();
+
+    return connections->nonSubscribe.send(request)
+      .then(defer(self(),
+                  &Self::__call,
+                  callMessage,
+                  lambda::_1));
+  }
+
+  // Translates the master's HTTP response into an `APIResult`. The status code
+  // is always recorded. A `202 Accepted` body is ignored (with a warning), a
+  // non-empty `200 OK` body is deserialized into `response`, and any other
+  // status is reported through `error`. A `Failure` is returned only when a
+  // `200 OK` body cannot be deserialized.
+  Future<APIResult> __call(
+      const Call& callMessage,
+      const process::http::Response& response)
+  {
+    APIResult result;
+
+    result.set_status_code(response.code);
+
+    if (response.code == process::http::Status::ACCEPTED) {
+      // "202 Accepted" responses are asynchronously processed, so the body
+      // should be empty.
+      if (!response.body.empty()) {
+        LOG(WARNING) << "Response for " << callMessage.type()
+                     << " unexpectedly included body: '" << response.body
+                     << "'";
+      }
+    } else if (response.code == process::http::Status::OK) {
+      // An empty body is left as an unset `response`; this is expected for
+      // calls without a corresponding `scheduler::Response::Type`.
+      if (!response.body.empty()) {
+        Try<Response> deserializedResponse =
+          deserialize<Response>(contentType, response.body);
+
+        if (deserializedResponse.isError()) {
+          return Failure(
+              "Failed to deserialize the response '" + response.status + "'" +
+              " (" + response.body + "): " + deserializedResponse.error());
+        }
+
+        *result.mutable_response() = deserializedResponse.get();
+      }
+    } else {
+      // Any other status is unexpected: record the status line and the
+      // (possibly empty) body so the caller can inspect them.
+      result.set_error(
+          "Received unexpected '" + response.status + "'" + " (" +
+          response.body + ")");
+    }
+
+    return result;
+  }
+
   void read()
   {
     subscribed->decoder->read()
@@ -917,6 +1023,11 @@ void Mesos::send(const Call& call)
   dispatch(process, &MesosProcess::send, call);
 }
 
+// Dispatches the call to the library's `MesosProcess` and returns the future
+// produced by `MesosProcess::call()`.
+Future<APIResult> Mesos::call(const Call& callMessage)
+{
+  return dispatch(process, &MesosProcess::call, callMessage);
+}
+
 
 void Mesos::reconnect()
 {


[03/13] mesos git commit: Added an evolve function for `v1::scheduler::Response`.

Posted by gr...@apache.org.
Added an evolve function for `v1::scheduler::Response`.

Review: https://reviews.apache.org/r/66461/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/50b315a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/50b315a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/50b315a8

Branch: refs/heads/master
Commit: 50b315a8271a221b1f83f558bc29a19ee39d32cc
Parents: 4e55884
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:25 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:48:03 2018 -0700

----------------------------------------------------------------------
 src/internal/evolve.cpp | 6 ++++++
 src/internal/evolve.hpp | 5 ++---
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/50b315a8/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index f598ea1..6e8bf6a 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -500,6 +500,12 @@ v1::executor::Event evolve(const executor::Event& event)
 }
 
 
+// Evolves an internal `scheduler::Response` into a `v1::scheduler::Response`
+// by delegating to the generic `evolve<T>()` helper.
+v1::scheduler::Response evolve(const scheduler::Response& response)
+{
+  return evolve<v1::scheduler::Response>(response);
+}
+
+
 v1::executor::Event evolve(const ExecutorRegisteredMessage& message)
 {
   v1::executor::Event event;

http://git-wip-us.apache.org/repos/asf/mesos/blob/50b315a8/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index e00ac71..388c4f8 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -100,9 +100,6 @@ v1::resource_provider::Event evolve(
     const mesos::resource_provider::Event& event);
 
 
-v1::scheduler::Call evolve(const scheduler::Call& call);
-
-
 // Helper for repeated field evolving to 'T1' from 'T2'.
 template <typename T1, typename T2>
 google::protobuf::RepeatedPtrField<T1> evolve(
@@ -118,7 +115,9 @@ google::protobuf::RepeatedPtrField<T1> evolve(
 }
 
 
+v1::scheduler::Call evolve(const scheduler::Call& call);
 v1::scheduler::Event evolve(const scheduler::Event& event);
+v1::scheduler::Response evolve(const scheduler::Response& response);
 
 
 // Helper functions that evolve old style internal messages to a


[07/13] mesos git commit: Implemented operation status reconciliation.

Posted by gr...@apache.org.
Implemented operation status reconciliation.

Review: https://reviews.apache.org/r/66464/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d2616908
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d2616908
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d2616908

Branch: refs/heads/master
Commit: d26169081699b6bc654113ae7ea980e55cd5f67d
Parents: c7c3848
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Apr 23 13:43:41 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Apr 23 13:49:07 2018 -0700

----------------------------------------------------------------------
 include/mesos/v1/scheduler/scheduler.proto |   6 ++
 src/master/http.cpp                        |  32 +++++--
 src/master/master.cpp                      | 108 +++++++++++++++++++++++-
 src/master/master.hpp                      |  21 ++++-
 src/messages/messages.proto                |   7 +-
 5 files changed, 159 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/include/mesos/v1/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto
index 25ebcfc..b912901 100644
--- a/include/mesos/v1/scheduler/scheduler.proto
+++ b/include/mesos/v1/scheduler/scheduler.proto
@@ -426,7 +426,13 @@ message Call {
   message ReconcileOperations {
     message Operation {
       required OperationID operation_id = 1;
+
+      // If `agent_id` is not set and the master doesn't know the operation,
+      // then it will return `OPERATION_UNKNOWN`; if `agent_id` is set, it can
+      // return more fine-grained states depending on the state of the
+      // corresponding agent.
       optional AgentID agent_id = 2;
+
       optional ResourceProviderID resource_provider_id = 3;
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 34c9023..135ae43 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -956,12 +956,14 @@ Future<Response> Master::Http::scheduler(
     return BadRequest("Failed to validate scheduler::Call: " + error->message);
   }
 
-  if (call.type() == scheduler::Call::SUBSCRIBE) {
-    // We default to JSON 'Content-Type' in the response since an
-    // empty 'Accept' header results in all media types considered
-    // acceptable.
-    ContentType acceptType = ContentType::JSON;
+  ContentType acceptType;
 
+  // Ideally this handler would be consistent with the Operator API handler
+  // and determine the accept type regardless of the type of request.
+  // However, to maintain backwards compatibility, it determines the accept
+  // type only if the response will not be empty.
+  if (call.type() == scheduler::Call::SUBSCRIBE ||
+      call.type() == scheduler::Call::RECONCILE_OPERATIONS) {
     if (request.acceptsMediaType(APPLICATION_JSON)) {
       acceptType = ContentType::JSON;
     } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
@@ -971,7 +973,9 @@ Future<Response> Master::Http::scheduler(
           string("Expecting 'Accept' to allow ") +
           "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
     }
+  }
 
+  if (call.type() == scheduler::Call::SUBSCRIBE) {
     // Make sure that a stream ID was not included in the request headers.
     if (request.headers.contains("Mesos-Stream-Id")) {
       return BadRequest(
@@ -1110,9 +1114,9 @@ Future<Response> Master::Http::scheduler(
       master->reconcile(framework, std::move(*call.mutable_reconcile()));
       return Accepted();
 
-    // TODO(greggomann): Implement operation reconciliation.
     case scheduler::Call::RECONCILE_OPERATIONS:
-      return Forbidden("Operation reconciliation is not yet implemented");
+      return reconcileOperations(
+          framework, call.reconcile_operations(), acceptType);
 
     case scheduler::Call::MESSAGE:
       master->message(framework, std::move(*call.mutable_message()));
@@ -5089,6 +5093,20 @@ Future<Response> Master::Http::_markAgentGone(const SlaveID& slaveId) const
   });
 }
 
+
+Future<Response> Master::Http::reconcileOperations(
+    Framework* framework,
+    const scheduler::Call::ReconcileOperations& call,
+    ContentType contentType) const
+{
+  mesos::scheduler::Response response;
+  response.set_type(mesos::scheduler::Response::RECONCILE_OPERATIONS);
+  *response.mutable_reconcile_operations() =
+    master->reconcileOperations(framework, call);
+
+  return OK(serialize(contentType, evolve(response)), stringify(contentType));
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ada7709..545a4d7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8837,10 +8837,112 @@ void Master::reconcile(
 }
 
 
-// TODO(greggomann): Implement operation update reconciliation.
-void Master::reconcileOperations(
+scheduler::Response::ReconcileOperations Master::reconcileOperations(
     Framework* framework,
-    const scheduler::Call::ReconcileOperations& reconcile) {}
+    const scheduler::Call::ReconcileOperations& reconcile)
+{
+  CHECK_NOTNULL(framework);
+
+  ++metrics->messages_reconcile_operations;
+
+  scheduler::Response::ReconcileOperations response;
+
+  if (reconcile.operations_size() == 0) {
+    // Implicit reconciliation.
+    LOG(INFO) << "Performing implicit operation state reconciliation"
+                 " for framework " << *framework;
+
+    response.mutable_operation_statuses()->Reserve(
+        framework->operations.size());
+
+    foreachvalue (Operation* operation, framework->operations) {
+      if (operation->statuses().empty()) {
+        // This can happen if the operation is pending.
+        response.add_operation_statuses()->CopyFrom(operation->latest_status());
+      } else {
+        response.add_operation_statuses()->CopyFrom(
+            *operation->statuses().rbegin());
+      }
+    }
+
+    return response;
+  }
+
+  // Explicit reconciliation.
+  LOG(INFO) << "Performing explicit operation state reconciliation for "
+            << reconcile.operations_size() << " operations of framework "
+            << *framework;
+
+  // Explicit reconciliation occurs for the following cases:
+  //   (1) Operation is known: the latest status sent to the framework.
+  //   (2) Operation is unknown, slave is recovered: OPERATION_RECOVERING.
+  //   (3) Operation is unknown, slave is registered: OPERATION_UNKNOWN.
+  //   (4) Operation is unknown, slave is unreachable: OPERATION_UNREACHABLE.
+  //   (5) Operation is unknown, slave is gone: OPERATION_GONE_BY_OPERATOR.
+  //   (6) Operation is unknown, slave is unknown: OPERATION_UNKNOWN.
+  //   (7) Operation is unknown, slave ID is not specified: OPERATION_UNKNOWN.
+
+  foreach (const scheduler::Call::ReconcileOperations::Operation& operation,
+           reconcile.operations()) {
+    Option<SlaveID> slaveId = None();
+    if (operation.has_slave_id()) {
+      slaveId = operation.slave_id();
+    }
+
+    Option<Operation*> frameworkOperation =
+      framework->getOperation(operation.operation_id());
+
+    OperationStatus* status = response.add_operation_statuses();
+    if (frameworkOperation.isSome()) {
+      // (1) Operation is known: resend the latest status sent to the framework.
+      if (frameworkOperation.get()->statuses().empty()) {
+        // This can happen if the operation is pending.
+        *status = frameworkOperation.get()->latest_status();
+      } else {
+        *status = *frameworkOperation.get()->statuses().rbegin();
+      }
+    } else if (slaveId.isSome() && slaves.recovered.contains(slaveId.get())) {
+      // (2) Operation is unknown, slave is recovered: OPERATION_RECOVERING.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_RECOVERING,
+          operation.operation_id(),
+          "Reconciliation: Agent is recovered but has not re-registered");
+    } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
+      // (3) Operation is unknown, slave is registered: OPERATION_UNKNOWN.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_UNKNOWN,
+          operation.operation_id(),
+          "Reconciliation: Operation is unknown");
+    } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) {
+      // (4) Operation is unknown, slave is unreachable: OPERATION_UNREACHABLE.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_UNREACHABLE,
+          operation.operation_id(),
+          "Reconciliation: Agent is unreachable");
+    } else if (slaveId.isSome() && slaves.gone.contains(slaveId.get())) {
+      // (5) Operation is unknown, slave is gone: OPERATION_GONE_BY_OPERATOR.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_GONE_BY_OPERATOR,
+          operation.operation_id(),
+          "Reconciliation: Agent marked gone by operator");
+    } else if (slaveId.isSome()) {
+      // (6) Operation is unknown, slave is unknown: OPERATION_UNKNOWN.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_UNKNOWN,
+          operation.operation_id(),
+          "Reconciliation: Both operation and agent are unknown");
+    } else {
+      // (7) Operation is unknown, slave ID is not specified: OPERATION_UNKNOWN.
+      *status = protobuf::createOperationStatus(
+          OperationState::OPERATION_UNKNOWN,
+          operation.operation_id(),
+          "Reconciliation: Operation is unknown and no 'agent_id' was"
+          " provided");
+    }
+  }
+
+  return response;
+}
 
 
 void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0d9620d..a7cadd9 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1076,7 +1076,7 @@ private:
       Framework* framework,
       scheduler::Call::Reconcile&& reconcile);
 
-  void reconcileOperations(
+  scheduler::Response::ReconcileOperations reconcileOperations(
       Framework* framework,
       const scheduler::Call::ReconcileOperations& reconcile);
 
@@ -1734,6 +1734,11 @@ private:
     process::Future<process::http::Response> _markAgentGone(
         const SlaveID& slaveId) const;
 
+    process::Future<process::http::Response> reconcileOperations(
+        Framework* framework,
+        const scheduler::Call::ReconcileOperations& call,
+        ContentType contentType) const;
+
     Master* master;
 
     // NOTE: The quota specific pieces of the Operator API are factored
@@ -2549,6 +2554,20 @@ struct Framework
     }
   }
 
+  Option<Operation*> getOperation(const OperationID& id) {
+    Option<UUID> uuid = operationUUIDs.get(id);
+
+    if (uuid.isNone()) {
+      return None();
+    }
+
+    Option<Operation*> operation = operations.get(uuid.get());
+
+    CHECK_SOME(operation);
+
+    return operation;
+  }
+
   void recoverResources(Operation* operation)
   {
     CHECK(operation->has_slave_id())

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 556801d..41e6a8a 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -471,10 +471,9 @@ message ReconcileTasksMessage {
 
 
 /**
- * The master uses this message to query an agent about the state of
- * one or more operations. This is useful to resolve
- * discrepancies between the master and agent's view after agent
- * reregistration.
+ * The master uses this message to query an agent about the state of one or
+ * more operations. This is useful to resolve discrepancies between the master
+ * and agent's view after agent reregistration.
  */
 message ReconcileOperationsMessage {
   message Operation {