You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/08/26 23:16:28 UTC
[kudu] branch master updated: [clock] introduce mini_chronyd
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new efda918 [clock] introduce mini_chronyd
efda918 is described below
commit efda91850e57a904f1b0e7858a3e2d8c4e2b7241
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Jul 19 22:55:56 2019 -0700
[clock] introduce mini_chronyd
This patch introduces a wrapper around chronyd, so it's possible to run
multiple instances of chronyd reference NTP servers as part of Kudu
mini-cluster, providing reference for NTP clients.
In addition, this patch contains tests to cover the newly introduced
functionality.
Change-Id: Id9d06d218828240f2a2980ef5ec30428f86277f7
Reviewed-on: http://gerrit.cloudera.org:8080/13916
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
build-support/dist_test.py | 4 +
src/kudu/clock/CMakeLists.txt | 24 +-
src/kudu/clock/test/mini_chronyd-test.cc | 161 +++++++++++
src/kudu/clock/test/mini_chronyd.cc | 385 +++++++++++++++++++++++++
src/kudu/clock/test/mini_chronyd.h | 173 +++++++++++
src/kudu/mini-cluster/CMakeLists.txt | 1 +
src/kudu/mini-cluster/external_mini_cluster.cc | 36 ++-
src/kudu/mini-cluster/external_mini_cluster.h | 20 ++
8 files changed, 802 insertions(+), 2 deletions(-)
diff --git a/build-support/dist_test.py b/build-support/dist_test.py
index 2cadad5..8636314 100755
--- a/build-support/dist_test.py
+++ b/build-support/dist_test.py
@@ -112,6 +112,10 @@ DEPS_FOR_ALL = \
# Add the Kudu HMS plugin.
"build/latest/bin/hms-plugin.jar",
+
+ # Add chrony binaries.
+ "build/latest/bin/chronyc",
+ "build/latest/bin/chronyd",
]
class StagingDir(object):
diff --git a/src/kudu/clock/CMakeLists.txt b/src/kudu/clock/CMakeLists.txt
index e95a16f..af56567 100644
--- a/src/kudu/clock/CMakeLists.txt
+++ b/src/kudu/clock/CMakeLists.txt
@@ -31,6 +31,28 @@ target_link_libraries(clock
kudu_common
kudu_util)
-SET_KUDU_TEST_LINK_LIBS(clock)
+##############################
+# mini_chronyd
+##############################
+
+# These are copied/installed instead of linking because:
+# * symlinks would not work with dist-test
+# * hardlinks would not work if the target directory is on a different
+# filesystem than thirdparty
+file(COPY "${CMAKE_SOURCE_DIR}/thirdparty/installed/common/bin/chronyc"
+ DESTINATION "${EXECUTABLE_OUTPUT_PATH}")
+file(COPY "${CMAKE_SOURCE_DIR}/thirdparty/installed/common/sbin/chronyd"
+ DESTINATION "${EXECUTABLE_OUTPUT_PATH}")
+
+set(MINI_CHRONYD_SRCS test/mini_chronyd.cc)
+
+add_library(mini_chronyd ${MINI_CHRONYD_SRCS})
+target_link_libraries(mini_chronyd
+ gutil
+ kudu_test_util
+ kudu_util)
+
+SET_KUDU_TEST_LINK_LIBS(clock mini_chronyd)
ADD_KUDU_TEST(hybrid_clock-test PROCESSORS 3)
ADD_KUDU_TEST(logical_clock-test)
+ADD_KUDU_TEST(test/mini_chronyd-test)
diff --git a/src/kudu/clock/test/mini_chronyd-test.cc b/src/kudu/clock/test/mini_chronyd-test.cc
new file mode 100644
index 0000000..6cb01d2
--- /dev/null
+++ b/src/kudu/clock/test/mini_chronyd-test.cc
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/clock/test/mini_chronyd.h"
+
+#include <unistd.h>
+
+#include <cstdint>
+#include <ctime>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+namespace clock {
+
+// Test fixture for the MiniChronyd scenarios in this file. It adds nothing
+// of its own on top of the KuduTest base class.
+class MiniChronydTest: public KuduTest {
+};
+
+// This scenario verifies basic functionality of the mini-chronyd wrapper:
+// starting the server, manually setting the reference time, and fetching
+// the server statistics. The server is stopped by the MiniChronyd destructor
+// when the scenario ends.
+TEST_F(MiniChronydTest, BasicSingleServerInstance) {
+ // Start chronyd at the specified port, making sure it's serving requests.
+ MiniChronydOptions options;
+ options.bindaddress = GetBindIpForDaemon(1, kDefaultBindMode);
+ // The port is offset by the PID of the test process; presumably this is to
+ // reduce the chance of collisions between concurrently running tests.
+ options.port = 10123 + getpid() % 1000;
+ MiniChronyd chrony(options);
+ ASSERT_OK(chrony.Start());
+
+ // A chronyd that uses the system clock as a reference clock should present
+ // itself as a reliable NTP server.
+ const HostPort ntp_endpoint(chrony.options().bindaddress,
+ chrony.options().port);
+ {
+ // Make sure the server opens ports to listen and serve requests
+ // from NTP clients.
+ auto s = MiniChronyd::CheckNtpSource({ ntp_endpoint });
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ }
+
+ // Set time manually using chronyc and verify that chronyd tracks the time as
+ // expected. The reference time is set 60 seconds behind the local clock.
+ ASSERT_OK(chrony.SetTime(time(nullptr) - 60));
+
+ // Sanity check: make sure chronyd receives NTP packets which were sent
+ // by chronyc and the chronyd running in client-only mode.
+ MiniChronyd::ServerStats stats;
+ ASSERT_OK(chrony.GetServerStats(&stats));
+ ASSERT_LT(0, stats.ntp_packets_received);
+ ASSERT_LT(0, stats.cmd_packets_received);
+ // Remember the counter to verify below that it keeps growing.
+ const auto ntp_packets_received = stats.ntp_packets_received;
+
+ {
+ // After setting the clock manually to be simply offset from the reference,
+ // chronyd should continue providing a good NTP source for its clients.
+ auto s = MiniChronyd::CheckNtpSource({ ntp_endpoint });
+ ASSERT_TRUE(s.ok()) << s.ToString();
+
+ // The activity of checking for a reliable NTP source and fetching
+ // information on the system clock synchronisation status should generate
+ // additional NTP packets which should have been received by the NTP server.
+ MiniChronyd::ServerStats stats;
+ ASSERT_OK(chrony.GetServerStats(&stats));
+ ASSERT_GT(stats.ntp_packets_received, ntp_packets_received);
+ }
+}
+
+// This scenario runs multiple chronyd and verifies they can co-exist without
+// conflicts w.r.t. resources such as port numbers and paths to their
+// configuration files, command sockets, and other related files.
+TEST_F(MiniChronydTest, BasicMultipleServerInstances) {
+  vector<unique_ptr<MiniChronyd>> servers;
+  vector<HostPort> ntp_endpoints;
+  const uint16_t base_port = 10123 + getpid() % 1000;
+  for (int idx = 0; idx < 5; ++idx) {
+    // Every server gets its own index, bind address and port: the index
+    // keeps the per-server files apart, the address/port pair keeps
+    // the NTP endpoints apart.
+    MiniChronydOptions options;
+    options.index = idx;
+    options.bindaddress = GetBindIpForDaemon(idx + 1, kDefaultBindMode);
+    options.port = base_port + idx * 10;
+    unique_ptr<MiniChronyd> chrony(new MiniChronyd(options));
+    ASSERT_OK(chrony->Start());
+    ntp_endpoints.emplace_back(chrony->options().bindaddress,
+                               chrony->options().port);
+    servers.emplace_back(std::move(chrony));
+  }
+
+  {
+    // All chronyd servers that use the system clock as a reference clock
+    // should present themselves as a set of NTP servers suitable for
+    // synchronisation.
+    auto s = MiniChronyd::CheckNtpSource(ntp_endpoints, 10);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+  }
+
+  // On macOS the same loopback IP address is used for all the NTP servers:
+  // they differ only by their port number. However, it seems chronyd
+  // doesn't differentiate between them by IP addr + port, only by IP addr.
+  // So, it means chronyd run as a client (chronyd -q/-Q) sees all those as
+  // the same NTP server, and talks only with the first one (as listed
+  // in the configuration file).
+
+  // Sanity check: make sure every server received packets from the client
+  // (see the note above regarding running chronyd at the same IP address
+  // but different NTP port number).
+  for (auto& server : servers) {
+    MiniChronyd::ServerStats stats;
+    ASSERT_OK(server->GetServerStats(&stats));
+#ifndef __APPLE__
+    ASSERT_LT(0, stats.ntp_packets_received);
+#endif
+    ASSERT_LT(0, stats.cmd_packets_received);
+  }
+
+  // Offset the reference time at the servers, so they would have their
+  // reference times far from each other. It doesn't matter from which point
+  // the servers are offset, but it's important that they are 'far enough'
+  // from each other. The idea is to make sure the client does _not_ see these
+  // servers as a reliable source for time synchronisation via NTP.
+  const auto ref_time = time(nullptr) - 50;
+  // The index is size_t to match servers.size(): comparing a signed index
+  // against the unsigned size is a signed/unsigned mismatch.
+  for (size_t i = 0; i < servers.size(); ++i) {
+    ASSERT_OK(servers[i]->SetTime(ref_time + static_cast<time_t>(i) * 10));
+  }
+
+#ifndef __APPLE__
+  {
+    // Now, with contradicting source NTP servers, it should be impossible
+    // to synchronize the time.
+    auto s = MiniChronyd::CheckNtpSource(ntp_endpoints, 10);
+    ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "No suitable source for synchronisation");
+  }
+#endif
+}
+
+} // namespace clock
+} // namespace kudu
diff --git a/src/kudu/clock/test/mini_chronyd.cc b/src/kudu/clock/test/mini_chronyd.cc
new file mode 100644
index 0000000..8f92c5f
--- /dev/null
+++ b/src/kudu/clock/test/mini_chronyd.cc
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/clock/test/mini_chronyd.h"
+
+#include <unistd.h>
+
+#include <algorithm>
+#include <cerrno>
+#include <csignal>
+#include <iterator>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/user.h"
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::SkipEmpty;
+using strings::Split;
+using strings::Substitute;
+
+namespace kudu {
+namespace clock {
+
+// Returns a human-readable representation of the NTP bind address, the NTP
+// port, the data root and the command address. The 'index' and 'pidfile'
+// fields are not part of the output.
+string MiniChronydOptions::ToString() const {
+  static const char* const kFormat =
+      "{bindaddress: $0,"
+      " port: $1,"
+      " data_root: $2,"
+      " bindcmdaddress: $3}";
+  return Substitute(kFormat, bindaddress, port, data_root, bindcmdaddress);
+}
+
+// Verify that the reference NTP client (chronyd itself, run in client-only
+// mode) accepts the specified servers as a good enough source for clock
+// synchronisation. Returns Status::InvalidArgument for an empty set of
+// servers; a failure of the chronyd client run is returned with the captured
+// stdout/stderr prepended to the error message.
+Status MiniChronyd::CheckNtpSource(const vector<HostPort>& servers,
+                                   int timeout_sec) {
+  // A per-server line of the configuration fed to the chronyd client. With
+  // such a configuration chronyd prints the offset of the system clock from
+  // the NTP time of the listed servers, but only after latching on them,
+  // i.e. after deeming them to be a reliable NTP clock source. If latching
+  // succeeds, the offset is printed and chronyd exits with status 0. If the
+  // servers don't look like a reliable source or some other error happens,
+  // chronyd exits with non-zero status and prints the warning
+  // "No suitable source for synchronisation".
+  //
+  // Short polling intervals make querying the servers faster, and the
+  // 'initial burst' mode makes the exchange of NTP packets quicker. It's also
+  // desirable to stick to the NTP protocol version used by the Kudu built-in
+  // NTP client. In detail:
+  //   * 'minpoll' is the smallest interval supported by chrony
+  //     (-6, i.e. 1/64 second)
+  //   * 'maxpoll' is at least twice as long as 'minpoll', with some margin
+  //     to accumulate at least 4 samples at the current 'minpoll' setting
+  //   * 'iburst' sends the first 4 NTP packets in a burst
+  //   * 'version' is 3 since the current implementation of the Kudu built-in
+  //     NTP client uses NTPv3 (chrony supports NTP v4 and would use the newer
+  //     version of the protocol otherwise)
+  static const string kConfigTemplate =
+      "server $0 port $1 maxpoll -1 minpoll -6 iburst version 3\n";
+
+  if (servers.empty()) {
+    return Status::InvalidArgument("empty set of NTP server endpoints");
+  }
+
+  // One 'server' directive per endpoint.
+  string client_conf;
+  for (auto it = servers.begin(); it != servers.end(); ++it) {
+    client_conf.append(Substitute(kConfigTemplate, it->host(), it->port()));
+  }
+
+  string chronyd_path;
+  RETURN_NOT_OK(MiniChronyd::GetChronydPath(&chronyd_path));
+
+  vector<string> argv;
+  argv.push_back(chronyd_path);
+  // Client-only mode without setting time.
+  argv.emplace_back("-Q");
+  // Read the configuration file from stdin.
+  argv.emplace_back("-f");
+  argv.emplace_back("/dev/stdin");
+  // Timeout for clock synchronisation.
+  argv.emplace_back("-t");
+  argv.push_back(std::to_string(timeout_sec));
+
+  string captured_stdout;
+  string captured_stderr;
+  RETURN_NOT_OK_PREPEND(
+      Subprocess::Call(argv, client_conf, &captured_stdout, &captured_stderr),
+      Substitute("failed measure clock offset from reference NTP servers: "
+                 "stdout{$0} stderr{$1}",
+                 captured_stdout, captured_stderr));
+  return Status::OK();
+}
+
+// Takes over the supplied options and fills in defaults for the paths left
+// empty: the data root becomes "chrony.<index>" under the test data directory,
+// and the PID file becomes "chronyd.pid" under the data root.
+MiniChronyd::MiniChronyd(MiniChronydOptions options)
+    : options_(std::move(options)) {
+  string& data_root = options_.data_root;
+  if (data_root.empty()) {
+    const string subdir = Substitute("chrony.$0", options_.index);
+    data_root = JoinPathSegments(GetTestDataDirectory(), subdir);
+  }
+  string& pidfile = options_.pidfile;
+  if (pidfile.empty()) {
+    pidfile = JoinPathSegments(data_root, "chronyd.pid");
+  }
+}
+
+// Stops the chronyd process if there is one; a failure to stop it is only
+// logged as a warning.
+MiniChronyd::~MiniChronyd() {
+  if (!process_) {
+    return;
+  }
+  WARN_NOT_OK(Stop(), "unable to stop MiniChronyd");
+}
+
+// Returns the effective options of this chronyd, including the defaults
+// filled in by the constructor. CHECK-fails if there is no process handle,
+// i.e. if Start() has not been called yet or the server has been stopped.
+const MiniChronydOptions& MiniChronyd::options() const {
+ CHECK(process_) << "must start the chronyd process first";
+ return options_;
+}
+
+// Returns the PID reported by the underlying Subprocess handle. CHECK-fails if
+// there is no process handle, i.e. if Start() has not been called yet or the
+// server has been stopped.
+pid_t MiniChronyd::pid() const {
+ CHECK(process_) << "must start the chronyd process first";
+ return process_->pid();
+}
+
+// Spawn chronyd in server-only mode and wait until it responds to commands.
+//
+// If the data root directory doesn't exist yet, it is created along with the
+// configuration file. The method CHECK-fails if a process handle is already
+// present, and returns Status::TimedOut if chronyd doesn't answer a
+// 'serverstats' query within one second after being spawned.
+Status MiniChronyd::Start() {
+ SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, "starting chronyd");
+ CHECK(!process_);
+ VLOG(1) << "starting chronyd: " << options_.ToString();
+
+ // The configuration is generated only along with the data root: an already
+ // existing data root is used as is.
+ if (!Env::Default()->FileExists(options_.data_root)) {
+ VLOG(1) << "creating chronyd configuration file";
+ RETURN_NOT_OK(Env::Default()->CreateDir(options_.data_root));
+ // The chronyd's implementation puts strict requirements on the ownership
+ // of the directories where the runtime data is stored. In some environments
+ // (e.g., macOS), the group owner of the newly created directory might be
+ // different from the user account's GID.
+ RETURN_NOT_OK(CorrectOwnership(options_.data_root));
+ RETURN_NOT_OK(CreateConf());
+ }
+
+ // Start the chronyd in server-only mode, not detaching from terminal
+ // since the Subprocess needs to have the process running in foreground
+ // to be able to control it.
+ string server_bin;
+ RETURN_NOT_OK(GetChronydPath(&server_bin));
+ // NOTE(review): 'username' is not referenced below in this method; the call
+ // still makes Start() fail if the user name cannot be determined. Confirm
+ // whether that is intended.
+ string username;
+ RETURN_NOT_OK(GetLoggedInUser(&username));
+
+ process_.reset(new Subprocess({
+ server_bin,
+ "-f", config_file_path(),
+ "-x", // do not drive the system clock (server-only mode)
+ "-d", // do not daemonize; print logs into standard out
+ }));
+ RETURN_NOT_OK(process_->Start());
+
+ // Poll the server until it answers: GetServerStats(nullptr) only checks that
+ // the 'serverstats' command succeeds. The pause between attempts grows
+ // linearly (0, 2, 4, ... milliseconds). If the deadline passes, the process
+ // handle stays attached to this object.
+ static const auto kTimeout = MonoDelta::FromSeconds(1);
+ const auto deadline = MonoTime::Now() + kTimeout;
+ for (auto i = 0; ; ++i) {
+ auto s = GetServerStats(nullptr);
+ if (s.ok()) {
+ break;
+ }
+ if (deadline < MonoTime::Now()) {
+ return Status::TimedOut(Substitute("failed to contact chronyd in $0",
+ kTimeout.ToString()));
+ }
+ SleepFor(MonoDelta::FromMilliseconds(i * 2));
+ }
+ return Status::OK();
+}
+
+// Sends SIGTERM to the chronyd process and waits for it to exit. Does nothing
+// and returns Status::OK() if there is no process handle.
+Status MiniChronyd::Stop() {
+  if (process_) {
+    VLOG(1) << "stopping chronyd";
+    // Detach the handle from this object before signalling the process, so
+    // the object has no process handle regardless of the outcome.
+    unique_ptr<Subprocess> victim;
+    victim.swap(process_);
+    return victim->KillAndWait(SIGTERM);
+  }
+  return Status::OK();
+}
+
+// Fetch the counters reported by 'chronyc serverstats' for this chronyd.
+//
+// If 'stats' is null, the method only checks that the command succeeds; this
+// is how Start() probes whether the server is up. Otherwise the output is
+// parsed as "<key>: <value>" lines and both the NTP and the command packet
+// counters are stored into 'stats'. 'stats' is written only on success.
+//
+// Returns the error of the chronyc run if it fails, and Status::Corruption if
+// a line doesn't split into a key and a value, a counter is not a number, or
+// either of the two counters is missing from the output.
+Status MiniChronyd::GetServerStats(ServerStats* stats) const {
+  static const string kNtpPacketsKey = "NTP packets received";
+  static const string kCmdPacketsKey = "Command packets received";
+
+  string out;
+  RETURN_NOT_OK(RunChronyCmd({ "serverstats" }, &out));
+  if (!stats) {
+    return Status::OK();
+  }
+
+  ServerStats result;
+  bool ntp_packets_key_found = false;
+  bool cmd_packets_key_found = false;
+  for (StringPiece sp : Split(out, "\n", SkipEmpty())) {
+    vector<string> kv = Split(sp, ":", SkipEmpty());
+    if (kv.size() != 2) {
+      return Status::Corruption(
+          Substitute("'$0': unexpected line in serverstats", sp));
+    }
+    for (auto& str : kv) {
+      StripWhiteSpace(&str);
+    }
+
+    // Select where the value of a recognized key goes; lines with any other
+    // key are skipped.
+    int64_t* counter = nullptr;
+    bool* found = nullptr;
+    if (kv[0] == kNtpPacketsKey) {
+      counter = &result.ntp_packets_received;
+      found = &ntp_packets_key_found;
+    } else if (kv[0] == kCmdPacketsKey) {
+      counter = &result.cmd_packets_received;
+      found = &cmd_packets_key_found;
+    } else {
+      continue;
+    }
+    if (!safe_strto64(kv[1], counter)) {
+      return Status::Corruption(
+          Substitute("$0: unexpected value for '$1' in serverstats",
+                     kv[1], kv[0]));
+    }
+    *found = true;
+  }
+  if (!(ntp_packets_key_found && cmd_packets_key_found)) {
+    // The message has to go through Substitute(): passing the template and
+    // the output as two separate arguments would leave a literal "$0" in it.
+    return Status::Corruption(
+        Substitute("'$0': unexpected serverstats output", out));
+  }
+  *stats = result;
+  return Status::OK();
+}
+
+// Sets the reference time of this chronyd: the time is converted to text by
+// FastTimeToBuffer() and passed to the 'settime' command of chronyc.
+Status MiniChronyd::SetTime(time_t time) {
+  char buf[kFastToBufferSize];
+  const string formatted_time(FastTimeToBuffer(time, buf));
+  return RunChronyCmd({ "settime", formatted_time });
+}
+
+// Locate the chronyc binary (chrony's CLI tool) next to the running
+// executable and store its absolute path in the 'path' output parameter.
+Status MiniChronyd::GetChronycPath(string* path) {
+  const string binary_name("chronyc");
+  return GetPath(binary_name, path);
+}
+
+// Locate the chronyd binary (chrony NTP implementation) next to the running
+// executable and store its absolute path in the 'path' output parameter.
+Status MiniChronyd::GetChronydPath(string* path) {
+  const string binary_name("chronyd");
+  return GetPath(binary_name, path);
+}
+
+// Resolve 'path_suffix' against the directory of the running executable and
+// store the canonicalized result in 'abs_path'. Returns Status::NotFound if
+// nothing exists at the resulting path; 'abs_path' is written only on success.
+Status MiniChronyd::GetPath(const string& path_suffix, string* abs_path) {
+  Env* const env = Env::Default();
+  string self_path;
+  RETURN_NOT_OK(env->GetExecutablePath(&self_path));
+  const string candidate =
+      Substitute("$0/$1", DirName(self_path), path_suffix);
+  string canonical;
+  RETURN_NOT_OK(env->Canonicalize(candidate, &canonical));
+  if (!env->FileExists(canonical)) {
+    return Status::NotFound(Substitute("$0: no such file", canonical));
+  }
+  *abs_path = std::move(canonical);
+  return Status::OK();
+}
+
+// Make the calling process's UID and GID (as reported by getuid()/getgid())
+// the owner and the group of 'path'. Returns Status::IOError carrying the
+// errno of chown() if the call fails.
+Status MiniChronyd::CorrectOwnership(const string& path) {
+  const uid_t uid = getuid();
+  const gid_t gid = getgid();
+  if (chown(path.c_str(), uid, gid) != -1) {
+    return Status::OK();
+  }
+  // Capture errno right away, before any other call can overwrite it.
+  const int err = errno;
+  return Status::IOError(Substitute("chown($0, $1, $2)", path, uid, gid),
+                         ErrnoToString(err), err);
+}
+
+// Returns the path of the configuration file: "chrony.conf" under the data
+// root of this chronyd.
+string MiniChronyd::config_file_path() const {
+  static const char* const kConfFileName = "chrony.conf";
+  return JoinPathSegments(options_.data_root, kConfFileName);
+}
+
+// Creates a chrony.conf file according to the provided options. The multitude
+// of overridden parameters is because it's necessary to run multiple chronyd
+// instances on the same node, so all the 'defaults' should be customized
+// to avoid conflicts.
+//
+// If no command address has been specified in the options, a path to a Unix
+// domain socket is generated and stored into the options as a side effect.
+Status MiniChronyd::CreateConf() {
+ // The placeholders of the template are substituted as follows:
+ // $0 -- name of the logged in user
+ // $1 -- options_.bindaddress
+ // $2 -- options_.bindcmdaddress
+ // $3 -- options_.port
+ // $4 -- options_.pidfile
+ static const string kFileTemplate = R"(
+# Override the default user chronyd NTP server starts because the compiled-in
+# default 'root' is not suitable when running chronyd in the context of the Kudu
+# testing framework. It's also be possible to override this parameter in the
+# chronyd's command line, but doing so in the configuration file is cleaner.
+user $0
+
+# The IP address to bind the NTP server socket to. By default, chronyd tries
+# to bind to all available IP addresses, which is not desirable in case of a
+# shared environment where Kudu tests are usually run.
+bindaddress $1
+
+# In this case, it's an absolute path to Unix domain socket file that is used
+# by the chronyc CLI tool to send commands to chronyd server.
+bindcmdaddress $2
+
+# Override the default NTP port 123 since it's necessary to (1) run multiple
+# chronyd servers at the same IP address and (2) specify non-privileged port,
+# so chronyd is able to bind to the port even if the chronyd process is not run
+# with super-user privileges.
+port $3
+
+# Absolute path where to store the PID of chronyd process once it's started.
+pidfile $4
+
+# The daemon is controlled only via Unix domain socket (see 'bindcmdaddress'),
+# no INET network control port is needed. The command control via INET addresses
+# is very limited: no need for that if the control via Unix domain socket
+# is already enabled.
+cmdport 0
+
+# Using the local clock as the clock source (usually it's a high precision
+# oscillator or a NTP server).
+local
+
+# NTP clients from all addresses are allowed to access the NTP server that is
+# serving requests as specified by the 'bindaddress' and 'port' directives.
+allow all
+
+# Allow setting the time manually using the cronyc CLI utility.
+manual
+)";
+ if (options_.bindcmdaddress.empty()) {
+ // The path to Unix domain socket file cannot be longer than ~100 bytes,
+ // so it's necessary to create a directory with shorter absolute path.
+ // TODO(aserbin): use some synthetic mount point instead?
+ string dir;
+ RETURN_NOT_OK(Env::Default()->GetTestDirectory(&dir));
+ // The name of the sub-directory is built from the current time in
+ // microseconds and the PID of this process.
+ dir += Substitute("/$0.$1", Env::Default()->NowMicros(), getpid());
+ const auto s = Env::Default()->CreateDir(dir);
+ // An already existing directory is fine; any other failure is fatal.
+ if (!s.IsAlreadyPresent() && !s.ok()) {
+ return s;
+ }
+ RETURN_NOT_OK(CorrectOwnership(dir));
+ options_.bindcmdaddress = Substitute("$0/chronyd.$1.sock",
+ dir, options_.index);
+ }
+ string username;
+ RETURN_NOT_OK(GetLoggedInUser(&username));
+ auto contents = Substitute(kFileTemplate,
+ username,
+ options_.bindaddress,
+ options_.bindcmdaddress,
+ options_.port,
+ options_.pidfile);
+ return WriteStringToFile(Env::Default(), contents, config_file_path());
+}
+
+// Run chronyc against this chronyd instance: the command line consists of the
+// path to chronyc, "-h <command address>", and then 'args' in their original
+// order. The standard input of chronyc is empty; its output streams are
+// captured into 'out_stdout' and 'out_stderr'.
+Status MiniChronyd::RunChronyCmd(const vector<string>& args,
+                                 string* out_stdout,
+                                 string* out_stderr) const {
+  string chronyc_path;
+  RETURN_NOT_OK(GetChronycPath(&chronyc_path));
+
+  vector<string> argv;
+  argv.reserve(args.size() + 3);
+  argv.push_back(chronyc_path);
+  argv.emplace_back("-h");
+  argv.push_back(cmdaddress());
+  argv.insert(argv.end(), args.begin(), args.end());
+  return Subprocess::Call(argv, "", out_stdout, out_stderr);
+}
+
+} // namespace clock
+} // namespace kudu
diff --git a/src/kudu/clock/test/mini_chronyd.h b/src/kudu/clock/test/mini_chronyd.h
new file mode 100644
index 0000000..fa1862b
--- /dev/null
+++ b/src/kudu/clock/test/mini_chronyd.h
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <ctime>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class HostPort;
+class RWFile;
+class Subprocess;
+
+namespace clock {
+
+// Options to run MiniChronyd with.
+struct MiniChronydOptions {
+ // Index of this chronyd instance: there might be multiple mini_chronyd run
+ // by the same test. The index is used to build the default data root
+ // ("chrony.<index>") and the default name of the command socket file
+ // ("chronyd.<index>.sock"), so instances run by the same test should have
+ // different indices unless those paths are specified explicitly.
+ //
+ // Default: 0
+ size_t index = 0;
+
+ // Directory under which to store all chronyd-related data.
+ //
+ // Default: "", which auto-generates a unique path for this chronyd.
+ // The default may only be used from a gtest unit test.
+ std::string data_root;
+
+ // IP address or path to the Unix domain socket file used to listen for
+ // command packets (issued by chronyc).
+ // This directly maps to chronyd's configuration property with the same name.
+ //
+ // Default: "", which auto-generates a unique path to Unix domain socket for
+ // this chronyd. The default may only be used from a gtest unit test.
+ std::string bindcmdaddress;
+
+ // IP address to bind the NTP server to listen and respond to client requests.
+ // This directly maps to chronyd's configuration property with the same name.
+ //
+ // Default: 127.0.0.1
+ std::string bindaddress = "127.0.0.1";
+
+ // Port of the NTP server to listen to and serve requests from clients.
+ // This directly maps to chronyd's configuration property with the same name.
+ //
+ // Default: 10123 (10000 + the default NTP port).
+ uint16_t port = 10123;
+
+ // File to store PID of the chronyd process.
+ // This directly maps to chronyd's configuration property with the same name.
+ //
+ // Default: "", which places the pid file (chronyd.pid) under the data root.
+ // The default may only be used from a gtest unit test.
+ std::string pidfile;
+
+ // Returns a string representation of the options suitable for debug printing.
+ std::string ToString() const;
+};
+
+// MiniChronyd is a wrapper around chronyd NTP daemon running in server-only
+// mode (i.e. it doesn't drive the system clock), allowing manual setting of the
+// reference true time. MiniChronyd is used in tests as a reference NTP server
+// for the built-in NTP client.
+class MiniChronyd {
+ public:
+ // Structure to represent relevant information from output by
+ // 'chronyc serverstats'. The fields have no default values: they are
+ // meaningful only after a successful call to GetServerStats().
+ struct ServerStats {
+ int64_t cmd_packets_received;
+ int64_t ntp_packets_received;
+ };
+
+ // Check that NTP servers with the specified endpoints are seen as a good
+ // enough synchronisation source by the reference NTP client (chronyd itself).
+ // The client will wait for no more than the specified timeout in seconds
+ // for the set reference servers to become a good NTP source.
+ // This method returns Status::OK() if the servers look like a good source
+ // for clock synchronisation via NTP, even if the offset of the client's clock
+ // from the reference clock provided by NTP server(s) is huge.
+ // Status::InvalidArgument is returned if 'servers' is empty.
+ static Status CheckNtpSource(const std::vector<HostPort>& servers,
+ int timeout_sec = 3)
+ WARN_UNUSED_RESULT;
+
+ // Create a new MiniChronyd with the provided options, or with defaults
+ // if the 'options' argument is omitted.
+ explicit MiniChronyd(MiniChronydOptions options = {});
+
+ // Stops the underlying chronyd process if it is still attached to this
+ // object; a failure to stop it is logged as a warning.
+ ~MiniChronyd();
+
+ // Return the options the underlying chronyd has been started with.
+ // The chronyd process must be started prior to calling this method,
+ // otherwise the call results in a CHECK failure.
+ const MiniChronydOptions& options() const;
+
+ // Get the PID of the chronyd process.
+ // The chronyd process must be started prior to calling this method,
+ // otherwise the call results in a CHECK failure.
+ pid_t pid() const;
+
+ // Start the mini chronyd in server-only mode. The method returns once
+ // chronyd responds to commands, or Status::TimedOut if it doesn't start
+ // responding in time.
+ Status Start() WARN_UNUSED_RESULT;
+
+ // Stop the mini chronyd by sending SIGTERM to the process and waiting for
+ // it to exit. This is a no-op if there is no chronyd process to stop.
+ Status Stop() WARN_UNUSED_RESULT;
+
+ // Get NTP server statistics as output by 'chronyc serverstats'.
+ // The 'stats' parameter may be null: in that case the method only checks
+ // that the command can be run successfully against this chronyd.
+ Status GetServerStats(ServerStats* stats) const;
+
+ // Manually set the reference time for the underlying chronyd
+ // with the precision of 1 second. The input is number of seconds
+ // from the beginning of the Epoch.
+ Status SetTime(time_t time) WARN_UNUSED_RESULT;
+
+ private:
+ friend class MiniChronydTest;
+
+ // Find absolute path to chronyc (chrony's CLI tool),
+ // storing the result path in 'path' output parameter.
+ static Status GetChronycPath(std::string* path);
+
+ // Find absolute path to chronyd (chrony NTP implementation),
+ // storing the result path in 'path' output parameter.
+ static Status GetChronydPath(std::string* path);
+
+ // Get absolute path to an executable from the chrony bundle. The executable
+ // is looked up in the directory of the currently running binary.
+ static Status GetPath(const std::string& path_suffix, std::string* abs_path);
+
+ // Correct the ownership of the target path to be compliant with chrony's
+ // security constraints.
+ static Status CorrectOwnership(const std::string& path);
+
+ // A shortcut to options_.bindcmdaddress: returns the command address
+ // for the underlying chronyd, by default that's the absolute path
+ // to a Unix socket.
+ std::string cmdaddress() const { return options_.bindcmdaddress; }
+
+ // Return absolute path to chronyd's configuration file.
+ std::string config_file_path() const;
+
+ // Create a chrony.conf file with server-only mode settings and other options
+ // corresponding to MiniChronydOptions in the data root of this chronyd.
+ Status CreateConf() WARN_UNUSED_RESULT;
+
+ // Run chronyc command with arguments as specified by 'args', targeting this
+ // chronyd instance.
+ Status RunChronyCmd(const std::vector<std::string>& args,
+ std::string* out_stdout = nullptr,
+ std::string* out_stderr = nullptr) const WARN_UNUSED_RESULT;
+
+ MiniChronydOptions options_;
+ // NOTE(review): this member doesn't appear to be referenced by the
+ // implementation in mini_chronyd.cc -- confirm whether it is needed.
+ std::unique_ptr<RWFile> cmd_socket_;
+ // Handle of the chronyd process; null unless the process has been spawned
+ // by Start() and not stopped yet.
+ std::unique_ptr<Subprocess> process_;
+};
+
+} // namespace clock
+} // namespace kudu
diff --git a/src/kudu/mini-cluster/CMakeLists.txt b/src/kudu/mini-cluster/CMakeLists.txt
index 17e37ae..c1fe5bd 100644
--- a/src/kudu/mini-cluster/CMakeLists.txt
+++ b/src/kudu/mini-cluster/CMakeLists.txt
@@ -34,6 +34,7 @@ target_link_libraries(mini_cluster
kudu_util
master
master_proto
+ mini_chronyd
mini_hms
mini_kdc
mini_sentry
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index bffb048..185f9d3 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -33,6 +33,7 @@
#include "kudu/client/client.h"
#include "kudu/client/master_rpc.h"
+#include "kudu/clock/test/mini_chronyd.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/basictypes.h"
@@ -70,6 +71,7 @@
#include "kudu/util/test_util.h"
using kudu::client::internal::ConnectToClusterRpc;
+using kudu::clock::MiniChronyd;
using kudu::master::ListTablesRequestPB;
using kudu::master::ListTablesResponsePB;
using kudu::master::MasterServiceProxy;
@@ -112,7 +114,8 @@ ExternalMiniClusterOptions::ExternalMiniClusterOptions()
enable_sentry(false),
logtostderr(true),
start_process_timeout(MonoDelta::FromSeconds(70)),
- rpc_negotiation_timeout(MonoDelta::FromSeconds(3)) {
+ rpc_negotiation_timeout(MonoDelta::FromSeconds(3)),
+ num_ntp_servers(0) {
}
ExternalMiniCluster::ExternalMiniCluster()
@@ -219,6 +222,14 @@ Status ExternalMiniCluster::Start() {
"could not set krb5 client env");
}
+ // Start NTP servers, if requested.
+ if (opts_.num_ntp_servers > 0) {
+ for (auto i = 0; i < opts_.num_ntp_servers; ++i) {
+ RETURN_NOT_OK_PREPEND(AddNtpServer(),
+ Substitute("failed to start NTP server $0", i));
+ }
+ }
+
// Start the Sentry service and the HMS in the following steps, in order
// to deal with the circular dependency in terms of configuring each
// with the other's IP/port.
@@ -578,6 +589,19 @@ Status ExternalMiniCluster::AddTabletServer() {
return Status::OK();
}
+// Start one more chronyd-based NTP server and add it to the cluster. The
+// server's index, bind address and port (10123 + index) are all derived from
+// the number of NTP servers added so far. Returns the error of
+// MiniChronyd::Start() if the server fails to start; the server is not added
+// to the cluster in that case.
+Status ExternalMiniCluster::AddNtpServer() {
+  const auto idx = ntp_servers_.size();
+
+  clock::MiniChronydOptions options;
+  // Every server must get its own index: MiniChronyd derives the default data
+  // root (and so the location of its configuration and PID files) and the
+  // name of its command socket from the index. With the index left at its
+  // default for every server, the second and subsequent servers would find
+  // the data root of the first one and start with its configuration.
+  options.index = idx;
+  options.port = 10123 + idx;
+  options.bindaddress = GetBindIpForExternalServer(idx);
+  unique_ptr<MiniChronyd> chrony(new MiniChronyd(std::move(options)));
+  RETURN_NOT_OK(chrony->Start());
+  ntp_servers_.emplace_back(std::move(chrony));
+  return Status::OK();
+}
+
Status ExternalMiniCluster::WaitForTabletServerCount(int count, const MonoDelta& timeout) {
MonoTime deadline = MonoTime::Now() + timeout;
@@ -755,6 +779,16 @@ vector<ExternalDaemon*> ExternalMiniCluster::daemons() const {
return results;
}
+// Returns non-owning pointers to all NTP servers of the cluster, in the order
+// they were added. The servers stay owned by the cluster.
+vector<MiniChronyd*> ExternalMiniCluster::ntp_servers() const {
+  vector<MiniChronyd*> result;
+  result.reserve(ntp_servers_.size());
+  for (auto it = ntp_servers_.begin(); it != ntp_servers_.end(); ++it) {
+    const auto& server = *it;
+    DCHECK(server);
+    result.push_back(server.get());
+  }
+  return result;
+}
+
vector<HostPort> ExternalMiniCluster::master_rpc_addrs() const {
vector<HostPort> master_hostports;
for (const auto& master : masters_) {
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index f7f4caa..05e63c6 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -53,6 +53,10 @@ class KuduClient;
class KuduClientBuilder;
} // namespace client
+namespace clock {
+class MiniChronyd;
+} // namespace clock
+
namespace hms {
class MiniHms;
} // namespace hms
@@ -182,6 +186,13 @@ struct ExternalMiniClusterOptions {
//
// Default: empty
LocationInfo location_info;
+
+ // Number of NTP servers to start as part of the cluster. The servers
+ // are used as reference NTP servers for the built-in NTP client: it uses
+ // them to synchronize its internal clock.
+ //
+ // Default: 0
+ int num_ntp_servers;
};
// A mini-cluster made up of subprocesses running each of the daemons
@@ -211,6 +222,10 @@ class ExternalMiniCluster : public MiniCluster {
// Requires that the master is already running.
Status AddTabletServer();
+ // Add a new NTP server to the cluster. The new NTP server is started upon
+ // adding it.
+ Status AddNtpServer();
+
// Currently, this uses SIGKILL on each daemon for a non-graceful shutdown.
void ShutdownNodes(ClusterNodes nodes) override;
@@ -273,6 +288,10 @@ class ExternalMiniCluster : public MiniCluster {
// Return all tablet servers and masters.
std::vector<ExternalDaemon*> daemons() const;
+ // Return all configured NTP servers used for the synchronisation of the
+ // built-in NTP client.
+ std::vector<clock::MiniChronyd*> ntp_servers() const;
+
MiniKdc* kdc() const {
return kdc_.get();
}
@@ -408,6 +427,7 @@ class ExternalMiniCluster : public MiniCluster {
std::vector<scoped_refptr<ExternalMaster>> masters_;
std::vector<scoped_refptr<ExternalTabletServer>> tablet_servers_;
+ std::vector<std::unique_ptr<clock::MiniChronyd>> ntp_servers_;
std::unique_ptr<MiniKdc> kdc_;
std::unique_ptr<hms::MiniHms> hms_;
std::unique_ptr<sentry::MiniSentry> sentry_;