You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/08/23 01:17:05 UTC
[4/4] kudu git commit: util: remove old failure detector and resettable heartbeater code
util: remove old failure detector and resettable heartbeater code
With the move to periodic timers, these classes are no longer needed.
Change-Id: I7a8f77a56ed66e06e182a70e18b2837a264a0a4e
Reviewed-on: http://gerrit.cloudera.org:8080/7736
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e8693965
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e8693965
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e8693965
Branch: refs/heads/master
Commit: e869396543c0d6dba984e0d515a726c3e29747f2
Parents: 21b0f3d
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Aug 18 19:13:14 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Aug 23 01:15:20 2017 +0000
----------------------------------------------------------------------
build-support/iwyu/iwyu-filter.awk | 1 -
src/kudu/util/CMakeLists.txt | 4 -
src/kudu/util/failure_detector-test.cc | 118 ------------
src/kudu/util/failure_detector.cc | 218 ----------------------
src/kudu/util/failure_detector.h | 177 ------------------
src/kudu/util/resettable_heartbeater-test.cc | 108 -----------
src/kudu/util/resettable_heartbeater.cc | 185 ------------------
src/kudu/util/resettable_heartbeater.h | 80 --------
8 files changed, 891 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/build-support/iwyu/iwyu-filter.awk
----------------------------------------------------------------------
diff --git a/build-support/iwyu/iwyu-filter.awk b/build-support/iwyu/iwyu-filter.awk
index b68dbb4..06977e3 100644
--- a/build-support/iwyu/iwyu-filter.awk
+++ b/build-support/iwyu/iwyu-filter.awk
@@ -134,7 +134,6 @@ BEGIN {
muted["kudu/util/bit-util-test.cc"]
muted["kudu/util/compression/compression-test.cc"]
muted["kudu/util/env_util-test.cc"]
- muted["kudu/util/failure_detector-test.cc"]
muted["kudu/util/file_cache-stress-test.cc"]
muted["kudu/util/group_varint-test.cc"]
muted["kudu/util/minidump.cc"]
http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 836facf..077f156 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -134,7 +134,6 @@ set(UTIL_SRCS
env.cc env_posix.cc env_util.cc
errno.cc
faststring.cc
- failure_detector.cc
fault_injection.cc
file_cache.cc
flags.cc
@@ -173,7 +172,6 @@ set(UTIL_SRCS
pb_util-internal.cc
process_memory.cc
random_util.cc
- resettable_heartbeater.cc
rolling_log.cc
rw_mutex.cc
rwc_lock.cc
@@ -340,7 +338,6 @@ ADD_KUDU_TEST(easy_json-test)
ADD_KUDU_TEST(env-test LABELS no_tsan)
ADD_KUDU_TEST(env_util-test)
ADD_KUDU_TEST(errno-test)
-ADD_KUDU_TEST(failure_detector-test)
ADD_KUDU_TEST(faststring-test)
ADD_KUDU_TEST(file_cache-test)
ADD_KUDU_TEST(file_cache-stress-test RUN_SERIAL true)
@@ -375,7 +372,6 @@ ADD_KUDU_TEST(path_util-test)
ADD_KUDU_TEST(process_memory-test RUN_SERIAL true)
ADD_KUDU_TEST(random-test)
ADD_KUDU_TEST(random_util-test)
-ADD_KUDU_TEST(resettable_heartbeater-test)
ADD_KUDU_TEST(rle-test)
ADD_KUDU_TEST(rolling_log-test)
ADD_KUDU_TEST(rw_mutex-test)
http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/failure_detector-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/failure_detector-test.cc b/src/kudu/util/failure_detector-test.cc
deleted file mode 100644
index f8f4478..0000000
--- a/src/kudu/util/failure_detector-test.cc
+++ /dev/null
@@ -1,118 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <algorithm>
-#include <ostream>
-#include <string>
-
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/failure_detector.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-
-namespace kudu {
-
-// How often we expect a node to heartbeat to assert its "aliveness".
-static const int kExpectedHeartbeatPeriodMillis = 100;
-
-// Number of heartbeats after which the FD will consider the node dead.
-static const int kMaxMissedHeartbeats = 2;
-
-// Let's check for failures every 100ms on average +/- 10ms.
-static const int kFailureMonitorMeanMillis = 100;
-static const int kFailureMonitorStddevMillis = 10;
-
-static const char* kNodeName = "node-1";
-static const char* kTestTabletName = "test-tablet";
-
-class FailureDetectorTest : public KuduTest {
- public:
- FailureDetectorTest()
- : KuduTest(),
- latch_(1),
- monitor_(new RandomizedFailureMonitor(SeedRandom(),
- kFailureMonitorMeanMillis,
- kFailureMonitorStddevMillis)) {
- }
-
- void FailureFunction(const std::string& name, const Status& status) {
- LOG(INFO) << "Detected failure of " << name;
- latch_.CountDown();
- }
-
- protected:
- void WaitForFailure() {
- latch_.Wait();
- }
-
- CountDownLatch latch_;
- gscoped_ptr<RandomizedFailureMonitor> monitor_;
-};
-
-// Tests that we can track a node, that while we notify that we're received messages from
-// that node everything is ok and that once we stop doing so the failure detection function
-// gets called.
-TEST_F(FailureDetectorTest, TestDetectsFailure) {
- ASSERT_OK(monitor_->Start());
-
- scoped_refptr<FailureDetector> detector(new TimedFailureDetector(
- MonoDelta::FromMilliseconds(kExpectedHeartbeatPeriodMillis * kMaxMissedHeartbeats)));
-
- monitor_->MonitorFailureDetector(kTestTabletName, detector);
- ASSERT_FALSE(detector->IsTracking(kNodeName));
- ASSERT_OK(detector->Track(kNodeName,
- MonoTime::Now(),
- Bind(&FailureDetectorTest::FailureFunction, Unretained(this))));
- ASSERT_TRUE(detector->IsTracking(kNodeName));
-
- const int kNumPeriodsToWait = 4; // Num heartbeat periods to wait for a failure.
- const int kUpdatesPerPeriod = 10; // Num updates we give per period to minimize test flakiness.
-
- for (int i = 0; i < kNumPeriodsToWait * kUpdatesPerPeriod; i++) {
- // Report in (heartbeat) to the detector.
- ASSERT_OK(detector->MessageFrom(kNodeName, MonoTime::Now()));
-
- // We sleep for a fraction of heartbeat period, to minimize test flakiness.
- SleepFor(MonoDelta::FromMilliseconds(kExpectedHeartbeatPeriodMillis / kUpdatesPerPeriod));
-
- // The latch shouldn't have counted down, since the node's been reporting that
- // it's still alive.
- ASSERT_EQ(1, latch_.count());
- }
-
- // If we stop reporting he node is alive the failure callback is eventually
- // triggered and we exit.
- WaitForFailure();
-
- ASSERT_OK(detector->UnTrack(kNodeName));
- ASSERT_FALSE(detector->IsTracking(kNodeName));
-
- ASSERT_OK(monitor_->UnmonitorFailureDetector(kTestTabletName));
- monitor_->Shutdown();
-}
-
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/failure_detector.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/failure_detector.cc b/src/kudu/util/failure_detector.cc
deleted file mode 100644
index 7adb403..0000000
--- a/src/kudu/util/failure_detector.cc
+++ /dev/null
@@ -1,218 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/util/failure_detector.h"
-
-#include <cstddef>
-#include <mutex>
-#include <ostream>
-#include <unordered_map>
-
-#include <glog/logging.h>
-
-#include "kudu/gutil/basictypes.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/status.h"
-#include "kudu/util/thread.h"
-
-using std::string;
-using std::unordered_map;
-using strings::Substitute;
-
-namespace kudu {
-
-const int64_t RandomizedFailureMonitor::kMinWakeUpTimeMillis = 10;
-
-TimedFailureDetector::TimedFailureDetector(MonoDelta failure_period)
- : failure_period_(failure_period) {}
-
-TimedFailureDetector::~TimedFailureDetector() {
- STLDeleteValues(&nodes_);
-}
-
-Status TimedFailureDetector::Track(const string& name,
- const MonoTime& now,
- const FailureDetectedCallback& callback) {
- std::lock_guard<simple_spinlock> lock(lock_);
- gscoped_ptr<Node> node(new Node);
- node->permanent_name = name;
- node->callback = callback;
- node->last_heard_of = now;
- node->status = ALIVE;
- if (!InsertIfNotPresent(&nodes_, name, node.get())) {
- return Status::AlreadyPresent(
- Substitute("Node with name '$0' is already being monitored", name));
- }
- ignore_result(node.release());
- return Status::OK();
-}
-
-Status TimedFailureDetector::UnTrack(const string& name) {
- std::lock_guard<simple_spinlock> lock(lock_);
- Node* node = EraseKeyReturnValuePtr(&nodes_, name);
- if (PREDICT_FALSE(node == NULL)) {
- return Status::NotFound(Substitute("Node with name '$0' not found", name));
- }
- delete node;
- return Status::OK();
-}
-
-bool TimedFailureDetector::IsTracking(const std::string& name) {
- std::lock_guard<simple_spinlock> lock(lock_);
- return ContainsKey(nodes_, name);
-}
-
-Status TimedFailureDetector::MessageFrom(const std::string& name, const MonoTime& now) {
- VLOG(3) << "Received message from " << name << " at " << now.ToString();
- std::lock_guard<simple_spinlock> lock(lock_);
- Node* node = FindPtrOrNull(nodes_, name);
- if (node == NULL) {
- VLOG(1) << "Not tracking node: " << name;
- return Status::NotFound(Substitute("Message from unknown node '$0'", name));
- }
- node->last_heard_of = now;
- node->status = ALIVE;
- return Status::OK();
-}
-
-FailureDetector::NodeStatus TimedFailureDetector::GetNodeStatusUnlocked(const std::string& name,
- const MonoTime& now) {
- Node* node = FindOrDie(nodes_, name);
- if ((now - node->last_heard_of) > failure_period_) {
- node->status = DEAD;
- }
- return node->status;
-}
-
-void TimedFailureDetector::CheckForFailures(const MonoTime& now) {
- typedef unordered_map<string, FailureDetectedCallback> CallbackMap;
- CallbackMap callbacks;
- {
- std::lock_guard<simple_spinlock> lock(lock_);
- for (const NodeMap::value_type& entry : nodes_) {
- if (GetNodeStatusUnlocked(entry.first, now) == DEAD) {
- InsertOrDie(&callbacks, entry.first, entry.second->callback);
- }
- }
- }
- // Invoke failure callbacks outside of lock.
- for (const CallbackMap::value_type& entry : callbacks) {
- const string& node_name = entry.first;
- const FailureDetectedCallback& callback = entry.second;
- callback.Run(node_name, Status::RemoteError(Substitute("Node '$0' failed", node_name)));
- }
-}
-
-RandomizedFailureMonitor::RandomizedFailureMonitor(uint32_t random_seed,
- int64_t period_mean_millis,
- int64_t period_stddev_millis)
- : period_mean_millis_(period_mean_millis),
- period_stddev_millis_(period_stddev_millis),
- random_(random_seed),
- run_latch_(0),
- shutdown_(false) {
-}
-
-RandomizedFailureMonitor::~RandomizedFailureMonitor() {
- Shutdown();
-}
-
-Status RandomizedFailureMonitor::Start() {
- CHECK(!thread_);
- run_latch_.Reset(1);
- return Thread::Create("failure-monitors", "failure-monitor",
- &RandomizedFailureMonitor::RunThread,
- this, &thread_);
-}
-
-void RandomizedFailureMonitor::Shutdown() {
- if (!thread_) {
- return;
- }
-
- {
- std::lock_guard<simple_spinlock> l(lock_);
- if (shutdown_) {
- return;
- }
- shutdown_ = true;
- }
-
- run_latch_.CountDown();
- CHECK_OK(ThreadJoiner(thread_.get()).Join());
- thread_.reset();
-}
-
-Status RandomizedFailureMonitor::MonitorFailureDetector(const string& name,
- const scoped_refptr<FailureDetector>& fd) {
- std::lock_guard<simple_spinlock> l(lock_);
- bool inserted = InsertIfNotPresent(&fds_, name, fd);
- if (PREDICT_FALSE(!inserted)) {
- return Status::AlreadyPresent(Substitute("Already monitoring failure detector '$0'", name));
- }
- return Status::OK();
-}
-
-Status RandomizedFailureMonitor::UnmonitorFailureDetector(const string& name) {
- std::lock_guard<simple_spinlock> l(lock_);
- int count = fds_.erase(name);
- if (PREDICT_FALSE(count == 0)) {
- return Status::NotFound(Substitute("Failure detector '$0' not found", name));
- }
- return Status::OK();
-}
-
-void RandomizedFailureMonitor::RunThread() {
- VLOG(1) << "Failure monitor thread starting";
-
- while (true) {
- int64_t wait_millis = random_.Normal(period_mean_millis_, period_stddev_millis_);
- if (wait_millis < kMinWakeUpTimeMillis) {
- wait_millis = kMinWakeUpTimeMillis;
- }
-
- MonoDelta wait_delta = MonoDelta::FromMilliseconds(wait_millis);
- VLOG(3) << "RandomizedFailureMonitor sleeping for: " << wait_delta.ToString();
- if (run_latch_.WaitFor(wait_delta)) {
- // CountDownLatch reached 0.
- std::lock_guard<simple_spinlock> lock(lock_);
- // Check if we were told to shutdown.
- if (shutdown_) {
- // Latch fired: exit loop.
- VLOG(1) << "RandomizedFailureMonitor thread shutting down";
- return;
- }
- }
-
- // Take a copy of the FD map under the lock.
- FDMap fds_copy;
- {
- std::lock_guard<simple_spinlock> l(lock_);
- fds_copy = fds_;
- }
-
- MonoTime now = MonoTime::Now();
- for (const FDMap::value_type& entry : fds_copy) {
- entry.second->CheckForFailures(now);
- }
- }
-}
-
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/failure_detector.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/failure_detector.h b/src/kudu/util/failure_detector.h
deleted file mode 100644
index 1e986b4..0000000
--- a/src/kudu/util/failure_detector.h
+++ /dev/null
@@ -1,177 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_UTIL_FAILURE_DETECTOR_H_
-#define KUDU_UTIL_FAILURE_DETECTOR_H_
-
-#include <cstdint>
-#include <string>
-#include <unordered_map>
-
-#include "kudu/gutil/callback.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/random.h"
-
-namespace kudu {
-class Status;
-class Thread;
-
-// A generic interface for failure detector implementations.
-// A failure detector is responsible for deciding whether a certain server is dead or alive.
-class FailureDetector : public RefCountedThreadSafe<FailureDetector> {
- public:
- enum NodeStatus {
- DEAD,
- ALIVE
- };
- typedef std::unordered_map<std::string, NodeStatus> StatusMap;
-
- typedef Callback<void(const std::string& name,
- const Status& status)> FailureDetectedCallback;
-
- virtual ~FailureDetector() {}
-
- // Registers a node with 'name' in the failure detector.
- //
- // If it returns Status::OK() the failure detector will from now
- // expect messages from the machine with 'name' and will trigger
- // 'callback' if a failure is detected.
- //
- // Returns Status::AlreadyPresent() if a machine with 'name' is
- // already registered in this failure detector.
- virtual Status Track(const std::string& name,
- const MonoTime& now,
- const FailureDetectedCallback& callback) = 0;
-
- // Stops tracking node with 'name'.
- virtual Status UnTrack(const std::string& name) = 0;
-
- // Return true iff the named entity is currently being tracked.
- virtual bool IsTracking(const std::string& name) = 0;
-
- // Records that a message from machine with 'name' was received at 'now'.
- virtual Status MessageFrom(const std::string& name, const MonoTime& now) = 0;
-
- // Checks the failure status of each tracked node. If the failure criteria is
- // met, the failure callback is invoked.
- virtual void CheckForFailures(const MonoTime& now) = 0;
-};
-
-// A simple failure detector implementation that considers a node dead
-// when they have not reported by a certain time interval.
-class TimedFailureDetector : public FailureDetector {
- public:
- // Some monitorable entity.
- struct Node {
- std::string permanent_name;
- MonoTime last_heard_of;
- FailureDetectedCallback callback;
- NodeStatus status;
- };
-
- explicit TimedFailureDetector(MonoDelta failure_period);
- virtual ~TimedFailureDetector();
-
- virtual Status Track(const std::string& name,
- const MonoTime& now,
- const FailureDetectedCallback& callback) OVERRIDE;
-
- virtual Status UnTrack(const std::string& name) OVERRIDE;
-
- virtual bool IsTracking(const std::string& name) OVERRIDE;
-
- virtual Status MessageFrom(const std::string& name, const MonoTime& now) OVERRIDE;
-
- virtual void CheckForFailures(const MonoTime& now) OVERRIDE;
-
- private:
- typedef std::unordered_map<std::string, Node*> NodeMap;
-
- // Check if the named failure detector has failed.
- // Does not invoke the callback.
- FailureDetector::NodeStatus GetNodeStatusUnlocked(const std::string& name,
- const MonoTime& now);
-
- const MonoDelta failure_period_;
- mutable simple_spinlock lock_;
- NodeMap nodes_;
-
- DISALLOW_COPY_AND_ASSIGN(TimedFailureDetector);
-};
-
-// A randomized failure monitor that wakes up in normally-distributed intervals
-// and runs CheckForFailures() on each failure detector it monitors.
-//
-// The wake up interval is defined by a normal distribution with the specified
-// mean and standard deviation, in milliseconds, with minimum possible value
-// pinned at kMinWakeUpTimeMillis.
-//
-// We use a random wake up interval to avoid thundering herd / lockstep problems
-// when multiple nodes react to the failure of another node.
-class RandomizedFailureMonitor {
- public:
- // The minimum time the FailureMonitor will wait.
- static const int64_t kMinWakeUpTimeMillis;
-
- RandomizedFailureMonitor(uint32_t random_seed,
- int64_t period_mean_millis,
- int64_t period_std_dev_millis);
- ~RandomizedFailureMonitor();
-
- // Starts the failure monitor.
- Status Start();
-
- // Stops the failure monitor.
- void Shutdown();
-
- // Adds a failure detector to be monitored.
- Status MonitorFailureDetector(const std::string& name,
- const scoped_refptr<FailureDetector>& fd);
-
- // Unmonitors the failure detector with the specified name.
- Status UnmonitorFailureDetector(const std::string& name);
-
- private:
- typedef std::unordered_map<std::string, scoped_refptr<FailureDetector> > FDMap;
-
- // Runs the monitor thread.
- void RunThread();
-
- // Mean & std. deviation of random period to sleep for between checking the
- // failure detectors.
- const int64_t period_mean_millis_;
- const int64_t period_stddev_millis_;
- ThreadSafeRandom random_;
-
- scoped_refptr<Thread> thread_;
- CountDownLatch run_latch_;
-
- mutable simple_spinlock lock_;
- FDMap fds_;
- bool shutdown_; // Whether the failure monitor should shut down.
-
- DISALLOW_COPY_AND_ASSIGN(RandomizedFailureMonitor);
-};
-
-} // namespace kudu
-
-#endif /* KUDU_UTIL_FAILURE_DETECTOR_H_ */
http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/resettable_heartbeater-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/resettable_heartbeater-test.cc b/src/kudu/util/resettable_heartbeater-test.cc
deleted file mode 100644
index e3989b7..0000000
--- a/src/kudu/util/resettable_heartbeater-test.cc
+++ /dev/null
@@ -1,108 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/util/resettable_heartbeater.h"
-
-#include <cstdint>
-#include <ostream>
-#include <string>
-
-#include <boost/bind.hpp> // IWYU pragma: keep
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-
-namespace kudu {
-
-// Number of heartbeats we want to observe before allowing the test to end.
-static const int kNumHeartbeats = 2;
-
-class ResettableHeartbeaterTest : public KuduTest {
- public:
- ResettableHeartbeaterTest()
- : KuduTest(),
- latch_(kNumHeartbeats) {
- }
-
- protected:
- void CreateHeartbeater(uint64_t period_ms, const std::string& name) {
- period_ms_ = period_ms;
- heartbeater_.reset(
- new ResettableHeartbeater(name,
- MonoDelta::FromMilliseconds(period_ms),
- boost::bind(&ResettableHeartbeaterTest::HeartbeatFunction,
- this)));
- }
-
- Status HeartbeatFunction() {
- latch_.CountDown();
- return Status::OK();
- }
-
- void WaitForCountDown() {
- // Wait a large multiple (in the worst case) of the required time before we
- // time out and fail the test. Large to avoid test flakiness.
- const uint64_t kMaxWaitMillis = period_ms_ * kNumHeartbeats * 20;
- CHECK(latch_.WaitFor(MonoDelta::FromMilliseconds(kMaxWaitMillis)))
- << "Failed to count down " << kNumHeartbeats << " times in " << kMaxWaitMillis
- << " ms: latch count == " << latch_.count();
- }
-
- CountDownLatch latch_;
- uint64_t period_ms_;
- gscoped_ptr<ResettableHeartbeater> heartbeater_;
-};
-
-// Tests that if Reset() is not called the heartbeat method is called
-// the expected number of times.
-TEST_F(ResettableHeartbeaterTest, TestRegularHeartbeats) {
- const int64_t kHeartbeatPeriodMillis = 100; // Heartbeat every 100ms.
- CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME());
- ASSERT_OK(heartbeater_->Start());
- WaitForCountDown();
- ASSERT_OK(heartbeater_->Stop());
-}
-
-// Tests that if we Reset() the heartbeater in a period smaller than
-// the heartbeat period the heartbeat method never gets called.
-// After we stop resetting heartbeats should resume as normal
-TEST_F(ResettableHeartbeaterTest, TestResetHeartbeats) {
- const int64_t kHeartbeatPeriodMillis = 800; // Heartbeat every 800ms.
- const int64_t kNumResetSlicesPerPeriod = 40; // Reset 40 times per heartbeat period.
- // Reset once every 800ms / 40 = 20ms.
- const int64_t kResetPeriodMillis = kHeartbeatPeriodMillis / kNumResetSlicesPerPeriod;
-
- CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME());
- ASSERT_OK(heartbeater_->Start());
- // Call Reset() in a loop for 2 heartbeat periods' worth of time, with sleeps
- // in-between as defined above.
- for (int i = 0; i < kNumResetSlicesPerPeriod * 2; i++) {
- heartbeater_->Reset();
- ASSERT_EQ(kNumHeartbeats, latch_.count()); // Ensure we haven't counted down, yet.
- SleepFor(MonoDelta::FromMilliseconds(kResetPeriodMillis));
- }
- WaitForCountDown();
- ASSERT_OK(heartbeater_->Stop());
-}
-
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/resettable_heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/resettable_heartbeater.cc b/src/kudu/util/resettable_heartbeater.cc
deleted file mode 100644
index 5ec87fc..0000000
--- a/src/kudu/util/resettable_heartbeater.cc
+++ /dev/null
@@ -1,185 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/util/resettable_heartbeater.h"
-
-#include <algorithm>
-#include <cstdint>
-#include <cstdlib>
-#include <mutex>
-#include <ostream>
-
-#include <glog/logging.h>
-
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/locks.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/random.h"
-#include "kudu/util/status.h"
-#include "kudu/util/thread.h"
-
-namespace kudu {
-using std::string;
-
-class ResettableHeartbeaterThread {
- public:
- ResettableHeartbeaterThread(std::string name, MonoDelta period,
- HeartbeatFunction function);
-
- Status Start();
- Status Stop();
- void Reset();
-
- private:
- void RunThread();
- bool IsCurrentThread() const;
-
- const string name_;
-
- // The heartbeat period.
- const MonoDelta period_;
-
- // The function to call to perform the heartbeat
- const HeartbeatFunction function_;
-
- // The actual running thread (NULL before it is started)
- scoped_refptr<kudu::Thread> thread_;
-
- CountDownLatch run_latch_;
-
- // Whether the heartbeater should shutdown.
- bool shutdown_;
-
- // lock that protects access to 'shutdown_' and to 'run_latch_'
- // Reset() method.
- mutable simple_spinlock lock_;
- DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeaterThread);
-};
-
-ResettableHeartbeater::ResettableHeartbeater(const std::string& name,
- MonoDelta period,
- HeartbeatFunction function)
- : thread_(new ResettableHeartbeaterThread(name, period, function)) {
-}
-
-Status ResettableHeartbeater::Start() {
- return thread_->Start();
-}
-
-Status ResettableHeartbeater::Stop() {
- return thread_->Stop();
-}
-void ResettableHeartbeater::Reset() {
- thread_->Reset();
-}
-
-ResettableHeartbeater::~ResettableHeartbeater() {
- WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread");
-}
-
-ResettableHeartbeaterThread::ResettableHeartbeaterThread(
- std::string name, MonoDelta period, HeartbeatFunction function)
- : name_(std::move(name)),
- period_(period),
- function_(std::move(function)),
- run_latch_(0),
- shutdown_(false) {}
-
-void ResettableHeartbeaterThread::RunThread() {
- CHECK(IsCurrentThread());
- VLOG(1) << "Heartbeater: " << name_ << " thread starting";
-
- bool prev_reset_was_manual = false;
- Random rng(random());
- while (true) {
- MonoDelta wait_period = period_;
- if (prev_reset_was_manual) {
- // When the caller does a manual reset, we randomize the subsequent wait
- // timeout between period_/2 and period_. This builds in some jitter so
- // multiple tablets on the same TS don't end up heartbeating in lockstep.
- int64_t half_period_ms = period_.ToMilliseconds() / 2;
- wait_period = MonoDelta::FromMilliseconds(
- half_period_ms +
- rng.NextDoubleFraction() * half_period_ms);
- prev_reset_was_manual = false;
- }
- if (run_latch_.WaitFor(wait_period)) {
- // CountDownLatch reached 0 -- this means there was a manual reset.
- prev_reset_was_manual = true;
- std::lock_guard<simple_spinlock> lock(lock_);
- // check if we were told to shutdown
- if (shutdown_) {
- // Latch fired -- exit loop
- VLOG(1) << "Heartbeater: " << name_ << " thread finished";
- return;
- } else {
- // otherwise it's just a reset, reset the latch
- // and continue;
- run_latch_.Reset(1);
- continue;
- }
- }
-
- Status s = function_();
- if (!s.ok()) {
- LOG(WARNING)<< "Failed to heartbeat in heartbeater: " << name_
- << " Status: " << s.ToString();
- continue;
- }
- }
-}
-
-bool ResettableHeartbeaterThread::IsCurrentThread() const {
- return thread_.get() == kudu::Thread::current_thread();
-}
-
-Status ResettableHeartbeaterThread::Start() {
- CHECK(thread_ == nullptr);
- run_latch_.Reset(1);
- return kudu::Thread::Create("heartbeater", strings::Substitute("$0-heartbeat", name_),
- &ResettableHeartbeaterThread::RunThread,
- this, &thread_);
-}
-
-void ResettableHeartbeaterThread::Reset() {
- if (!thread_) {
- return;
- }
- run_latch_.CountDown();
-}
-
-Status ResettableHeartbeaterThread::Stop() {
- if (!thread_) {
- return Status::OK();
- }
-
- {
- std::lock_guard<simple_spinlock> l(lock_);
- if (shutdown_) {
- return Status::OK();
- }
- shutdown_ = true;
- }
-
- run_latch_.CountDown();
- RETURN_NOT_OK(ThreadJoiner(thread_.get()).Join());
- return Status::OK();
-}
-
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/e8693965/src/kudu/util/resettable_heartbeater.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/resettable_heartbeater.h b/src/kudu/util/resettable_heartbeater.h
deleted file mode 100644
index 5699f91..0000000
--- a/src/kudu/util/resettable_heartbeater.h
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_UTIL_RESETTABLE_HEARTBEATER_H_
-#define KUDU_UTIL_RESETTABLE_HEARTBEATER_H_
-
-#include <string>
-
-#include <boost/function.hpp>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/macros.h"
-
-namespace kudu {
-class MonoDelta;
-class Status;
-class ResettableHeartbeaterThread;
-
-typedef boost::function<Status()> HeartbeatFunction;
-
-// A resettable hearbeater that takes a function and calls
-// it to perform a regular heartbeat, unless Reset() is called
-// in which case the heartbeater resets the heartbeat period.
-// The point is to send "I'm Alive" heartbeats only if no regular
-// messages are sent in the same period.
-//
-// TODO Eventually this should be used instead of the master heartbeater
-// as it shares a lot of logic with the exception of the specific master
-// stuff (and the fact that it is resettable).
-//
-// TODO We'll have a lot of these per server, so eventually we need
-// to refactor this so that multiple heartbeaters share something like
-// java's ScheduledExecutor.
-//
-// TODO Do something about failed hearbeats, right now this is just
-// logging. Probably could take more arguments and do more of an
-// exponential backoff.
-//
-// This class is thread safe.
-class ResettableHeartbeater {
- public:
- ResettableHeartbeater(const std::string& name,
- MonoDelta period,
- HeartbeatFunction function);
-
- // Starts the heartbeater
- Status Start();
-
- // Stops the hearbeater
- Status Stop();
-
- // Resets the heartbeat period.
- // When this is called, the subsequent heartbeat has some built-in jitter and
- // may trigger before a full period (as specified to the constructor).
- void Reset();
-
- ~ResettableHeartbeater();
- private:
- gscoped_ptr<ResettableHeartbeaterThread> thread_;
-
- DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeater);
-};
-
-} // namespace kudu
-
-#endif /* KUDU_UTIL_RESETTABLE_HEARTBEATER_H_ */