You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/21 05:45:46 UTC
[3/6] mesos git commit: Renamed offer operation to operation.
http://git-wip-us.apache.org/repos/asf/mesos/blob/760ab18c/src/tests/offer_operation_status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/offer_operation_status_update_manager_tests.cpp b/src/tests/offer_operation_status_update_manager_tests.cpp
deleted file mode 100644
index 37bea36..0000000
--- a/src/tests/offer_operation_status_update_manager_tests.cpp
+++ /dev/null
@@ -1,913 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include <string>
-
-#include <gtest/gtest.h>
-
-#include <mesos/v1/mesos.hpp>
-
-#include <process/clock.hpp>
-#include <process/future.hpp>
-#include <process/gmock.hpp>
-#include <process/owned.hpp>
-
-#include <stout/lambda.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/uuid.hpp>
-
-#include <stout/os/ftruncate.hpp>
-
-#include "slave/constants.hpp"
-
-#include "tests/mesos.hpp"
-#include "tests/utils.hpp"
-
-#include "status_update_manager/offer_operation.hpp"
-
-using lambda::function;
-
-using process::Clock;
-using process::Future;
-using process::Owned;
-using process::Promise;
-
-using std::string;
-
-namespace mesos {
-namespace internal {
-namespace tests {
-
-// This class will be the target of the forward callbacks passed to the offer
-// operation status update managers in this test suite.
-//
-// It uses gmock, so that we can easily set expectations and check how
-// often/which status updates are forwarded.
-class MockOfferOperationStatusUpdateProcessor
-{
-public:
- MOCK_METHOD1(update, void(const OfferOperationStatusUpdate&));
-};
-
-
-class OfferOperationStatusUpdateManagerTest : public MesosTest
-{
-protected:
- OfferOperationStatusUpdateManagerTest()
- : statusUpdateManager(new OfferOperationStatusUpdateManager())
- {
- Clock::pause();
-
- const function<void(const OfferOperationStatusUpdate&)> forward =
- [&](const OfferOperationStatusUpdate& update) {
- statusUpdateProcessor.update(update);
- };
-
- statusUpdateManager->initialize(
- forward, OfferOperationStatusUpdateManagerTest::getPath);
- }
-
- void TearDown() override
- {
- Clock::resume();
- statusUpdateManager.reset();
-
- MesosTest::TearDown();
- }
-
- OfferOperationStatusUpdate createOfferOperationStatusUpdate(
- const id::UUID& statusUuid,
- const id::UUID& operationUuid,
- const OfferOperationState& state,
- const Option<FrameworkID>& frameworkId = None())
- {
- OfferOperationStatusUpdate statusUpdate;
-
- statusUpdate.mutable_operation_uuid()->set_value(operationUuid.toBytes());
-
- if (frameworkId.isSome()) {
- statusUpdate.mutable_framework_id()->CopyFrom(frameworkId.get());
- }
-
- OfferOperationStatus* status = statusUpdate.mutable_status();
- status->set_state(state);
- status->mutable_status_uuid()->set_value(statusUuid.toBytes());
-
- return statusUpdate;
- }
-
- void resetStatusUpdateManager()
- {
- statusUpdateManager.reset(new OfferOperationStatusUpdateManager());
-
- const function<void(const OfferOperationStatusUpdate&)> forward =
- [&](const OfferOperationStatusUpdate& update) {
- statusUpdateProcessor.update(update);
- };
-
- statusUpdateManager->initialize(
- forward, OfferOperationStatusUpdateManagerTest::getPath);
- }
-
- static const string getPath(const id::UUID& operationUuid)
- {
- return path::join(os::getcwd(), "streams", operationUuid.toString());
- }
-
- Owned<OfferOperationStatusUpdateManager> statusUpdateManager;
- MockOfferOperationStatusUpdateProcessor statusUpdateProcessor;
-};
-
-
-// This test verifies that the status update manager will not retry a terminal
-// status update after it has been acknowledged.
-TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAck)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid, operationUuid, OfferOperationState::OFFER_OPERATION_FINISHED);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- // Acknowledge the update, this is a terminal update, so `acknowledgement`
- // should return `false`.
- AWAIT_EXPECT_FALSE(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-
- // Advance the clock, the status update has been acknowledged, so it shouldn't
- // trigger a retry.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-}
-
-
-// This test verifies that the status update manager will not retry a
-// non-terminal status update after it has been acknowledged.
-TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAckNonTerminalUpdate)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid, operationUuid, OfferOperationState::OFFER_OPERATION_PENDING);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
- // should return `true`.
- AWAIT_EXPECT_TRUE(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-
- // Advance the clock, the status update has been acknowledged, so it shouldn't
- // trigger a retry.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-}
-
-
-// This test verifies that the status update manager resends status updates
-// until they are acknowledged.
-TEST_F(OfferOperationStatusUpdateManagerTest, ResendUnacknowledged)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate1;
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate2;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid, operationUuid, OfferOperationState::OFFER_OPERATION_FINISHED);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
-
- EXPECT_FALSE(forwardedStatusUpdate2.isReady());
-
- // Advance the clock to trigger a retry.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-
- // Verify that the status update is forwarded again.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
-
- // Acknowledge the update, this is a terminal update, so `acknowledgement`
- // should return `false`.
- AWAIT_EXPECT_FALSE(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-
- // Advance the clock, the status update has been acknowledged, so it shouldn't
- // trigger a retry.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-}
-
-
-// This test verifies that after the updates belonging to a framework are
-// cleaned up from the status update manager, the status update manager stops
-// resending them.
-TEST_F(OfferOperationStatusUpdateManagerTest, Cleanup)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- FrameworkID frameworkId;
- frameworkId.set_value("frameworkId");
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_FINISHED,
- frameworkId);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- // Cleanup the framework.
- statusUpdateManager->cleanup(frameworkId);
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // cleaned up.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-}
-
-
-// This test verifies that the status update manager is able to recover
-// checkpointed status updates, and that it resends the recovered updates that
-// haven't been acknowledged.
-TEST_F(OfferOperationStatusUpdateManagerTest, RecoverCheckpointedStream)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate1;
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate2;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_FINISHED);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
-
- resetStatusUpdateManager();
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // cleaned up.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-
- EXPECT_FALSE(forwardedStatusUpdate2.isReady());
-
- // Recover the checkpointed stream.
- Future<OfferOperationStatusManagerState> state =
- statusUpdateManager->recover({operationUuid}, true);
-
- AWAIT_READY(state);
-
- EXPECT_EQ(0u, state->errors);
- EXPECT_TRUE(state->streams.contains(operationUuid));
- EXPECT_SOME(state->streams.at(operationUuid));
-
- const Option<OfferOperationStatusUpdate> recoveredUpdate =
- state->streams.at(operationUuid)->updates.front();
-
- ASSERT_SOME(recoveredUpdate);
- EXPECT_EQ(statusUpdate, recoveredUpdate.get());
-
- // The stream should NOT be terminated.
- EXPECT_FALSE(state->streams.at(operationUuid)->terminated);
-
- // Check that the status update is resent.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
-}
-
-
-// This test verifies that the status update manager returns a `Failure` when
-// trying to recover a stream that isn't checkpointed.
-TEST_F(OfferOperationStatusUpdateManagerTest, RecoverNotCheckpointedStream)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_FINISHED);
-
- // Send a non-checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, false));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- // Verify that the stream file is NOT created.
- EXPECT_TRUE(!os::exists(getPath(operationUuid)));
-
- resetStatusUpdateManager();
-
- // Trying to recover the non-checkpointed stream should fail.
- AWAIT_EXPECT_FAILED(statusUpdateManager->recover({operationUuid}, true));
-}
-
-
-// This test verifies that the status update manager doesn't return a
-// `Failure` when trying to recover a stream from an empty file.
-//
-// This can happen when the checkpointing failed between opening the file and
-// writing the first update. In this case `recover()` should succeed, but return
-// `None` as the operation's state.
-TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyFile)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_FINISHED);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- resetStatusUpdateManager();
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // cleaned up.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-
- // Truncate the status updates to zero bytes, simulating a failure in
- // checkpointing, where the file was created, but the status update manager
- // crashed before any data was written to disk.
- Try<int_fd> fd = os::open(getPath(operationUuid), O_RDWR);
- ASSERT_SOME(os::ftruncate(fd.get(), 0));
- os::close(fd.get());
-
- // The recovery of the empty stream should not fail, but the stream state
- // should be empty.
- Future<OfferOperationStatusManagerState> state =
- statusUpdateManager->recover({operationUuid}, false);
-
- AWAIT_READY(state);
-
- EXPECT_EQ(0u, state->errors);
- EXPECT_TRUE(state->streams.contains(operationUuid));
- EXPECT_NONE(state->streams.at(operationUuid));
-}
-
-
-// This test verifies that the status update manager doesn't return a `Failure`
-// when trying to recover a stream from a parent directory that doesn't contain
-// the status updates file.
-//
-// This can happen when the checkpointing failed after creating the parent
-// directory, but before the file was opened. In this case `recover()` should
-// succeed, but return `None` as the operation's state.
-TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyDirectory)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_FINISHED);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- resetStatusUpdateManager();
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // cleaned up.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-
- // Leave the parent directory, but remove the status updates file, simulating
- // a failure in checkpointing, where the parent directory was created, but the
- // checkpointing process crashed before opening the status updates file.
- ASSERT_SOME(os::rm(getPath(operationUuid)));
-
- // The recovery of the empty stream should not fail, but the stream state
- // should be empty.
- Future<OfferOperationStatusManagerState> state =
- statusUpdateManager->recover({operationUuid}, false);
-
- AWAIT_READY(state);
-
- EXPECT_EQ(0u, state->errors);
- EXPECT_TRUE(state->streams.contains(operationUuid));
- EXPECT_NONE(state->streams.at(operationUuid));
-}
-
-
-// This test verifies that the status update manager is able to recover a
-// terminated stream, and that the stream's state reflects that it is
-// terminated.
-TEST_F(OfferOperationStatusUpdateManagerTest, RecoverTerminatedStream)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_FINISHED);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- // Acknowledge the update, this is a terminal update, so `acknowledgement`
- // should return `false`.
- AWAIT_EXPECT_FALSE(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-
- resetStatusUpdateManager();
-
- // Recover the checkpointed stream.
- Future<OfferOperationStatusManagerState> state =
- statusUpdateManager->recover({operationUuid}, true);
-
- AWAIT_ASSERT_READY(state);
-
- EXPECT_EQ(0u, state->errors);
- EXPECT_TRUE(state->streams.contains(operationUuid));
- EXPECT_SOME(state->streams.at(operationUuid));
-
- // The stream should be terminated.
- EXPECT_TRUE(state->streams.at(operationUuid)->terminated);
-
- const Option<OfferOperationStatusUpdate> recoveredUpdate =
- state->streams.at(operationUuid)->updates.front();
-
- ASSERT_SOME(recoveredUpdate);
- EXPECT_EQ(statusUpdate, recoveredUpdate.get());
-
- // Advance the clock, the status update has been acknowledged, so it shouldn't
- // trigger a retry.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-}
-
-
-// This test verifies that the status update manager silently ignores duplicate
-// updates.
-TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdate)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_PENDING);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
- // should return `true`.
- AWAIT_EXPECT_TRUE(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-
- // Check that a duplicated update is ignored.
- AWAIT_EXPECT_READY(statusUpdateManager->update(statusUpdate, true));
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // acknowledged.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-}
-
-
-// This test verifies that after recovery the status update manager still
-// silently ignores duplicate updates.
-TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdateAfterRecover)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_PENDING);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
- // should return `true`.
- AWAIT_EXPECT_TRUE(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-
- resetStatusUpdateManager();
-
- // Recover the checkpointed stream.
- AWAIT_ASSERT_READY(statusUpdateManager->recover({operationUuid}, true));
-
- // Check that a duplicated update is ignored.
- AWAIT_EXPECT_READY(statusUpdateManager->update(statusUpdate, true));
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // acknowledged.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-}
-
-
-// This test verifies that the status update manager rejects duplicated
-// acknowledgements.
-TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAck)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_PENDING);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
- // should return `true`.
- AWAIT_EXPECT_TRUE(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // ignored.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-
- // Try to acknowledge the same update, the operation should fail.
- AWAIT_EXPECT_FAILED(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-}
-
-
-// This test verifies that after recovery the status update manager still
-// rejects duplicated acknowledgements.
-TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAckAfterRecover)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_PENDING);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
- // should return `true`.
- AWAIT_EXPECT_TRUE(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-
- resetStatusUpdateManager();
-
- // Recover the checkpointed stream.
- AWAIT_ASSERT_READY(statusUpdateManager->recover({operationUuid}, true));
-
- // Try to acknowledge the same update, the operation should fail.
- AWAIT_EXPECT_FAILED(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid));
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // ignored.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-}
-
-
-// This test verifies that non-strict recovery of a status updates file
-// containing a corrupted record at the end succeeds.
-TEST_F(OfferOperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate1;
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate2;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_FINISHED);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
-
- resetStatusUpdateManager();
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // cleaned up.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-
- // The file should contain only `OfferOperationStatusUpdateRecord` messages.
- // Write a `OfferOperationStatusUpdate` to the end of the file, so that the
- // recovery process encounters an error.
- Try<int_fd> fd = os::open(getPath(operationUuid), O_APPEND | O_RDWR);
- ASSERT_SOME(fd);
- ASSERT_SOME(::protobuf::write(fd.get(), statusUpdate));
- ASSERT_SOME(os::close(fd.get()));
-
- EXPECT_FALSE(forwardedStatusUpdate2.isReady());
-
- // The non-strict recovery of the corrupted stream should not fail, but the
- // state should reflect that the status update file contained a corrupted
- // record.
- Future<OfferOperationStatusManagerState> state =
- statusUpdateManager->recover({operationUuid}, false);
-
- AWAIT_READY(state);
-
- EXPECT_EQ(1u, state->errors);
- EXPECT_TRUE(state->streams.contains(operationUuid));
- EXPECT_SOME(state->streams.at(operationUuid));
-
- // Check that the status update could be recovered.
- const Option<OfferOperationStatusUpdate> recoveredUpdate =
- state->streams.at(operationUuid)->updates.front();
-
- ASSERT_SOME(recoveredUpdate);
- EXPECT_EQ(statusUpdate, recoveredUpdate.get());
-
- // Check that the status update is resent.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
-}
-
-
-// This test verifies that strict recovery of a status updates file containing
-// a corrupted record at the end fails.
-TEST_F(OfferOperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
-
- const id::UUID operationUuid = id::UUID::random();
- const id::UUID statusUuid = id::UUID::random();
-
- OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
- statusUuid,
- operationUuid,
- OfferOperationState::OFFER_OPERATION_FINISHED);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
-
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
-
- resetStatusUpdateManager();
-
- // Advance the clock enough to trigger a retry if the update hasn't been
- // cleaned up.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-
- // The file should contain only `OfferOperationStatusUpdateRecord` messages.
- // Write a `OfferOperationStatusUpdate` to the end of the file, so that the
- // recovery process encounters an error.
- Try<int_fd> fd = os::open(getPath(operationUuid), O_APPEND | O_RDWR);
- ASSERT_SOME(fd);
-
- ASSERT_SOME(::protobuf::write(fd.get(), statusUpdate));
- ASSERT_SOME(os::close(fd.get()));
-
- // The strict recovery of the corrupted stream should fail.
- AWAIT_ASSERT_FAILED(statusUpdateManager->recover({operationUuid}, true));
-}
-
-
-// This test verifies that the status update manager correctly fills in the
-// latest status when (re)sending status updates.
-TEST_F(OfferOperationStatusUpdateManagerTest, UpdateLatestWhenResending)
-{
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate1;
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate2;
- Future<OfferOperationStatusUpdate> forwardedStatusUpdate3;
- EXPECT_CALL(statusUpdateProcessor, update(_))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate2))
- .WillOnce(FutureArg<0>(&forwardedStatusUpdate3));
-
- const id::UUID operationUuid = id::UUID::random();
-
- const id::UUID statusUuid1 = id::UUID::random();
- OfferOperationStatusUpdate statusUpdate1 = createOfferOperationStatusUpdate(
- statusUuid1, operationUuid, OfferOperationState::OFFER_OPERATION_PENDING);
-
- // Send a checkpointed offer operation status update.
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate1, true));
-
- // The status update manager should fill in the `latest_status` field with the
- // status update we just sent.
- OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate1);
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(
- statusUpdate1.status());
-
- // Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
-
- EXPECT_FALSE(forwardedStatusUpdate2.isReady());
-
- // Send another status update.
- const id::UUID statusUuid2 = id::UUID::random();
- OfferOperationStatusUpdate statusUpdate2 = createOfferOperationStatusUpdate(
- statusUuid2, operationUuid, OfferOperationState::OFFER_OPERATION_PENDING);
- AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate2, true));
-
- // Advance the clock to trigger a retry of the first update.
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-
- // Now that another status update was sent, the status update manager should
- // fill in the `latest_status` field with this new status update.
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(
- statusUpdate2.status());
-
- // Verify that the status update is forwarded again.
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
-
- EXPECT_FALSE(forwardedStatusUpdate3.isReady());
-
- // Acknowledge the first update, it is NOT a terminal update, so
- // `acknowledgement` should return `true`. The status update manager
- // should now send the second status update.
- AWAIT_EXPECT_TRUE(
- statusUpdateManager->acknowledgement(operationUuid, statusUuid1));
-
- // The status update manager should then forward the latest status update.
- expectedStatusUpdate = statusUpdate2;
- expectedStatusUpdate.mutable_latest_status()->CopyFrom(
- statusUpdate2.status());
-
- AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate3);
-}
-
-} // namespace tests {
-} // namespace internal {
-} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/760ab18c/src/tests/operation_status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/operation_status_update_manager_tests.cpp b/src/tests/operation_status_update_manager_tests.cpp
new file mode 100644
index 0000000..c4429f4
--- /dev/null
+++ b/src/tests/operation_status_update_manager_tests.cpp
@@ -0,0 +1,908 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include <mesos/v1/mesos.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/owned.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/uuid.hpp>
+
+#include <stout/os/ftruncate.hpp>
+
+#include "slave/constants.hpp"
+
+#include "tests/mesos.hpp"
+#include "tests/utils.hpp"
+
+#include "status_update_manager/operation.hpp"
+
+using lambda::function;
+
+using process::Clock;
+using process::Future;
+using process::Owned;
+using process::Promise;
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// This class will be the target of the forward callbacks passed to the
+// operation status update managers in this test suite.
+//
+// It uses gmock, so that we can easily set expectations and check how
+// often/which status updates are forwarded.
+class MockUpdateOperationStatusMessageProcessor
+{
+public:
+ MOCK_METHOD1(update, void(const UpdateOperationStatusMessage&));
+};
+
+
+class OperationStatusUpdateManagerTest : public MesosTest
+{
+protected:
+ OperationStatusUpdateManagerTest()
+ : statusUpdateManager(new OperationStatusUpdateManager())
+ {
+ Clock::pause();
+
+ const function<void(const UpdateOperationStatusMessage&)> forward =
+ [&](const UpdateOperationStatusMessage& update) {
+ statusUpdateProcessor.update(update);
+ };
+
+ statusUpdateManager->initialize(
+ forward, OperationStatusUpdateManagerTest::getPath);
+ }
+
+ void TearDown() override
+ {
+ Clock::resume();
+ statusUpdateManager.reset();
+
+ MesosTest::TearDown();
+ }
+
+ UpdateOperationStatusMessage createUpdateOperationStatusMessage(
+ const id::UUID& uuid,
+ const id::UUID& operationUuid,
+ const OperationState& state,
+ const Option<FrameworkID>& frameworkId = None())
+ {
+ UpdateOperationStatusMessage statusUpdate;
+
+ statusUpdate.mutable_operation_uuid()->set_value(operationUuid.toBytes());
+
+ if (frameworkId.isSome()) {
+ statusUpdate.mutable_framework_id()->CopyFrom(frameworkId.get());
+ }
+
+ OperationStatus* status = statusUpdate.mutable_status();
+ status->set_state(state);
+ status->mutable_uuid()->set_value(uuid.toBytes());
+
+ return statusUpdate;
+ }
+
+ void resetStatusUpdateManager()
+ {
+ statusUpdateManager.reset(new OperationStatusUpdateManager());
+
+ const function<void(const UpdateOperationStatusMessage&)> forward =
+ [&](const UpdateOperationStatusMessage& update) {
+ statusUpdateProcessor.update(update);
+ };
+
+ statusUpdateManager->initialize(
+ forward, OperationStatusUpdateManagerTest::getPath);
+ }
+
+ static const string getPath(const id::UUID& operationUuid)
+ {
+ return path::join(os::getcwd(), "streams", operationUuid.toString());
+ }
+
+ Owned<OperationStatusUpdateManager> statusUpdateManager;
+ MockUpdateOperationStatusMessageProcessor statusUpdateProcessor;
+};
+
+
+// This test verifies that the status update manager will not retry a terminal
+// status update after it has been acknowledged.
+TEST_F(OperationStatusUpdateManagerTest, UpdateAndAck)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ // Acknowledge the update, this is a terminal update, so `acknowledgement`
+ // should return `false`.
+ AWAIT_EXPECT_FALSE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+ // Advance the clock, the status update has been acknowledged, so it shouldn't
+ // trigger a retry.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+}
+
+
+// This test verifies that the status update manager will not retry a
+// non-terminal status update after it has been acknowledged.
+TEST_F(OperationStatusUpdateManagerTest, UpdateAndAckNonTerminalUpdate)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_PENDING);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
+ // should return `true`.
+ AWAIT_EXPECT_TRUE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+ // Advance the clock, the status update has been acknowledged, so it shouldn't
+ // trigger a retry.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+}
+
+
+// This test verifies that the status update manager resends status updates
+// until they are acknowledged.
+TEST_F(OperationStatusUpdateManagerTest, ResendUnacknowledged)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate1;
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate2;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
+
+ EXPECT_FALSE(forwardedStatusUpdate2.isReady());
+
+ // Advance the clock to trigger a retry.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // Verify that the status update is forwarded again.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
+
+ // Acknowledge the update, this is a terminal update, so `acknowledgement`
+ // should return `false`.
+ AWAIT_EXPECT_FALSE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+ // Advance the clock, the status update has been acknowledged, so it shouldn't
+ // trigger a retry.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+}
+
+
+// This test verifies that after the updates belonging to a framework are
+// cleaned up from the status update manager, the status update manager stops
+// resending them.
+TEST_F(OperationStatusUpdateManagerTest, Cleanup)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ FrameworkID frameworkId;
+ frameworkId.set_value("frameworkId");
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid,
+ operationUuid,
+ OperationState::OPERATION_FINISHED,
+ frameworkId);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ // Cleanup the framework.
+ statusUpdateManager->cleanup(frameworkId);
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // cleaned up.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+}
+
+
+// This test verifies that the status update manager is able to recover
+// checkpointed status updates, and that it resends the recovered updates that
+// haven't been acknowledged.
+TEST_F(OperationStatusUpdateManagerTest, RecoverCheckpointedStream)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate1;
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate2;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
+
+ resetStatusUpdateManager();
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // cleaned up.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ EXPECT_FALSE(forwardedStatusUpdate2.isReady());
+
+ // Recover the checkpointed stream.
+ Future<OperationStatusUpdateManagerState> state =
+ statusUpdateManager->recover({operationUuid}, true);
+
+ AWAIT_READY(state);
+
+ EXPECT_EQ(0u, state->errors);
+ EXPECT_TRUE(state->streams.contains(operationUuid));
+ EXPECT_SOME(state->streams.at(operationUuid));
+
+ const Option<UpdateOperationStatusMessage> recoveredUpdate =
+ state->streams.at(operationUuid)->updates.front();
+
+ ASSERT_SOME(recoveredUpdate);
+ EXPECT_EQ(statusUpdate, recoveredUpdate.get());
+
+ // The stream should NOT be terminated.
+ EXPECT_FALSE(state->streams.at(operationUuid)->terminated);
+
+ // Check that the status update is resent.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
+}
+
+
+// This test verifies that the status update manager returns a `Failure` when
+// trying to recover a stream that isn't checkpointed.
+TEST_F(OperationStatusUpdateManagerTest, RecoverNotCheckpointedStream)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
+
+ // Send a non-checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, false));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ // Verify that the stream file is NOT created.
+ EXPECT_TRUE(!os::exists(getPath(operationUuid)));
+
+ resetStatusUpdateManager();
+
+ // Trying to recover the non-checkpointed stream should fail.
+ AWAIT_EXPECT_FAILED(statusUpdateManager->recover({operationUuid}, true));
+}
+
+
+// This test verifies that the status update manager doesn't return a
+// `Failure` when trying to recover a stream from an empty file.
+//
+// This can happen when the checkpointing failed between opening the file and
+// writing the first update. In this case `recover()` should succeed, but return
+// `None` as the operation's state.
+TEST_F(OperationStatusUpdateManagerTest, RecoverEmptyFile)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ resetStatusUpdateManager();
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // cleaned up.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // Truncate the status updates to zero bytes, simulating a failure in
+ // checkpointing, where the file was created, but the status update manager
+ // crashed before any data was written to disk.
+ Try<int_fd> fd = os::open(getPath(operationUuid), O_RDWR);
+ ASSERT_SOME(os::ftruncate(fd.get(), 0));
+ os::close(fd.get());
+
+ // The recovery of the empty stream should not fail, but the stream state
+ // should be empty.
+ Future<OperationStatusUpdateManagerState> state =
+ statusUpdateManager->recover({operationUuid}, false);
+
+ AWAIT_READY(state);
+
+ EXPECT_EQ(0u, state->errors);
+ EXPECT_TRUE(state->streams.contains(operationUuid));
+ EXPECT_NONE(state->streams.at(operationUuid));
+}
+
+
+// This test verifies that the status update manager doesn't return a `Failure`
+// when trying to recover a stream from a parent directory that doesn't contain
+// the status updates file.
+//
+// This can happen when the checkpointing failed after creating the parent
+// directory, but before the file was opened. In this case `recover()` should
+// succeed, but return `None` as the operation's state.
+TEST_F(OperationStatusUpdateManagerTest, RecoverEmptyDirectory)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ resetStatusUpdateManager();
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // cleaned up.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // Leave the parent directory, but remove the status updates file, simulating
+ // a failure in checkpointing, where the parent directory was created, but the
+ // checkpointing process crashed before opening the status updates file.
+ ASSERT_SOME(os::rm(getPath(operationUuid)));
+
+ // The recovery of the empty stream should not fail, but the stream state
+ // should be empty.
+ Future<OperationStatusUpdateManagerState> state =
+ statusUpdateManager->recover({operationUuid}, false);
+
+ AWAIT_READY(state);
+
+ EXPECT_EQ(0u, state->errors);
+ EXPECT_TRUE(state->streams.contains(operationUuid));
+ EXPECT_NONE(state->streams.at(operationUuid));
+}
+
+
+// This test verifies that the status update manager is able to recover a
+// terminated stream, and that the stream's state reflects that it is
+// terminated.
+TEST_F(OperationStatusUpdateManagerTest, RecoverTerminatedStream)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ // Acknowledge the update, this is a terminal update, so `acknowledgement`
+ // should return `false`.
+ AWAIT_EXPECT_FALSE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+ resetStatusUpdateManager();
+
+ // Recover the checkpointed stream.
+ Future<OperationStatusUpdateManagerState> state =
+ statusUpdateManager->recover({operationUuid}, true);
+
+ AWAIT_ASSERT_READY(state);
+
+ EXPECT_EQ(0u, state->errors);
+ EXPECT_TRUE(state->streams.contains(operationUuid));
+ EXPECT_SOME(state->streams.at(operationUuid));
+
+ // The stream should be terminated.
+ EXPECT_TRUE(state->streams.at(operationUuid)->terminated);
+
+ const Option<UpdateOperationStatusMessage> recoveredUpdate =
+ state->streams.at(operationUuid)->updates.front();
+
+ ASSERT_SOME(recoveredUpdate);
+ EXPECT_EQ(statusUpdate, recoveredUpdate.get());
+
+ // Advance the clock, the status update has been acknowledged, so it shouldn't
+ // trigger a retry.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+}
+
+
+// This test verifies that the status update manager silently ignores duplicate
+// updates.
+TEST_F(OperationStatusUpdateManagerTest, IgnoreDuplicateUpdate)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_PENDING);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
+ // should return `true`.
+ AWAIT_EXPECT_TRUE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+ // Check that a duplicated update is ignored.
+ AWAIT_EXPECT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // acknowledged.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+}
+
+
+// This test verifies that after recovery the status update manager still
+// silently ignores duplicate updates.
+TEST_F(OperationStatusUpdateManagerTest, IgnoreDuplicateUpdateAfterRecover)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_PENDING);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
+ // should return `true`.
+ AWAIT_EXPECT_TRUE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+ resetStatusUpdateManager();
+
+ // Recover the checkpointed stream.
+ AWAIT_ASSERT_READY(statusUpdateManager->recover({operationUuid}, true));
+
+ // Check that a duplicated update is ignored.
+ AWAIT_EXPECT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // acknowledged.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+}
+
+
+// This test verifies that the status update manager rejects duplicated
+// acknowledgements.
+TEST_F(OperationStatusUpdateManagerTest, RejectDuplicateAck)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_PENDING);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
+ // should return `true`.
+ AWAIT_EXPECT_TRUE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // ignored.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // Try to acknowledge the same update, the operation should fail.
+ AWAIT_EXPECT_FAILED(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+}
+
+
+// This test verifies that after recovery the status update manager still
+// rejects duplicated acknowledgements.
+TEST_F(OperationStatusUpdateManagerTest, RejectDuplicateAckAfterRecover)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_PENDING);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
+ // should return `true`.
+ AWAIT_EXPECT_TRUE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+ resetStatusUpdateManager();
+
+ // Recover the checkpointed stream.
+ AWAIT_ASSERT_READY(statusUpdateManager->recover({operationUuid}, true));
+
+ // Try to acknowledge the same update, the operation should fail.
+ AWAIT_EXPECT_FAILED(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // ignored.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+}
+
+
+// This test verifies that non-strict recovery of a status updates file
+// containing a corrupted record at the end succeeds.
+TEST_F(OperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate1;
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate2;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
+
+ resetStatusUpdateManager();
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // cleaned up.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // The file should contain only `UpdateOperationStatusRecord` messages.
+ // Write a `UpdateOperationStatusMessage` to the end of the file, so that the
+ // recovery process encounters an error.
+ Try<int_fd> fd = os::open(getPath(operationUuid), O_APPEND | O_RDWR);
+ ASSERT_SOME(fd);
+ ASSERT_SOME(::protobuf::write(fd.get(), statusUpdate));
+ ASSERT_SOME(os::close(fd.get()));
+
+ EXPECT_FALSE(forwardedStatusUpdate2.isReady());
+
+ // The non-strict recovery of the corrupted stream should not fail, but the
+ // state should reflect that the status update file contained a corrupted
+ // record.
+ Future<OperationStatusUpdateManagerState> state =
+ statusUpdateManager->recover({operationUuid}, false);
+
+ AWAIT_READY(state);
+
+ EXPECT_EQ(1u, state->errors);
+ EXPECT_TRUE(state->streams.contains(operationUuid));
+ EXPECT_SOME(state->streams.at(operationUuid));
+
+ // Check that the status update could be recovered.
+ const Option<UpdateOperationStatusMessage> recoveredUpdate =
+ state->streams.at(operationUuid)->updates.front();
+
+ ASSERT_SOME(recoveredUpdate);
+ EXPECT_EQ(statusUpdate, recoveredUpdate.get());
+
+ // Check that the status update is resent.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
+}
+
+
+// This test verifies that strict recovery of a status updates file containing
+// a corrupted record at the end fails.
+TEST_F(OperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+ const id::UUID operationUuid = id::UUID::random();
+ const id::UUID statusUuid = id::UUID::random();
+
+ UpdateOperationStatusMessage statusUpdate =
+ createUpdateOperationStatusMessage(
+ statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
+
+ resetStatusUpdateManager();
+
+ // Advance the clock enough to trigger a retry if the update hasn't been
+ // cleaned up.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // The file should contain only `UpdateOperationStatusMessageRecord` messages.
+ // Write a `UpdateOperationStatusMessage` to the end of the file, so that the
+ // recovery process encounters an error.
+ Try<int_fd> fd = os::open(getPath(operationUuid), O_APPEND | O_RDWR);
+ ASSERT_SOME(fd);
+
+ ASSERT_SOME(::protobuf::write(fd.get(), statusUpdate));
+ ASSERT_SOME(os::close(fd.get()));
+
+ // The strict recovery of the corrupted stream should fail.
+ AWAIT_ASSERT_FAILED(statusUpdateManager->recover({operationUuid}, true));
+}
+
+
+// This test verifies that the status update manager correctly fills in the
+// latest status when (re)sending status updates.
+TEST_F(OperationStatusUpdateManagerTest, UpdateLatestWhenResending)
+{
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate1;
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate2;
+ Future<UpdateOperationStatusMessage> forwardedStatusUpdate3;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate2))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate3));
+
+ const id::UUID operationUuid = id::UUID::random();
+
+ const id::UUID statusUuid1 = id::UUID::random();
+ UpdateOperationStatusMessage statusUpdate1 =
+ createUpdateOperationStatusMessage(
+ statusUuid1, operationUuid, OperationState::OPERATION_PENDING);
+
+ // Send a checkpointed operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate1, true));
+
+ // The status update manager should fill in the `latest_status` field with the
+ // status update we just sent.
+ UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate1);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+ statusUpdate1.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
+
+ EXPECT_FALSE(forwardedStatusUpdate2.isReady());
+
+ // Send another status update.
+ const id::UUID statusUuid2 = id::UUID::random();
+ UpdateOperationStatusMessage statusUpdate2 =
+ createUpdateOperationStatusMessage(
+ statusUuid2, operationUuid, OperationState::OPERATION_PENDING);
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate2, true));
+
+ // Advance the clock to trigger a retry of the first update.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // Now that another status update was sent, the status update manager should
+ // fill in the `latest_status` field with this new status update.
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+ statusUpdate2.status());
+
+ // Verify that the status update is forwarded again.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
+
+ EXPECT_FALSE(forwardedStatusUpdate3.isReady());
+
+ // Acknowledge the first update, it is NOT a terminal update, so
+ // `acknowledgement` should return `true`. The status update manager
+ // should now send the second status update.
+ AWAIT_EXPECT_TRUE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid1));
+
+ // The status update manager should then forward the latest status update.
+ expectedStatusUpdate = statusUpdate2;
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+ statusUpdate2.status());
+
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate3);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/760ab18c/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 9fcc0a8..03b1cd8 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -2657,14 +2657,14 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, RegistryGcByCountManySlaves)
Future<bool> admitApply =
master.get()->registrar->unmocked_apply(
- Owned<master::Operation>(
+ Owned<master::RegistryOperation>(
new master::AdmitSlave(slaveInfo)));
AWAIT_EXPECT_TRUE(admitApply);
Future<bool> unreachableApply =
master.get()->registrar->unmocked_apply(
- Owned<master::Operation>(
+ Owned<master::RegistryOperation>(
new master::MarkSlaveUnreachable(slaveInfo, unreachableTime)));
AWAIT_EXPECT_TRUE(unreachableApply);
@@ -3186,7 +3186,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, RegistryGcRace)
// Cause `slave2` to reregister with the master. We expect the
// master to update the registry to mark the slave as reachable; we
// intercept the registry operation.
- Future<Owned<master::Operation>> markReachable;
+ Future<Owned<master::RegistryOperation>> markReachable;
Promise<bool> markReachableContinue;
EXPECT_CALL(*master.get()->registrar, apply(_))
.WillOnce(DoAll(FutureArg<0>(&markReachable),
@@ -3206,7 +3206,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, RegistryGcRace)
// this should result in attempting to prune `slave1` and `slave2`
// from the unreachable list. We intercept the registry operation to
// force the race condition with the reregistration of `slave2`.
- Future<Owned<master::Operation>> prune;
+ Future<Owned<master::RegistryOperation>> prune;
Promise<bool> pruneContinue;
EXPECT_CALL(*master.get()->registrar, apply(_))
.WillOnce(DoAll(FutureArg<0>(&prune),
@@ -3369,7 +3369,7 @@ TEST_F(PartitionTest, FailHealthChecksTwice)
Future<Nothing> unreachableDispatch1 =
FUTURE_DISPATCH(master.get()->pid, &Master::markUnreachable);
- Future<Owned<master::Operation>> markUnreachable;
+ Future<Owned<master::RegistryOperation>> markUnreachable;
Promise<bool> markUnreachableContinue;
EXPECT_CALL(*master.get()->registrar, apply(_))
.WillOnce(DoAll(FutureArg<0>(&markUnreachable),
http://git-wip-us.apache.org/repos/asf/mesos/blob/760ab18c/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 8546769..5962dd1 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -195,8 +195,8 @@ protected:
Future<Resources> getOperationMessage(To to)
{
if (::testing::get<1>(GetParam()) == ENABLED) {
- return FUTURE_PROTOBUF(ApplyOfferOperationMessage(), _, to)
- .then([](const ApplyOfferOperationMessage& message) {
+ return FUTURE_PROTOBUF(ApplyOperationMessage(), _, to)
+ .then([](const ApplyOperationMessage& message) {
switch (message.operation_info().type()) {
case Offer::Operation::UNKNOWN:
case Offer::Operation::LAUNCH:
http://git-wip-us.apache.org/repos/asf/mesos/blob/760ab18c/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 9ee91c9..ec0f609 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -650,7 +650,7 @@ TEST_F(ReconciliationTest, RemovalInProgress)
// Intercept the next registrar operation; this should be the
// registry operation that unregisters the slave.
- Future<Owned<master::Operation>> unregister;
+ Future<Owned<master::RegistryOperation>> unregister;
Future<Nothing> unregisterStarted;
Promise<bool> promise; // Never satisfied.
EXPECT_CALL(*master.get()->registrar, apply(_))