You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:23 UTC
[11/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/sockaddr.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.cc b/be/src/kudu/util/net/sockaddr.cc
new file mode 100644
index 0000000..60905b2
--- /dev/null
+++ b/be/src/kudu/util/net/sockaddr.cc
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/sockaddr.h"
+
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/socket.h>
+
+#include <cerrno>
+#include <cstring>
+#include <string>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/hash/builtin_type_hash.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/stopwatch.h"
+
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+///
+/// Sockaddr
+///
+// Default-constructs an IPv4 address of INADDR_ANY with port 0.
+Sockaddr::Sockaddr() {
+  // Zero every byte first (including sin_zero) so that the memcmp()-based
+  // operator== only ever compares explicitly initialized memory.
+  memset(&addr_, 0, sizeof(struct sockaddr_in));
+  addr_.sin_addr.s_addr = INADDR_ANY;
+  addr_.sin_family = AF_INET;
+}
+
+// Wraps an existing IPv4 socket address by copying it byte-for-byte.
+Sockaddr::Sockaddr(const struct sockaddr_in& addr) {
+  memcpy(&addr_, &addr, sizeof(addr_));
+}
+
+// Parses 's' via HostPort (using 'default_port' when 's' carries no port) and
+// stores the resulting IPv4 address and port in this object.
+//
+// Returns whatever HostPort::ParseString() reports on failure, or
+// InvalidArgument if the host part is not accepted by inet_pton(AF_INET).
+Status Sockaddr::ParseString(const std::string& s, uint16_t default_port) {
+  HostPort host_port;
+  RETURN_NOT_OK(host_port.ParseString(s, default_port));
+
+  // inet_pton() returns 1 only when the text was a valid address.
+  const int rc = inet_pton(AF_INET, host_port.host().c_str(), &addr_.sin_addr);
+  if (rc != 1) {
+    return Status::InvalidArgument("Invalid IP address", host_port.host());
+  }
+
+  set_port(host_port.port());
+  return Status::OK();
+}
+
+// Replaces the stored address with a byte-for-byte copy of 'addr'.
+Sockaddr& Sockaddr::operator=(const struct sockaddr_in &addr) {
+  memcpy(&addr_, &addr, sizeof(addr_));
+  return *this;
+}
+
+// Two Sockaddrs are equal when their underlying sockaddr_in structs are
+// bytewise identical.
+bool Sockaddr::operator==(const Sockaddr& other) const {
+  const int diff = memcmp(&addr_, &other.addr_, sizeof(struct sockaddr_in));
+  return diff == 0;
+}
+
+// Orders addresses by the raw value of sin_addr.s_addr only; the port is not
+// consulted.
+bool Sockaddr::operator<(const Sockaddr &rhs) const {
+  const auto mine = addr_.sin_addr.s_addr;
+  const auto theirs = rhs.addr_.sin_addr.s_addr;
+  return mine < theirs;
+}
+
+// Hashes the IP address with seed 0, then uses that result as the seed when
+// hashing the port.
+uint32_t Sockaddr::HashCode() const {
+  const uint32_t ip_hash = Hash32NumWithSeed(addr_.sin_addr.s_addr, 0);
+  return Hash32NumWithSeed(addr_.sin_port, ip_hash);
+}
+
+// Stores 'port' in the address after converting it with htons().
+void Sockaddr::set_port(int port) {
+  const uint16_t network_order = htons(port);
+  addr_.sin_port = network_order;
+}
+
+// Returns the stored port converted with ntohs().
+int Sockaddr::port() const {
+  const uint16_t host_order = ntohs(addr_.sin_port);
+  return host_order;
+}
+
+// Formats the IP address with inet_ntop(AF_INET) and returns it as a string.
+std::string Sockaddr::host() const {
+  char text[INET_ADDRSTRLEN];
+  ::inet_ntop(AF_INET, &addr_.sin_addr, text, sizeof(text));
+  return std::string(text);
+}
+
+// Exposes the wrapped sockaddr_in by const reference (no copy is made).
+const struct sockaddr_in& Sockaddr::addr() const {
+  return this->addr_;
+}
+
+// Renders the address as "<host()>:<port()>".
+std::string Sockaddr::ToString() const {
+  const std::string ip_text = host();
+  const int port_number = port();
+  return Substitute("$0:$1", ip_text, port_number);
+}
+
+// True when the IP portion is all zeroes (INADDR_ANY, i.e. 0.0.0.0).
+bool Sockaddr::IsWildcard() const {
+  return addr_.sin_addr.s_addr == INADDR_ANY;
+}
+
+// True when the most significant octet of the address, after the
+// NetworkByteOrder conversion, equals 127.
+bool Sockaddr::IsAnyLocalAddress() const {
+  const uint32_t ip_bits = NetworkByteOrder::FromHost32(addr_.sin_addr.s_addr);
+  const uint32_t first_octet = ip_bits >> 24;
+  return first_octet == 127;
+}
+
+// Performs a reverse lookup of this address with getnameinfo() and stores the
+// resulting name in '*hostname'.
+//
+// Returns NetworkError if getnameinfo() fails, in which case '*hostname' is
+// left untouched. For EAI_SYSTEM failures the Status carries the errno value
+// and its strerror() text; otherwise it carries the getnameinfo() return code.
+Status Sockaddr::LookupHostname(string* hostname) const {
+  char host[NI_MAXHOST];
+  const int flags = 0;
+
+  int rc;
+  int errno_saved = 0;
+  LOG_SLOW_EXECUTION(WARNING, 200,
+                     Substitute("DNS reverse-lookup for $0", ToString())) {
+    rc = getnameinfo(reinterpret_cast<const struct sockaddr*>(&addr_), sizeof(addr_),
+                     host, sizeof(host),
+                     nullptr, 0, flags);
+    // Capture errno immediately: whatever LOG_SLOW_EXECUTION runs when this
+    // scope closes could otherwise overwrite it before the EAI_SYSTEM branch
+    // below gets to read it.
+    errno_saved = errno;
+  }
+  if (PREDICT_FALSE(rc != 0)) {
+    if (rc == EAI_SYSTEM) {
+      // EAI_SYSTEM means the real cause is in errno rather than in 'rc'.
+      return Status::NetworkError(Substitute("getnameinfo: $0", gai_strerror(rc)),
+                                  strerror(errno_saved), errno_saved);
+    }
+    return Status::NetworkError("getnameinfo", gai_strerror(rc), rc);
+  }
+  *hostname = host;
+  return Status::OK();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/sockaddr.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.h b/be/src/kudu/util/net/sockaddr.h
new file mode 100644
index 0000000..dffd151
--- /dev/null
+++ b/be/src/kudu/util/net/sockaddr.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_SOCKADDR_H
+#define KUDU_UTIL_NET_SOCKADDR_H
+
+#include <netinet/in.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <string>
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+///
+/// Represents a sockaddr.
+///
+/// Currently only IPv4 is implemented. When IPv6 and UNIX domain are
+/// implemented, this should become an abstract base class and those should be
+/// multiple implementations.
+///
+class Sockaddr {
+ public:
+ // Constructs the address 0.0.0.0 with port 0.
+ Sockaddr();
+ // Constructs a Sockaddr holding a copy of 'addr'.
+ explicit Sockaddr(const struct sockaddr_in &addr);
+
+ // Parse a string IP address of the form "A.B.C.D:port", storing the result
+ // in this Sockaddr object. If no ':port' is specified, uses 'default_port'.
+ // Note that this function will not handle resolving hostnames.
+ //
+ // Returns a bad Status if the input is malformed.
+ Status ParseString(const std::string& s, uint16_t default_port);
+
+ // Overwrites this object with a copy of 'addr'.
+ Sockaddr& operator=(const struct sockaddr_in &addr);
+
+ // Returns true if both objects hold bytewise-identical sockaddr_in structs
+ // (so, unlike operator<, the port takes part in the comparison).
+ bool operator==(const Sockaddr& other) const;
+
+ // Compare the endpoints of two sockaddrs.
+ // The port number is ignored in this comparison.
+ bool operator<(const Sockaddr &rhs) const;
+
+ // Returns a 32-bit hash computed from both the IP address and the port.
+ uint32_t HashCode() const;
+
+ // Returns the dotted-decimal string '1.2.3.4' of the host component of this address.
+ std::string host() const;
+
+ // Port accessors. Values are given and returned in host byte order; the
+ // implementation converts with htons()/ntohs().
+ void set_port(int port);
+ int port() const;
+ // Returns a reference to the underlying sockaddr_in.
+ const struct sockaddr_in& addr() const;
+
+ // Returns the stringified address in '1.2.3.4:<port>' format.
+ std::string ToString() const;
+
+ // Returns true if the address is 0.0.0.0
+ bool IsWildcard() const;
+
+ // Returns true if the address is 127.*.*.*
+ bool IsAnyLocalAddress() const;
+
+ // Does reverse DNS lookup of the address and stores it in hostname.
+ Status LookupHostname(std::string* hostname) const;
+
+ // the default auto-generated copy constructor is fine here
+ private:
+ // The wrapped IPv4 socket address.
+ struct sockaddr_in addr_;
+};
+
+} // namespace kudu
+
+// Specialize std::hash for Sockaddr
+namespace std {
+// Lets kudu::Sockaddr be used as a key in unordered containers by delegating
+// to Sockaddr::HashCode().
+template<>
+struct hash<kudu::Sockaddr> {
+  // Returns size_t, as required of std::hash call operators. Returning 'int'
+  // would turn hash codes >= 2^31 into negative values that are then
+  // sign-extended when the container converts them to size_t.
+  size_t operator()(const kudu::Sockaddr& addr) const {
+    return addr.HashCode();
+  }
+};
+} // namespace std
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/socket-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket-test.cc b/be/src/kudu/util/net/socket-test.cc
new file mode 100644
index 0000000..8ecea4e
--- /dev/null
+++ b/be/src/kudu/util/net/socket-test.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/socket.h"
+
+#include <thread>
+
+#include <cstdint>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <stddef.h>
+#include <string>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+
+namespace kudu {
+
+constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
+
+// Fixture that drives a client-side Recv() against a local listening socket.
+class SocketTest : public KuduTest {
+ protected:
+  // Binds a listener to 0.0.0.0 with port 0, connects a client to it and
+  // expects the client's Recv() to fail with a NetworkError whose message
+  // matches the pattern 'message'.
+  //
+  // If 'accept' is true, a helper thread accepts the connection and closes it
+  // right away. Otherwise the helper thread sleeps for 200ms and closes the
+  // listener without accepting, while the client uses a 100ms receive timeout.
+  void DoTest(bool accept, const std::string &message) {
+    Sockaddr address;
+    // No helper thread is running yet, so a plain ASSERT is safe here.
+    ASSERT_OK(address.ParseString("0.0.0.0", 0));
+    Socket listener;
+
+    CHECK_OK(listener.Init(0));
+    CHECK_OK(listener.BindAndListen(address, 0));
+    Sockaddr listen_address;
+    CHECK_OK(listener.GetSocketAddress(&listen_address));
+
+    std::thread t([&]{
+      if (accept) {
+        Sockaddr new_addr;
+        Socket sock;
+        CHECK_OK(listener.Accept(&sock, &new_addr, 0));
+        CHECK_OK(sock.Close());
+      } else {
+        SleepFor(MonoDelta::FromMilliseconds(200));
+        CHECK_OK(listener.Close());
+      }
+    });
+
+    Socket client;
+    ASSERT_OK(client.Init(0));
+    ASSERT_OK(client.Connect(listen_address));
+    CHECK_OK(client.SetRecvTimeout(MonoDelta::FromMilliseconds(100)));
+
+    int32_t n = 0;
+    std::unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+    Status s = client.Recv(buf.get(), kEchoChunkSize, &n);
+
+    // Join before checking the result: a failing ASSERT_* returns from this
+    // function, and destroying a still-joinable std::thread would call
+    // std::terminate() and hide the real failure.
+    t.join();
+
+    ASSERT_TRUE(!s.ok());
+    ASSERT_TRUE(s.IsNetworkError());
+    ASSERT_STR_MATCHES(s.message().ToString(), message);
+  }
+};
+
+// The listener is closed without ever accepting, so the client's Recv() is
+// expected to fail with the "temporarily unavailable" error text.
+TEST_F(SocketTest, TestRecvReset) {
+  const std::string expected_pattern =
+      "recv error from 127.0.0.1:[0-9]+: Resource temporarily unavailable";
+  DoTest(/*accept=*/false, expected_pattern);
+}
+
+// The server side accepts and immediately closes, so the client's Recv() is
+// expected to report EOF.
+TEST_F(SocketTest, TestRecvEOF) {
+  const std::string expected_pattern = "recv got EOF from 127.0.0.1:[0-9]+";
+  DoTest(/*accept=*/true, expected_pattern);
+}
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.cc b/be/src/kudu/util/net/socket.cc
new file mode 100644
index 0000000..cc14702
--- /dev/null
+++ b/be/src/kudu/util/net/socket.cc
@@ -0,0 +1,590 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/socket.h"
+
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <cinttypes>
+#include <cstring>
+#include <limits>
+#include <ostream>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/slice.h"
+
+// When non-empty, Socket::Connect() binds the socket to this local address
+// (via BindForOutgoingConnection()) before connecting.
+DEFINE_string(local_ip_for_outbound_sockets, "",
+ "IP to bind to when making outgoing socket connections. "
+ "This must be an IP address of the form A.B.C.D, not a hostname. "
+ "Advanced parameter, subject to change.");
+TAG_FLAG(local_ip_for_outbound_sockets, experimental);
+
+// Test hook: when set, Socket::Recv() shrinks requests larger than one byte to
+// a random smaller size before calling recv().
+DEFINE_bool(socket_inject_short_recvs, false,
+ "Inject short recv() responses which return less data than "
+ "requested");
+TAG_FLAG(socket_inject_short_recvs, hidden);
+TAG_FLAG(socket_inject_short_recvs, unsafe);
+
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+// Constructs a Socket that does not manage any descriptor yet (fd_ == -1).
+Socket::Socket() : fd_(-1) {}
+
+// Constructs a Socket that manages the already-open descriptor 'fd'.
+Socket::Socket(int fd) : fd_(fd) {}
+
+void Socket::Reset(int fd) {
+ ignore_result(Close());
+ fd_ = fd;
+}
+
+// Gives up ownership of the descriptor without closing it and returns it to
+// the caller; afterwards this Socket manages nothing.
+int Socket::Release() {
+  const int released = fd_;
+  fd_ = -1;
+  return released;
+}
+
+// Closes the descriptor, if any; a failure to close is deliberately ignored.
+Socket::~Socket() {
+  const Status close_status = Close();
+  ignore_result(close_status);
+}
+
+// Closes the managed descriptor, if any.
+//
+// Returns OK if there was nothing to close or close() succeeded, and
+// NetworkError (carrying errno) otherwise. In every case this Socket no longer
+// manages a descriptor afterwards.
+Status Socket::Close() {
+  if (fd_ < 0) {
+    return Status::OK();
+  }
+  // Forget the descriptor before calling close(). On Linux the descriptor is
+  // released even when close() reports an error (including EINTR), so keeping
+  // fd_ set, or retrying the call, could close an unrelated descriptor that
+  // another thread has since been handed under the same number.
+  const int fd = fd_;
+  fd_ = -1;
+  if (::close(fd) < 0) {
+    int err = errno;
+    return Status::NetworkError("close error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Calls shutdown(2) for the read side, the write side, or both, as selected
+// by 'shut_read' and 'shut_write'.
+//
+// If neither side is requested this is a no-op that returns OK. Returns
+// NetworkError (carrying errno) if shutdown() fails.
+Status Socket::Shutdown(bool shut_read, bool shut_write) {
+  DCHECK_GE(fd_, 0);
+  if (!shut_read && !shut_write) {
+    // Nothing was asked for. Do not fall through with a zero 'how' value:
+    // SHUT_RD is 0 on common platforms, so that would silently shut down
+    // the read side.
+    return Status::OK();
+  }
+  int how;
+  if (shut_read && shut_write) {
+    how = SHUT_RDWR;
+  } else if (shut_read) {
+    how = SHUT_RD;
+  } else {
+    how = SHUT_WR;
+  }
+  if (::shutdown(fd_, how) < 0) {
+    int err = errno;
+    return Status::NetworkError("shutdown error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Returns the managed descriptor; the value is negative when none is managed.
+int Socket::GetFd() const {
+  return this->fd_;
+}
+
+// Returns true for EINTR, EAGAIN and EWOULDBLOCK, false for any other value.
+bool Socket::IsTemporarySocketError(int err) {
+  if (err == EINTR) {
+    return true;
+  }
+  return err == EAGAIN || err == EWOULDBLOCK;
+}
+
+#if defined(__linux__)
+
+// Opens a new IPv4 TCP socket with close-on-exec set and starts managing it,
+// closing any descriptor managed before. If 'flags' contains FLAG_NONBLOCKING
+// the socket is created non-blocking.
+//
+// Returns NetworkError (carrying errno) if socket() fails.
+Status Socket::Init(int flags) {
+  int sock_type = SOCK_STREAM | SOCK_CLOEXEC;
+  if (flags & FLAG_NONBLOCKING) {
+    sock_type |= SOCK_NONBLOCK;
+  }
+  const int fd = ::socket(AF_INET, sock_type, 0);
+  // Save errno before Reset(): Reset() closes the previously managed
+  // descriptor, and that close() may overwrite the error from socket().
+  const int err = errno;
+  Reset(fd);
+  if (fd < 0) {
+    return Status::NetworkError("error opening socket", ErrnoToString(err), err);
+  }
+
+  return Status::OK();
+}
+
+#else
+
+// Opens a new IPv4 TCP socket and starts managing it, closing any descriptor
+// managed before. The blocking mode is set according to FLAG_NONBLOCKING in
+// 'flags', close-on-exec is enabled, and SO_NOSIGPIPE is set on the socket.
+//
+// Returns NetworkError (carrying errno) if socket() fails, or the first
+// failure reported by the subsequent configuration steps.
+Status Socket::Init(int flags) {
+  const int fd = ::socket(AF_INET, SOCK_STREAM, 0);
+  // Save errno before Reset(): Reset() closes the previously managed
+  // descriptor, and that close() may overwrite the error from socket().
+  const int err = errno;
+  Reset(fd);
+  if (fd < 0) {
+    return Status::NetworkError("error opening socket", ErrnoToString(err), err);
+  }
+  RETURN_NOT_OK(SetNonBlocking(flags & FLAG_NONBLOCKING));
+  RETURN_NOT_OK(SetCloseOnExec());
+
+  // Disable SIGPIPE.
+  int set = 1;
+  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_NOSIGPIPE, set),
+                        "failed to set SO_NOSIGPIPE");
+  return Status::OK();
+}
+
+#endif // defined(__linux__)
+
+// Sets TCP_NODELAY to 1 when 'enabled' is true and to 0 otherwise.
+Status Socket::SetNoDelay(bool enabled) {
+  const int opt_value = enabled ? 1 : 0;
+  Status s = SetSockOpt(IPPROTO_TCP, TCP_NODELAY, opt_value);
+  RETURN_NOT_OK_PREPEND(s, "failed to set TCP_NODELAY");
+  return Status::OK();
+}
+
+// Sets TCP_CORK to 1 or 0 according to 'enabled' when built for Linux; on
+// other platforms this does nothing and returns OK.
+Status Socket::SetTcpCork(bool enabled) {
+#if defined(__linux__)
+  const int opt_value = enabled ? 1 : 0;
+  Status s = SetSockOpt(IPPROTO_TCP, TCP_CORK, opt_value);
+  RETURN_NOT_OK_PREPEND(s, "failed to set TCP_CORK");
+#endif // defined(__linux__)
+  // TODO(unknown): Use TCP_NOPUSH for OSX if perf becomes an issue.
+  return Status::OK();
+}
+
+// Adds O_NONBLOCK to, or removes it from, the descriptor's file status flags
+// while leaving the other flags as they are.
+//
+// Returns NetworkError (carrying errno) if either fcntl() call fails.
+Status Socket::SetNonBlocking(bool enabled) {
+  const int old_flags = ::fcntl(fd_, F_GETFL, 0);
+  if (old_flags == -1) {
+    int err = errno;
+    return Status::NetworkError(
+        StringPrintf("Failed to get file status flags on fd %d", fd_),
+        ErrnoToString(err), err);
+  }
+
+  int new_flags = old_flags;
+  if (enabled) {
+    new_flags |= O_NONBLOCK;
+  } else {
+    new_flags &= ~O_NONBLOCK;
+  }
+
+  if (::fcntl(fd_, F_SETFL, new_flags) != -1) {
+    return Status::OK();
+  }
+  int err = errno;
+  const string what = enabled
+      ? StringPrintf("Failed to set O_NONBLOCK on fd %d", fd_)
+      : StringPrintf("Failed to clear O_NONBLOCK on fd %d", fd_);
+  return Status::NetworkError(what, ErrnoToString(err), err);
+}
+
+// Reads the descriptor's file status flags and stores in '*is_nonblock'
+// whether O_NONBLOCK is set.
+//
+// Returns NetworkError (carrying errno) if fcntl() fails; '*is_nonblock' is
+// not written in that case.
+Status Socket::IsNonBlocking(bool* is_nonblock) const {
+  const int status_flags = ::fcntl(fd_, F_GETFL, 0);
+  if (status_flags != -1) {
+    *is_nonblock = (status_flags & O_NONBLOCK) != 0;
+    return Status::OK();
+  }
+  int err = errno;
+  return Status::NetworkError(
+      StringPrintf("Failed to get file status flags on fd %d", fd_),
+      ErrnoToString(err), err);
+}
+
+// Adds FD_CLOEXEC to the descriptor flags.
+//
+// If either fcntl() call fails, the socket is reset (which closes the
+// descriptor) and NetworkError carrying errno is returned.
+Status Socket::SetCloseOnExec() {
+  const int fd_flags = fcntl(fd_, F_GETFD, 0);
+  if (fd_flags == -1) {
+    int err = errno;
+    Reset(-1);
+    return Status::NetworkError("fcntl(F_GETFD) error", ErrnoToString(err), err);
+  }
+  const int rc = fcntl(fd_, F_SETFD, fd_flags | FD_CLOEXEC);
+  if (rc == -1) {
+    int err = errno;
+    Reset(-1);
+    return Status::NetworkError("fcntl(F_SETFD) error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Applies 'timeout' to the socket as SO_SNDTIMEO.
+Status Socket::SetSendTimeout(const MonoDelta& timeout) {
+  const char* const kOptName = "SO_SNDTIMEO";
+  return SetTimeout(SO_SNDTIMEO, kOptName, timeout);
+}
+
+// Applies 'timeout' to the socket as SO_RCVTIMEO.
+Status Socket::SetRecvTimeout(const MonoDelta& timeout) {
+  const char* const kOptName = "SO_RCVTIMEO";
+  return SetTimeout(SO_RCVTIMEO, kOptName, timeout);
+}
+
+// Sets SO_REUSEADDR to 1 when 'flag' is true and to 0 otherwise.
+Status Socket::SetReuseAddr(bool flag) {
+  const int opt_value = flag ? 1 : 0;
+  Status s = SetSockOpt(SOL_SOCKET, SO_REUSEADDR, opt_value);
+  RETURN_NOT_OK_PREPEND(s, "failed to set SO_REUSEADDR");
+  return Status::OK();
+}
+
+// Sets SO_REUSEPORT to 1 when 'flag' is true and to 0 otherwise.
+Status Socket::SetReusePort(bool flag) {
+  const int opt_value = flag ? 1 : 0;
+  Status s = SetSockOpt(SOL_SOCKET, SO_REUSEPORT, opt_value);
+  RETURN_NOT_OK_PREPEND(s, "failed to set SO_REUSEPORT");
+  return Status::OK();
+}
+
+// Runs SetReuseAddr(true), Bind(sockaddr) and Listen(listen_queue_size) in
+// that order, stopping at and returning the first failure.
+Status Socket::BindAndListen(const Sockaddr &sockaddr,
+                             int listen_queue_size) {
+  RETURN_NOT_OK(SetReuseAddr(true));
+  RETURN_NOT_OK(Bind(sockaddr));
+  return Listen(listen_queue_size);
+}
+
+// Marks the socket as listening with a backlog of 'listen_queue_size'.
+//
+// Returns NetworkError on failure. The errno value is attached as the posix
+// code, like the other error paths in this file, so callers can inspect
+// posix_code().
+Status Socket::Listen(int listen_queue_size) {
+  if (::listen(fd_, listen_queue_size) != 0) {
+    int err = errno;
+    return Status::NetworkError("listen() error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Queries the socket's local address with getsockname() and stores it in
+// '*cur_addr'.
+//
+// Returns NetworkError (carrying errno) on failure, leaving '*cur_addr' as is.
+Status Socket::GetSocketAddress(Sockaddr *cur_addr) const {
+  DCHECK_GE(fd_, 0);
+  struct sockaddr_in local;
+  socklen_t local_len = sizeof(local);
+  const int rc = ::getsockname(fd_, reinterpret_cast<struct sockaddr*>(&local), &local_len);
+  if (rc == -1) {
+    int err = errno;
+    return Status::NetworkError("getsockname error", ErrnoToString(err), err);
+  }
+  *cur_addr = local;
+  return Status::OK();
+}
+
+// Queries the connected peer's address with getpeername() and stores it in
+// '*cur_addr'.
+//
+// Returns NetworkError (carrying errno) on failure, leaving '*cur_addr' as is.
+Status Socket::GetPeerAddress(Sockaddr *cur_addr) const {
+  DCHECK_GE(fd_, 0);
+  struct sockaddr_in peer;
+  socklen_t peer_len = sizeof(peer);
+  const int rc = ::getpeername(fd_, reinterpret_cast<struct sockaddr*>(&peer), &peer_len);
+  if (rc == -1) {
+    int err = errno;
+    return Status::NetworkError("getpeername error", ErrnoToString(err), err);
+  }
+  *cur_addr = peer;
+  return Status::OK();
+}
+
+// Returns true when the local and the peer address of this socket are equal
+// once their ports are ignored. Returns false if either address cannot be
+// obtained.
+bool Socket::IsLoopbackConnection() const {
+  Sockaddr local_addr;
+  Sockaddr peer_addr;
+  if (!GetSocketAddress(&local_addr).ok() || !GetPeerAddress(&peer_addr).ok()) {
+    return false;
+  }
+
+  // Zero both ports so that operator== effectively compares only the IPs.
+  local_addr.set_port(0);
+  peer_addr.set_port(0);
+  return local_addr == peer_addr;
+}
+
+// Binds the socket to 'bind_addr'.
+//
+// On failure returns NetworkError carrying errno. If the failure is
+// EADDRINUSE for a non-zero port, TryRunLsof() is invoked for the address
+// before returning.
+Status Socket::Bind(const Sockaddr& bind_addr) {
+  DCHECK_GE(fd_, 0);
+  struct sockaddr_in raw_addr = bind_addr.addr();
+  const int rc = ::bind(fd_, reinterpret_cast<struct sockaddr*>(&raw_addr), sizeof(raw_addr));
+  if (PREDICT_FALSE(rc != 0)) {
+    int err = errno;
+    const string msg = strings::Substitute("error binding socket to $0: $1",
+                                           bind_addr.ToString(), ErrnoToString(err));
+    Status s = Status::NetworkError(msg, Slice(), err);
+
+    const bool addr_in_use = s.IsNetworkError() && s.posix_code() == EADDRINUSE;
+    if (addr_in_use && bind_addr.port() != 0) {
+      TryRunLsof(bind_addr);
+    }
+    return s;
+  }
+
+  return Status::OK();
+}
+
+// Accepts a pending connection on this (listening) socket.
+//
+// On success 'new_conn' manages the accepted descriptor, which has
+// close-on-exec set and is non-blocking iff 'flags' contains
+// FLAG_NONBLOCKING, and '*remote' holds the peer address. The accept call is
+// retried on EINTR. Returns NetworkError (carrying errno) if it fails.
+Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
+  TRACE_EVENT0("net", "Socket::Accept");
+  DCHECK_GE(fd_, 0);
+  struct sockaddr_in peer;
+  socklen_t peer_len = sizeof(peer);
+  int accepted_fd = -1;
+#if defined(__linux__)
+  // accept4() applies the descriptor flags atomically with the accept.
+  const int accept_flags =
+      (flags & FLAG_NONBLOCKING) ? (SOCK_CLOEXEC | SOCK_NONBLOCK) : SOCK_CLOEXEC;
+  RETRY_ON_EINTR(accepted_fd, accept4(fd_, reinterpret_cast<struct sockaddr*>(&peer),
+                                      &peer_len, accept_flags));
+  if (accepted_fd < 0) {
+    int err = errno;
+    return Status::NetworkError("accept4(2) error", ErrnoToString(err), err);
+  }
+  new_conn->Reset(accepted_fd);
+
+#else
+  RETRY_ON_EINTR(accepted_fd, accept(fd_, reinterpret_cast<struct sockaddr*>(&peer),
+                                     &peer_len));
+  if (accepted_fd < 0) {
+    int err = errno;
+    return Status::NetworkError("accept(2) error", ErrnoToString(err), err);
+  }
+  new_conn->Reset(accepted_fd);
+  // Without accept4() the flags have to be applied in separate steps.
+  RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
+  RETURN_NOT_OK(new_conn->SetCloseOnExec());
+#endif // defined(__linux__)
+
+  *remote = peer;
+  TRACE_EVENT_INSTANT1("net", "Accepted", TRACE_EVENT_SCOPE_THREAD,
+                       "remote", remote->ToString());
+  return Status::OK();
+}
+
+// Binds the socket to the address given by FLAGS_local_ip_for_outbound_sockets.
+//
+// CHECK-fails if the flag does not parse as an IP address or parses to a
+// non-zero port. Otherwise returns the result of Bind().
+Status Socket::BindForOutgoingConnection() {
+  Sockaddr local_addr;
+  const Status parse_status = local_addr.ParseString(FLAGS_local_ip_for_outbound_sockets, 0);
+  CHECK(parse_status.ok() && local_addr.port() == 0)
+      << "Invalid local IP set for 'local_ip_for_outbound_sockets': '"
+      << FLAGS_local_ip_for_outbound_sockets << "': " << parse_status.ToString();
+
+  return Bind(local_addr);
+}
+
+// Connects the socket to 'remote'. When FLAGS_local_ip_for_outbound_sockets is
+// non-empty, the socket is first bound to that local address.
+//
+// The connect() call is retried on EINTR. Returns NetworkError (carrying
+// errno) if it fails.
+Status Socket::Connect(const Sockaddr &remote) {
+  TRACE_EVENT1("net", "Socket::Connect",
+               "remote", remote.ToString());
+  const bool bind_local_first = !FLAGS_local_ip_for_outbound_sockets.empty();
+  if (PREDICT_FALSE(bind_local_first)) {
+    RETURN_NOT_OK(BindForOutgoingConnection());
+  }
+
+  DCHECK_GE(fd_, 0);
+  struct sockaddr_in dest = remote.addr();
+  int rc;
+  RETRY_ON_EINTR(rc, ::connect(
+      fd_, reinterpret_cast<const struct sockaddr*>(&dest), sizeof(dest)));
+  if (rc >= 0) {
+    return Status::OK();
+  }
+  int err = errno;
+  return Status::NetworkError("connect(2) error", ErrnoToString(err), err);
+}
+
+// Reads the socket's SO_ERROR value with getsockopt().
+//
+// Returns OK if the value is zero, NetworkError carrying that value if it is
+// non-zero, and NetworkError carrying errno if getsockopt() itself fails.
+Status Socket::GetSockError() const {
+  DCHECK_GE(fd_, 0);
+  int so_error = 0;
+  socklen_t so_error_len = sizeof(so_error);
+  if (::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &so_error, &so_error_len) != 0) {
+    int err = errno;
+    return Status::NetworkError("getsockopt(SO_ERROR) failed", ErrnoToString(err), err);
+  }
+  if (so_error == 0) {
+    return Status::OK();
+  }
+  return Status::NetworkError(ErrnoToString(so_error), Slice(), so_error);
+}
+
+// Sends up to 'amt' bytes from 'buf' with send(MSG_NOSIGNAL), retrying on
+// EINTR, and stores the number of bytes sent in '*nwritten'.
+//
+// Returns NetworkError with EINVAL if 'amt' is not positive, and NetworkError
+// carrying errno if send() fails; '*nwritten' is not written on error.
+Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
+  if (amt <= 0) {
+    const string msg = StringPrintf("invalid send of %" PRId32 " bytes", amt);
+    return Status::NetworkError(msg, Slice(), EINVAL);
+  }
+  DCHECK_GE(fd_, 0);
+  int sent;
+  RETRY_ON_EINTR(sent, ::send(fd_, buf, amt, MSG_NOSIGNAL));
+  if (sent >= 0) {
+    *nwritten = sent;
+    return Status::OK();
+  }
+  int err = errno;
+  return Status::NetworkError("write error", ErrnoToString(err), err);
+}
+
+// Sends the 'iov_len' buffers described by 'iov' with a single
+// sendmsg(MSG_NOSIGNAL) call, retried on EINTR, and stores the number of bytes
+// sent in '*nwritten'.
+//
+// Returns NetworkError with EINVAL if 'iov_len' is not positive, and
+// NetworkError carrying errno if sendmsg() fails.
+Status Socket::Writev(const struct ::iovec *iov, int iov_len,
+                      int64_t *nwritten) {
+  if (PREDICT_FALSE(iov_len <= 0)) {
+    const string msg = StringPrintf("writev: invalid io vector length of %d", iov_len);
+    return Status::NetworkError(msg, Slice(), EINVAL);
+  }
+  DCHECK_GE(fd_, 0);
+
+  // Only the iovec fields of the message header are used; the rest stays zero.
+  struct msghdr header;
+  memset(&header, 0, sizeof(header));
+  header.msg_iov = const_cast<iovec *>(iov);
+  header.msg_iovlen = iov_len;
+
+  ssize_t sent;
+  RETRY_ON_EINTR(sent, ::sendmsg(fd_, &header, MSG_NOSIGNAL));
+  if (PREDICT_FALSE(sent < 0)) {
+    int err = errno;
+    return Status::NetworkError("sendmsg error", ErrnoToString(err), err);
+  }
+
+  *nwritten = sent;
+  return Status::OK();
+}
+
+// Mostly follows writen() from Stevens (2004) or Kerrisk (2010).
+//
+// Repeatedly calls Write() until 'buflen' bytes from 'buf' have been sent,
+// 'deadline' passes, or an error occurs. '*nwritten' is updated after every
+// Write() call with the running total.
+//
+// Returns OK once everything was written, TimedOut if the deadline has passed
+// or Write() fails with EAGAIN, IOError if a Write() succeeds but transfers
+// zero bytes, and otherwise the Write() error prefixed with
+// "BlockingWrite error".
+Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+ const MonoTime& deadline) {
+ DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";
+ DCHECK(nwritten);
+
+ size_t tot_written = 0;
+ while (tot_written < buflen) {
+ int32_t inc_num_written = 0;
+ int32_t num_to_write = buflen - tot_written;
+ // Re-arm the send timeout with whatever time is left before the deadline.
+ MonoDelta timeout = deadline - MonoTime::Now();
+ if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
+ return Status::TimedOut("BlockingWrite timed out");
+ }
+ RETURN_NOT_OK(SetSendTimeout(timeout));
+ Status s = Write(buf, num_to_write, &inc_num_written);
+ // inc_num_written was zero-initialized above, so these updates are no-ops
+ // when Socket::Write() fails (it does not write its out-parameter on error).
+ tot_written += inc_num_written;
+ buf += inc_num_written;
+ *nwritten = tot_written;
+
+ if (PREDICT_FALSE(!s.ok())) {
+ // Continue silently when the syscall is interrupted.
+ if (s.posix_code() == EINTR) {
+ continue;
+ }
+ // With the send timeout set above, EAGAIN is treated as a timeout.
+ if (s.posix_code() == EAGAIN) {
+ return Status::TimedOut("");
+ }
+ return s.CloneAndPrepend("BlockingWrite error");
+ }
+ if (PREDICT_FALSE(inc_num_written == 0)) {
+ // Shouldn't happen on Linux with a blocking socket. Maybe other Unices.
+ break;
+ }
+ }
+
+ // Only reachable with a short count via the zero-byte 'break' above.
+ if (tot_written < buflen) {
+ return Status::IOError("Wrote zero bytes on a BlockingWrite() call",
+ StringPrintf("Transferred %zu of %zu bytes", tot_written, buflen));
+ }
+ return Status::OK();
+}
+
+// Receives up to 'amt' bytes into 'buf' with recv(), retrying on EINTR, and
+// stores the number of bytes received in '*nread'.
+//
+// Returns NetworkError with EINVAL if 'amt' is not positive, NetworkError with
+// ESHUTDOWN ("recv got EOF from <peer>") if the peer closed the connection,
+// and NetworkError carrying errno ("recv error from <peer>") if recv() fails.
+// '*nread' is not written on error.
+Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
+  if (amt <= 0) {
+    return Status::NetworkError(
+        StringPrintf("invalid recv of %d bytes", amt), Slice(), EINVAL);
+  }
+
+  // The recv() call can return fewer than the requested number of bytes.
+  // Especially when 'amt' is small, this is very unlikely to happen in
+  // the context of unit tests. So, we provide an injection hook which
+  // simulates the same behavior.
+  if (PREDICT_FALSE(FLAGS_socket_inject_short_recvs && amt > 1)) {
+    Random r(GetRandomSeed32());
+    amt = 1 + r.Uniform(amt - 1);
+  }
+
+  DCHECK_GE(fd_, 0);
+  int res;
+  RETRY_ON_EINTR(res, recv(fd_, buf, amt, 0));
+  // Capture errno right away: the getpeername() call made below for the
+  // error message could otherwise overwrite the error reported by recv().
+  const int err = errno;
+  if (res <= 0) {
+    Sockaddr remote;
+    // Best effort: if the peer address cannot be determined, the message is
+    // built from the default-constructed 'remote'.
+    ignore_result(GetPeerAddress(&remote));
+    if (res == 0) {
+      string error_message = Substitute("recv got EOF from $0", remote.ToString());
+      return Status::NetworkError(error_message, Slice(), ESHUTDOWN);
+    }
+    string error_message = Substitute("recv error from $0", remote.ToString());
+    return Status::NetworkError(error_message, ErrnoToString(err), err);
+  }
+  *nread = res;
+  return Status::OK();
+}
+
+// Mostly follows readn() from Stevens (2004) or Kerrisk (2010).
+// One place where we deviate: we consider EOF a failure if < amt bytes are read.
+//
+// Repeatedly calls Recv() until 'amt' bytes have been stored in 'buf',
+// 'deadline' passes, or an error occurs. '*nread' is updated after every
+// Recv() call with the running total.
+//
+// Returns OK once 'amt' bytes were read, TimedOut if the deadline has passed
+// or Recv() fails with EAGAIN, and otherwise the Recv() error prefixed with
+// "BlockingRecv error".
+// NOTE(review): Socket::Recv() reports EOF as a NetworkError rather than as a
+// successful zero-byte read, so with that implementation EOF takes the
+// "BlockingRecv error" path; the zero-byte 'break' and the IOError below look
+// reachable only through a Recv() override -- confirm.
+Status Socket::BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline) {
+ DCHECK_LE(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported";
+ DCHECK(nread);
+ size_t tot_read = 0;
+ while (tot_read < amt) {
+ int32_t inc_num_read = 0;
+ int32_t num_to_read = amt - tot_read;
+ // Re-arm the receive timeout with whatever time is left before the deadline.
+ MonoDelta timeout = deadline - MonoTime::Now();
+ if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
+ return Status::TimedOut("");
+ }
+ RETURN_NOT_OK(SetRecvTimeout(timeout));
+ Status s = Recv(buf, num_to_read, &inc_num_read);
+ // inc_num_read was zero-initialized above, so these updates are no-ops
+ // when Socket::Recv() fails (it does not write its out-parameter on error).
+ tot_read += inc_num_read;
+ buf += inc_num_read;
+ *nread = tot_read;
+
+ if (PREDICT_FALSE(!s.ok())) {
+ // Continue silently when the syscall is interrupted.
+ if (s.posix_code() == EINTR) {
+ continue;
+ }
+ // With the receive timeout set above, EAGAIN is treated as a timeout.
+ if (s.posix_code() == EAGAIN) {
+ return Status::TimedOut("");
+ }
+ return s.CloneAndPrepend("BlockingRecv error");
+ }
+ if (PREDICT_FALSE(inc_num_read == 0)) {
+ // EOF.
+ break;
+ }
+ }
+
+ if (PREDICT_FALSE(tot_read < amt)) {
+ return Status::IOError("Read zero bytes on a blocking Recv() call",
+ StringPrintf("Transferred %zu of %zu bytes", tot_read, amt));
+ }
+ return Status::OK();
+}
+
+// Sets the SOL_SOCKET timeout option 'opt' to 'timeout', converted to a
+// struct timeval. 'optname' is only used in the error message.
+//
+// Returns InvalidArgument for a negative timeout and the (prefixed)
+// SetSockOpt() error if the option cannot be set.
+Status Socket::SetTimeout(int opt, const char* optname, const MonoDelta& timeout) {
+  const bool is_negative = timeout.ToNanoseconds() < 0;
+  if (PREDICT_FALSE(is_negative)) {
+    return Status::InvalidArgument("Timeout specified as negative to SetTimeout",
+                                   timeout.ToString());
+  }
+
+  struct timeval tv;
+  timeout.ToTimeVal(&tv);
+  Status s = SetSockOpt(SOL_SOCKET, opt, tv);
+  RETURN_NOT_OK_PREPEND(s, Substitute("failed to set socket option $0 to $1",
+                                      optname, timeout.ToString()));
+  return Status::OK();
+}
+
+// Thin typed wrapper around setsockopt(): passes the address and size of
+// 'value' as the option payload.
+//
+// Returns NetworkError (carrying errno) if setsockopt() fails.
+template<typename T>
+Status Socket::SetSockOpt(int level, int option, const T& value) {
+  const int rc = ::setsockopt(fd_, level, option, &value, sizeof(value));
+  if (rc == -1) {
+    int err = errno;
+    return Status::NetworkError(ErrnoToString(err), Slice(), err);
+  }
+  return Status::OK();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/socket.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.h b/be/src/kudu/util/net/socket.h
new file mode 100644
index 0000000..992c44a
--- /dev/null
+++ b/be/src/kudu/util/net/socket.h
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_SOCKET_H
+#define KUDU_UTIL_NET_SOCKET_H
+
+#include <cstddef>
+#include <cstdint>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+struct iovec;
+
+namespace kudu {
+
+class MonoDelta;
+class MonoTime;
+class Sockaddr;
+
+// Wrapper around a socket file descriptor. The descriptor is closed when the
+// Socket is destroyed unless ownership was given up with Release().
+class Socket {
+ public:
+ // Flag accepted by Init() and Accept(): make the socket non-blocking.
+ static const int FLAG_NONBLOCKING = 0x1;
+
+ // Create a new invalid Socket object.
+ Socket();
+
+ // Start managing a socket.
+ explicit Socket(int fd);
+
+ // Close the socket. Errors will be ignored.
+ virtual ~Socket();
+
+ // Close the Socket, checking for errors.
+ virtual Status Close();
+
+ // call shutdown() on the socket
+ Status Shutdown(bool shut_read, bool shut_write);
+
+ // Start managing a socket.
+ // Any descriptor managed so far is closed first (errors are ignored).
+ void Reset(int fd);
+
+ // Stop managing the socket and return it.
+ int Release();
+
+ // Get the raw file descriptor, or -1 if there is no file descriptor being
+ // managed.
+ int GetFd() const;
+
+ // Returns true if the error is temporary and will go away if we retry on
+ // the socket.
+ static bool IsTemporarySocketError(int err);
+
+ Status Init(int flags); // See FLAG_NONBLOCKING
+
+ // Set or clear TCP_NODELAY
+ Status SetNoDelay(bool enabled);
+
+ // Set or clear TCP_CORK
+ Status SetTcpCork(bool enabled);
+
+ // Set or clear O_NONBLOCK
+ Status SetNonBlocking(bool enabled);
+ // Stores in 'is_nonblock' whether O_NONBLOCK is currently set.
+ Status IsNonBlocking(bool* is_nonblock) const;
+
+ // Set SO_SNDTIMEO to the specified value. Should only be used for blocking sockets.
+ Status SetSendTimeout(const MonoDelta& timeout);
+
+ // Set SO_RCVTIMEO to the specified value. Should only be used for blocking sockets.
+ Status SetRecvTimeout(const MonoDelta& timeout);
+
+ // Sets SO_REUSEADDR to 'flag'. Should be used prior to Bind().
+ Status SetReuseAddr(bool flag);
+
+ // Sets SO_REUSEPORT to 'flag'. Should be used prior to Bind().
+ Status SetReusePort(bool flag);
+
+ // Convenience method to invoke the common sequence:
+ // 1) SetReuseAddr(true)
+ // 2) Bind()
+ // 3) Listen()
+ Status BindAndListen(const Sockaddr &sockaddr, int listen_queue_size);
+
+ // Start listening for new connections, with the given backlog size.
+ // Requires that the socket has already been bound using Bind().
+ Status Listen(int listen_queue_size);
+
+ // Call getsockname to get the address of this socket.
+ Status GetSocketAddress(Sockaddr *cur_addr) const;
+
+ // Call getpeername to get the address of the connected peer.
+ // It is virtual so that tests can override.
+ virtual Status GetPeerAddress(Sockaddr *cur_addr) const;
+
+ // Return true if this socket is determined to be a loopback connection
+ // (i.e. the local and remote peer share an IP address).
+ //
+ // If any error occurs while determining this, returns false.
+ bool IsLoopbackConnection() const;
+
+ // Call bind() to bind the socket to a given address.
+ // If bind() fails and indicates that the requested port is already in use,
+ // generates an informative log message by calling 'lsof' if available.
+ Status Bind(const Sockaddr& bind_addr);
+
+ // Call accept(2) to get a new connection.
+ // On success 'new_conn' manages the accepted descriptor and 'remote' holds
+ // the peer address. See FLAG_NONBLOCKING for 'flags'.
+ Status Accept(Socket *new_conn, Sockaddr *remote, int flags);
+
+ // start connecting this socket to a remote address.
+ Status Connect(const Sockaddr &remote);
+
+ // get the error status using getsockopt(2)
+ Status GetSockError() const;
+
+ // Write up to 'amt' bytes from 'buf' to the socket. The number of bytes
+ // actually written will be stored in 'nwritten'. If an error is returned,
+ // the value of 'nwritten' is undefined.
+ virtual Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten);
+
+ // Vectorized Write.
+ // If there is an error, that error needs to be resolved before calling again.
+ // If there was no error, but not all the bytes were written, the unwritten
+ // bytes must be retried. See writev(2) for more information.
+ virtual Status Writev(const struct ::iovec *iov, int iov_len, int64_t *nwritten);
+
+ // Blocking Write call, returns IOError unless full buffer is sent.
+ // Underlying Socket expected to be in blocking mode. Fails if any Write() sends 0 bytes.
+ // Returns OK if buflen bytes were sent, otherwise IOError.
+ // Upon return, nwritten will contain the number of bytes actually written.
+ // See also writen() from Stevens (2004) or Kerrisk (2010)
+ Status BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+ const MonoTime& deadline);
+
+ // Read up to 'amt' bytes from the socket into 'buf'. The number of bytes
+ // actually read will be stored in 'nread'. Both EOF and a failed recv() are
+ // reported as NetworkError, in which case 'nread' is not written.
+ virtual Status Recv(uint8_t *buf, int32_t amt, int32_t *nread);
+
+ // Blocking Recv call, returns IOError unless requested amt bytes are read.
+ // Underlying Socket expected to be in blocking mode. Fails if any Recv() reads 0 bytes.
+ // Returns OK if amt bytes were read, otherwise IOError.
+ // Upon return, nread will contain the number of bytes actually read.
+ // See also readn() from Stevens (2004) or Kerrisk (2010)
+ Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline);
+
+ private:
+ // Called internally from SetSend/RecvTimeout().
+ Status SetTimeout(int opt, const char* optname, const MonoDelta& timeout);
+
+ // Called internally during socket setup.
+ Status SetCloseOnExec();
+
+ // Bind the socket to a local address before making an outbound connection,
+ // based on the value of FLAGS_local_ip_for_outbound_sockets.
+ Status BindForOutgoingConnection();
+
+ // Set an option on the socket.
+ template<typename T>
+ Status SetSockOpt(int level, int option, const T& value);
+
+ // The managed file descriptor; negative when no descriptor is managed.
+ int fd_;
+
+ DISALLOW_COPY_AND_ASSIGN(Socket);
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/nvm_cache.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/nvm_cache.cc b/be/src/kudu/util/nvm_cache.cc
new file mode 100644
index 0000000..ef52ec0
--- /dev/null
+++ b/be/src/kudu/util/nvm_cache.cc
@@ -0,0 +1,577 @@
+// This file is derived from cache.cc in the LevelDB project:
+//
+// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+//
+// ------------------------------------------------------------
+// This file implements a cache based on the NVML library (http://pmem.io),
+// specifically its "libvmem" component. This library makes it easy to program
+// against persistent memory hardware by exposing an API which parallels
+// malloc/free, but allocates from persistent memory instead of DRAM.
+//
+// We use this API to implement a cache which treats persistent memory or
+// non-volatile memory as if it were a larger cheaper bank of volatile memory. We
+// currently make no use of its persistence properties.
+//
+// Currently, we only store key/value in NVM. All other data structures such as the
+// ShardedLRUCache instances, hash table, etc are in DRAM. The assumption is that
+// the ratio of data stored vs overhead is quite high.
+
+#include "kudu/util/nvm_cache.h"
+
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <libvmem.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/atomic_refcount.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/cache_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/slice.h"
+
+DEFINE_string(nvm_cache_path, "/vmem",
+ "The path at which the NVM cache will try to allocate its memory. "
+ "This can be a tmpfs or ramfs for testing purposes.");
+TAG_FLAG(nvm_cache_path, experimental);  // Passed to vmem_create() in NewLRUNvmCache().
+
+DEFINE_int32(nvm_cache_allocation_retry_count, 10,
+ "The number of times that the NVM cache will retry attempts to allocate "
+ "memory for new entries. In between attempts, a cache entry will be "
+ "evicted.");
+TAG_FLAG(nvm_cache_allocation_retry_count, advanced);
+TAG_FLAG(nvm_cache_allocation_retry_count, experimental);  // Read by NvmLRUCache::AllocateAndRetry().
+
+DEFINE_bool(nvm_cache_simulate_allocation_failure, false,
+ "If true, the NVM cache will inject failures in calls to vmem_malloc "
+ "for testing.");
+TAG_FLAG(nvm_cache_simulate_allocation_failure, unsafe);  // Checked in NvmLRUCache::VmemMalloc().
+
+
+namespace kudu {
+
+namespace {
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+typedef simple_spinlock MutexType;
+
+// LRU cache implementation
+
+// A cache entry: a fixed-size bookkeeping header plus a pointer to the
+// variable-length key/value bytes. Entries are chained into a circular
+// doubly linked list (next/prev) ordered by access time, and into a
+// per-bucket hash chain (next_hash).
+struct LRUHandle {
+  Cache::EvictionCallback* eviction_callback;
+  LRUHandle* next_hash;
+  LRUHandle* next;
+  LRUHandle* prev;
+  size_t charge;      // TODO(opt): Only allow uint32_t?
+  uint32_t key_length;
+  uint32_t val_length;
+  Atomic32 refs;
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+  uint8_t* kv_data;
+
+  // The key occupies the first key_length bytes of kv_data.
+  Slice key() const { return Slice(kv_data, key_length); }
+
+  // The value occupies the val_length bytes that directly follow the key.
+  Slice value() const { return Slice(kv_data + key_length, val_length); }
+
+  // Writable pointer to the first byte of the value.
+  uint8_t* val_ptr() { return kv_data + key_length; }
+};
+
+// A small chained hash table of LRUHandle pointers. It is hand-rolled because
+// that avoids a pile of porting hacks and was measured to be faster than some
+// built-in hash tables (e.g. readrandom sped up by ~5% over the one shipped
+// with g++ 4.4.3).
+class HandleTable {
+ public:
+  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
+  ~HandleTable() { delete[] list_; }
+
+  // Returns the entry matching key/hash, or null if there is none.
+  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
+    return *FindPointer(key, hash);
+  }
+
+  // Inserts 'h'. If an entry with the same key/hash was present it is
+  // unlinked from the table and returned; otherwise returns null.
+  LRUHandle* Insert(LRUHandle* h) {
+    LRUHandle** slot = FindPointer(h->key(), h->hash);
+    LRUHandle* displaced = *slot;
+    if (displaced != nullptr) {
+      // Take over the chain position of the entry being replaced.
+      h->next_hash = displaced->next_hash;
+      *slot = h;
+      return displaced;
+    }
+
+    h->next_hash = nullptr;
+    *slot = h;
+    ++elems_;
+    if (elems_ > length_) {
+      // Since each cache entry is fairly large, we aim for a small
+      // average linked list length (<= 1).
+      Resize();
+    }
+    return nullptr;
+  }
+
+  // Unlinks and returns the entry matching key/hash, or returns null if
+  // there is none.
+  LRUHandle* Remove(const Slice& key, uint32_t hash) {
+    LRUHandle** slot = FindPointer(key, hash);
+    LRUHandle* victim = *slot;
+    if (victim == nullptr) {
+      return nullptr;
+    }
+    *slot = victim->next_hash;
+    --elems_;
+    return victim;
+  }
+
+ private:
+  // The table is an array of 'length_' buckets; each bucket is a singly
+  // linked list (through next_hash) of the entries hashing into it.
+  uint32_t length_;
+  uint32_t elems_;
+  LRUHandle** list_;
+
+  // Returns the address of the link that points at the entry matching
+  // key/hash. When no entry matches, returns the address of the trailing
+  // (null) link of the bucket's chain.
+  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
+    LRUHandle** slot = &list_[hash & (length_ - 1)];
+    for (; *slot != nullptr; slot = &(*slot)->next_hash) {
+      if ((*slot)->hash == hash && key == (*slot)->key()) {
+        break;
+      }
+    }
+    return slot;
+  }
+
+  // Grows the bucket array (always a power of two, at least 16) and rehashes
+  // every entry into it.
+  void Resize() {
+    uint32_t new_length = 16;
+    while (new_length < elems_ * 1.5) {
+      new_length *= 2;
+    }
+    LRUHandle** new_list = new LRUHandle*[new_length];
+    memset(new_list, 0, sizeof(new_list[0]) * new_length);
+
+    uint32_t moved = 0;
+    for (uint32_t bucket = 0; bucket < length_; ++bucket) {
+      for (LRUHandle* cur = list_[bucket]; cur != nullptr; ) {
+        LRUHandle* following = cur->next_hash;
+        // Push onto the front of the destination bucket's chain.
+        LRUHandle** head = &new_list[cur->hash & (new_length - 1)];
+        cur->next_hash = *head;
+        *head = cur;
+        cur = following;
+        ++moved;
+      }
+    }
+    DCHECK_EQ(elems_, moved);
+
+    delete[] list_;
+    list_ = new_list;
+    length_ = new_length;
+  }
+};
+
+// One shard of the sharded cache: a HandleTable for lookups plus an LRU list
+// for eviction order, both guarded by a single spinlock.
+class NvmLRUCache {
+ public:
+  explicit NvmLRUCache(VMEM* vmp);
+  ~NvmLRUCache();
+
+  // Kept apart from the constructor so that a caller can create a collection
+  // of shards first and size each of them afterwards.
+  void SetCapacity(size_t capacity) {
+    capacity_ = capacity;
+  }
+
+  void SetMetrics(CacheMetrics* metrics) {
+    metrics_ = metrics;
+  }
+
+  Cache::Handle* Insert(LRUHandle* h, Cache::EvictionCallback* eviction_callback);
+
+  // Like Cache::Lookup, but with an extra "hash" parameter.
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  void Release(Cache::Handle* handle);
+  void Erase(const Slice& key, uint32_t hash);
+  void* AllocateAndRetry(size_t size);
+
+ private:
+  void NvmLRU_Remove(LRUHandle* e);
+  void NvmLRU_Append(LRUHandle* e);
+
+  // Drops one reference from 'e'. Returns true if that was the last one.
+  bool Unref(LRUHandle* e);
+  void FreeEntry(LRUHandle* e);
+
+  // Evicts the least recently used entry. If that drops its last reference,
+  // the entry is pushed onto the list headed by '*to_remove_head'.
+  void EvictOldestUnlocked(LRUHandle** to_remove_head);
+
+  // Frees every entry of the list (linked through 'next') that starts at
+  // 'to_free_head'.
+  void FreeLRUEntries(LRUHandle* to_free_head);
+
+  // Wrapper around vmem_malloc which injects failures based on a flag.
+  void* VmemMalloc(size_t size);
+
+  // Initialized before use.
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  MutexType mutex_;
+  size_t usage_;
+
+  // Dummy head of the LRU list:
+  // lru_.prev is the newest entry, lru_.next is the oldest entry.
+  LRUHandle lru_;
+
+  HandleTable table_;
+
+  VMEM* vmp_;
+
+  CacheMetrics* metrics_;
+};
+
+// Creates an empty shard that allocates from the pool 'vmp'.
+// The capacity starts at zero so that it is never read while indeterminate;
+// callers are still expected to size the shard with SetCapacity().
+NvmLRUCache::NvmLRUCache(VMEM* vmp)
+  : capacity_(0),
+    usage_(0),
+    vmp_(vmp),
+    metrics_(NULL) {
+  // Make empty circular linked list
+  lru_.next = &lru_;
+  lru_.prev = &lru_;
+}
+
+// Drops the cache's own reference on every entry still on the LRU list and
+// frees each entry whose last reference that was.
+NvmLRUCache::~NvmLRUCache() {
+  LRUHandle* cur = lru_.next;
+  while (cur != &lru_) {
+    // Grab the successor first: 'cur' may be freed below.
+    LRUHandle* following = cur->next;
+    DCHECK_EQ(cur->refs, 1);  // Error if caller has an unreleased handle
+    if (Unref(cur)) {
+      FreeEntry(cur);
+    }
+    cur = following;
+  }
+}
+
+// Allocates 'size' bytes from the vmem pool, or returns null without calling
+// the allocator when FLAGS_nvm_cache_simulate_allocation_failure is set.
+void* NvmLRUCache::VmemMalloc(size_t size) {
+  if (PREDICT_TRUE(!FLAGS_nvm_cache_simulate_allocation_failure)) {
+    return vmem_malloc(vmp_, size);
+  }
+  return NULL;
+}
+
+// Decrements the reference count of 'e'. Returns true if the count reached
+// zero, i.e. the caller held the last reference.
+bool NvmLRUCache::Unref(LRUHandle* e) {
+  DCHECK_GT(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+  const bool still_referenced = base::RefCountDec(&e->refs);
+  return !still_referenced;
+}
+
+// Disposes of an entry whose reference count has already dropped to zero:
+// notifies the eviction callback (if one was registered), updates the usage
+// and eviction metrics (if metrics are attached), and returns the entry's
+// memory to the vmem pool.
+void NvmLRUCache::FreeEntry(LRUHandle* e) {
+  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+
+  Cache::EvictionCallback* const callback = e->eviction_callback;
+  if (callback != nullptr) {
+    callback->EvictedEntry(e->key(), e->value());
+  }
+
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->DecrementBy(e->charge);
+    metrics_->evictions->Increment();
+  }
+
+  vmem_free(vmp_, e);
+}
+
+// Allocate nvm memory. Try until successful or FLAGS_nvm_cache_allocation_retry_count
+// has been exceeded.
+//
+// With NVM we have a fixed-size pool to allocate from, so an allocation may
+// fail. When it does, the oldest entry of this shard is evicted and the
+// allocation is retried, up to the configured number of retries or until the
+// shard has nothing left to evict. Returns NULL if every attempt failed, which
+// causes the caller to not insert anything into the cache.
+void *NvmLRUCache::AllocateAndRetry(size_t size) {
+  LRUHandle *to_remove_head = NULL;
+  void *tmp = VmemMalloc(size);
+
+  if (tmp == NULL) {
+    std::unique_lock<MutexType> l(mutex_);
+
+    int retries_remaining = FLAGS_nvm_cache_allocation_retry_count;
+    while (tmp == NULL && retries_remaining-- > 0 && lru_.next != &lru_) {
+      EvictOldestUnlocked(&to_remove_head);
+
+      // Unlock while freeing and allocating memory.
+      l.unlock();
+
+      // Hand the evicted entry's memory back to the pool *before* retrying.
+      // If the free were deferred until after the loop, a retry could never
+      // benefit from the eviction that preceded it, and further entries would
+      // be evicted for nothing.
+      FreeLRUEntries(to_remove_head);
+      to_remove_head = NULL;
+
+      tmp = VmemMalloc(size);
+      l.lock();
+    }
+  }
+
+  // Nothing is normally left to free here: every evicted entry is released
+  // inside the loop above. The call is kept as a harmless safety net.
+  FreeLRUEntries(to_remove_head);
+  return tmp;
+}
+
+// Unlinks 'e' from the LRU list and subtracts its charge from the shard's
+// usage. The links stored in 'e' itself are left as they were.
+void NvmLRUCache::NvmLRU_Remove(LRUHandle* e) {
+  LRUHandle* const before = e->prev;
+  LRUHandle* const after = e->next;
+  after->prev = before;
+  before->next = after;
+  usage_ -= e->charge;
+}
+
+// Links 'e' in as the newest entry (directly before the dummy head lru_) and
+// adds its charge to the shard's usage.
+void NvmLRUCache::NvmLRU_Append(LRUHandle* e) {
+  LRUHandle* const newest = lru_.prev;
+  e->prev = newest;
+  e->next = &lru_;
+  newest->next = e;
+  lru_.prev = e;
+  usage_ += e->charge;
+}
+
+// Looks up key/hash in this shard. On a hit the entry gains a reference, is
+// moved to the newest end of the LRU list and is returned as a handle; on a
+// miss null is returned. 'caching' only selects which hit/miss counters are
+// bumped.
+Cache::Handle* NvmLRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) {
+  LRUHandle* found;
+  {
+    std::lock_guard<MutexType> guard(mutex_);
+    found = table_.Lookup(key, hash);
+    if (found != nullptr) {
+      // Take a reference for the caller and refresh the entry's position
+      // by re-adding it at the end of the linked list.
+      base::RefCountInc(&found->refs);
+      NvmLRU_Remove(found);
+      NvmLRU_Append(found);
+    }
+  }
+
+  // Do the metrics outside of the lock.
+  if (metrics_) {
+    metrics_->lookups->Increment();
+    const bool hit = (found != nullptr);
+    if (hit && caching) {
+      metrics_->cache_hits_caching->Increment();
+    } else if (hit) {
+      metrics_->cache_hits->Increment();
+    } else if (caching) {
+      metrics_->cache_misses_caching->Increment();
+    } else {
+      metrics_->cache_misses->Increment();
+    }
+  }
+
+  return reinterpret_cast<Cache::Handle*>(found);
+}
+
+// Gives back one reference on 'handle' and frees the entry if that was the
+// last reference.
+void NvmLRUCache::Release(Cache::Handle* handle) {
+  LRUHandle* const entry = reinterpret_cast<LRUHandle*>(handle);
+  if (Unref(entry)) {
+    FreeEntry(entry);
+  }
+}
+
+// Removes the oldest entry (lru_.next) from both the LRU list and the hash
+// table and drops the cache's reference on it. If that was the last
+// reference, the entry is pushed onto the front of '*to_remove_head' so the
+// caller can free it later.
+void NvmLRUCache::EvictOldestUnlocked(LRUHandle** to_remove_head) {
+  LRUHandle* const oldest = lru_.next;
+  NvmLRU_Remove(oldest);
+  table_.Remove(oldest->key(), oldest->hash);
+  if (!Unref(oldest)) {
+    // Somebody still holds a handle; the entry is freed on Release().
+    return;
+  }
+  oldest->next = *to_remove_head;
+  *to_remove_head = oldest;
+}
+
+// Frees every entry of the null-terminated list (linked through 'next') that
+// starts at 'to_free_head'.
+void NvmLRUCache::FreeLRUEntries(LRUHandle* to_free_head) {
+  for (LRUHandle* cur = to_free_head; cur != nullptr; ) {
+    // Read the link before FreeEntry() releases the memory holding it.
+    LRUHandle* const following = cur->next;
+    FreeEntry(cur);
+    cur = following;
+  }
+}
+
+// Adds 'e' to this shard and returns it as a handle that already holds one
+// reference for the caller. An existing entry with the same key is replaced,
+// and the oldest entries are evicted while the shard is over capacity.
+Cache::Handle* NvmLRUCache::Insert(LRUHandle* e,
+                                   Cache::EvictionCallback* eviction_callback) {
+  DCHECK(e);
+
+  e->refs = 2;  // One from LRUCache, one for the returned handle
+  e->eviction_callback = eviction_callback;
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->IncrementBy(e->charge);
+    metrics_->inserts->Increment();
+  }
+
+  // Entries that lost their last reference while the lock was held.
+  LRUHandle* doomed = nullptr;
+  {
+    std::lock_guard<MutexType> guard(mutex_);
+
+    NvmLRU_Append(e);
+
+    LRUHandle* const replaced = table_.Insert(e);
+    if (replaced != nullptr) {
+      NvmLRU_Remove(replaced);
+      if (Unref(replaced)) {
+        replaced->next = doomed;
+        doomed = replaced;
+      }
+    }
+
+    while (usage_ > capacity_ && lru_.next != &lru_) {
+      EvictOldestUnlocked(&doomed);
+    }
+  }
+
+  // The entries are freed here, outside of the mutex, for performance reasons.
+  FreeLRUEntries(doomed);
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+// Removes the entry matching key/hash from this shard, if present, and frees
+// it when the cache held the last reference.
+void NvmLRUCache::Erase(const Slice& key, uint32_t hash) {
+  LRUHandle* removed;
+  bool must_free = false;
+  {
+    std::lock_guard<MutexType> guard(mutex_);
+    removed = table_.Remove(key, hash);
+    if (removed != nullptr) {
+      NvmLRU_Remove(removed);
+      must_free = Unref(removed);
+    }
+  }
+
+  // The mutex is no longer held here.
+  // 'must_free' can only be true when 'removed' is non-null.
+  if (must_free) {
+    FreeEntry(removed);
+  }
+}
+// The cache is split into 2^kNumShardBits shards.
+static constexpr int kNumShardBits = 4;
+static constexpr int kNumShards = 1 << kNumShardBits;
+
+// Cache implementation that spreads entries over kNumShards NvmLRUCache
+// shards, chosen by the top kNumShardBits bits of the 32-bit key hash. Every
+// shard allocates from the single VMEM pool handed to the constructor; the
+// pool is deleted when this object is destroyed.
+class ShardedLRUCache : public Cache {
+ private:
+  gscoped_ptr<CacheMetrics> metrics_;
+  vector<NvmLRUCache*> shards_;
+  VMEM* vmp_;
+
+  // Hashes the key bytes with CityHash64; the result is narrowed to uint32_t.
+  static inline uint32_t HashSlice(const Slice& s) {
+    const char* bytes = reinterpret_cast<const char *>(s.data());
+    return util_hash::CityHash64(bytes, s.size());
+  }
+
+  // Maps a hash to a shard index using its most significant bits.
+  static uint32_t Shard(uint32_t hash) {
+    return hash >> (32 - kNumShardBits);
+  }
+
+ public:
+  explicit ShardedLRUCache(size_t capacity, const string& /*id*/, VMEM* vmp)
+      : vmp_(vmp) {
+    // Divide the capacity between the shards, rounding up.
+    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
+    for (int i = 0; i < kNumShards; ++i) {
+      gscoped_ptr<NvmLRUCache> shard(new NvmLRUCache(vmp_));
+      shard->SetCapacity(per_shard);
+      shards_.push_back(shard.release());
+    }
+  }
+
+  virtual ~ShardedLRUCache() {
+    STLDeleteElements(&shards_);
+    // Per the note at the top of this file, our cache is entirely volatile.
+    // Hence, when the cache is destructed, we delete the underlying
+    // VMEM pool.
+    vmem_delete(vmp_);
+  }
+
+  // Inserts a handle previously obtained from Allocate() into the shard that
+  // its stored hash selects.
+  virtual Handle* Insert(PendingHandle* handle,
+                         Cache::EvictionCallback* eviction_callback) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle));
+    NvmLRUCache* shard = shards_[Shard(h->hash)];
+    return shard->Insert(h, eviction_callback);
+  }
+
+  virtual Handle* Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    NvmLRUCache* shard = shards_[Shard(hash)];
+    return shard->Lookup(key, hash, caching == EXPECT_IN_CACHE);
+  }
+
+  virtual void Release(Handle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    NvmLRUCache* shard = shards_[Shard(h->hash)];
+    shard->Release(handle);
+  }
+
+  virtual void Erase(const Slice& key) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    NvmLRUCache* shard = shards_[Shard(hash)];
+    shard->Erase(key, hash);
+  }
+
+  virtual Slice Value(Handle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    return h->value();
+  }
+
+  virtual uint8_t* MutableValue(PendingHandle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    return h->val_ptr();
+  }
+
+  // Creates the metrics for 'entity' and attaches them to every shard.
+  virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE {
+    metrics_.reset(new CacheMetrics(entity));
+    for (NvmLRUCache* shard : shards_) {
+      shard->SetMetrics(metrics_.get());
+    }
+  }
+
+  // Allocates one block holding an LRUHandle header followed by room for the
+  // key and 'val_len' value bytes, and copies the key in. Returns null if no
+  // shard could provide the memory.
+  virtual PendingHandle* Allocate(Slice key, int val_len, int charge) OVERRIDE {
+    int key_len = key.size();
+    DCHECK_GE(key_len, 0);
+    DCHECK_GE(val_len, 0);
+    const size_t alloc_size = sizeof(LRUHandle) + key_len + val_len;
+
+    // Try allocating from each of the shards -- if vmem is tight,
+    // this can cause eviction, so we might have better luck in different
+    // shards.
+    for (NvmLRUCache* shard : shards_) {
+      uint8_t* buf = static_cast<uint8_t*>(shard->AllocateAndRetry(alloc_size));
+      if (!buf) {
+        continue;
+      }
+      LRUHandle* handle = reinterpret_cast<LRUHandle*>(buf);
+      // The key/value bytes live directly behind the header.
+      handle->kv_data = buf + sizeof(LRUHandle);
+      handle->key_length = key_len;
+      handle->val_length = val_len;
+      handle->charge = (charge == kAutomaticCharge) ?
+          vmem_malloc_usable_size(vmp_, buf) : charge;
+      handle->hash = HashSlice(key);
+      memcpy(handle->kv_data, key.data(), key.size());
+      return reinterpret_cast<PendingHandle*>(handle);
+    }
+    // TODO: increment a metric here on allocation failure.
+    return nullptr;
+  }
+
+  // Returns the memory of a handle from Allocate() to the pool.
+  virtual void Free(PendingHandle* ph) OVERRIDE {
+    vmem_free(vmp_, ph);
+  }
+};
+
+} // end anonymous namespace
+
+// Creates a vmem pool of 'capacity' bytes at FLAGS_nvm_cache_path and wraps
+// it in a ShardedLRUCache. Aborts the process if the capacity is below
+// VMEM_MIN_POOL or if the pool cannot be created.
+Cache* NewLRUNvmCache(size_t capacity, const std::string& id) {
+  // vmem_create() will fail if the capacity is too small, but with
+  // an inscrutable error. So, we'll check ourselves.
+  CHECK_GE(capacity, VMEM_MIN_POOL)
+      << "configured capacity " << capacity << " bytes is less than "
+      << "the minimum capacity for an NVM cache: " << VMEM_MIN_POOL;
+
+  const std::string& pool_path = FLAGS_nvm_cache_path;
+  VMEM* pool = vmem_create(pool_path.c_str(), capacity);
+
+  // If we cannot create the cache pool we should not retry.
+  PLOG_IF(FATAL, pool == NULL)
+      << "Could not initialize NVM cache library in path "
+      << pool_path.c_str();
+
+  return new ShardedLRUCache(capacity, id, pool);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/nvm_cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/nvm_cache.h b/be/src/kudu/util/nvm_cache.h
new file mode 100644
index 0000000..9a65316
--- /dev/null
+++ b/be/src/kudu/util/nvm_cache.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NVM_CACHE_H_
+#define KUDU_UTIL_NVM_CACHE_H_
+
+#include <cstddef>
+#include <string>
+
+namespace kudu {
+class Cache;
+
+// Create a cache in persistent memory with the given capacity, in bytes.
+Cache* NewLRUNvmCache(size_t capacity, const std::string& id);  // CHECK-fails if capacity < VMEM_MIN_POOL.
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/object_pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/object_pool-test.cc b/be/src/kudu/util/object_pool-test.cc
new file mode 100644
index 0000000..ecfd641
--- /dev/null
+++ b/be/src/kudu/util/object_pool-test.cc
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/object_pool.h"
+
+namespace kudu {
+
+// Test helper that tracks the number of live instances in a static counter:
+// each construction increments it and each destruction decrements it.
+class MyClass {
+ public:
+  MyClass() { ++instance_count_; }
+  ~MyClass() { --instance_count_; }
+
+  // Number of instances currently alive, as of the last ResetCount().
+  static int instance_count() { return instance_count_; }
+
+  // Sets the live-instance counter back to zero.
+  static void ResetCount() { instance_count_ = 0; }
+
+ private:
+  static int instance_count_;
+};
+int MyClass::instance_count_ = 0;
+
+// Checks construction, destruction and slot reuse, and that tearing down the
+// pool destroys objects that were never handed back.
+TEST(TestObjectPool, TestPooling) {
+  MyClass::ResetCount();
+  {
+    ObjectPool<MyClass> pool;
+    ASSERT_EQ(0, MyClass::instance_count());
+
+    MyClass* first = pool.Construct();
+    ASSERT_EQ(1, MyClass::instance_count());
+    MyClass* second = pool.Construct();
+    ASSERT_EQ(2, MyClass::instance_count());
+    ASSERT_TRUE(first != second);
+
+    // Destroying an object runs its destructor right away.
+    pool.Destroy(second);
+    ASSERT_EQ(1, MyClass::instance_count());
+
+    // The next construction must pick up the slot that was just released.
+    MyClass* recycled = pool.Construct();
+    ASSERT_EQ(2, MyClass::instance_count());
+    ASSERT_TRUE(recycled == second) << "should reuse instance";
+    pool.Destroy(recycled);
+
+    ASSERT_EQ(1, MyClass::instance_count());
+  }
+
+  // 'first' was never destroyed explicitly; the pool has to take care of it.
+  ASSERT_EQ(0, MyClass::instance_count())
+      << "destructing pool should have cleared instances";
+}
+
+// Checks that the pool's scoped_ptr destroys the object it wraps when it
+// goes out of scope.
+TEST(TestObjectPool, TestScopedPtr) {
+  MyClass::ResetCount();
+  ASSERT_EQ(0, MyClass::instance_count());
+
+  ObjectPool<MyClass> pool;
+  {
+    MyClass* raw = pool.Construct();
+    ObjectPool<MyClass>::scoped_ptr owned(pool.make_scoped_ptr(raw));
+    ASSERT_EQ(1, MyClass::instance_count());
+  }
+  ASSERT_EQ(0, MyClass::instance_count());
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/object_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/object_pool.h b/be/src/kudu/util/object_pool.h
new file mode 100644
index 0000000..64d4b5c
--- /dev/null
+++ b/be/src/kudu/util/object_pool.h
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Simple pool/freelist for objects of the same type, typically used
+// in local context.
+#ifndef KUDU_UTIL_OBJECT_POOL_H
+#define KUDU_UTIL_OBJECT_POOL_H
+
+#include <glog/logging.h>
+#include <stdint.h>
+#include "kudu/gutil/manual_constructor.h"
+#include "kudu/gutil/gscoped_ptr.h"
+
+namespace kudu {
+
+template<class T>
+class ReturnToPool;
+
+// An object pool allocates and destroys a single class of objects
+// off of a free-list.
+//
+// Upon destruction of the pool, any objects allocated from this pool are
+// destroyed, regardless of whether they have been explicitly returned to the
+// pool.
+//
+// This class is similar to the boost::pool::object_pool, except that the boost
+// implementation seems to have O(n) deallocation performance and benchmarked
+// really poorly.
+//
+// This class is not thread-safe and is not copyable.
+template<typename T>
+class ObjectPool {
+ public:
+  typedef ReturnToPool<T> deleter_type;
+  typedef gscoped_ptr<T, deleter_type> scoped_ptr;
+
+  ObjectPool() :
+    free_list_head_(NULL),
+    alloc_list_head_(NULL),
+    deleter_(this) {
+  }
+
+  // Copying is disabled: a copy would share the raw node lists (so both pools
+  // would delete the same nodes) and its deleter would still point at the
+  // pool it was copied from.
+  ObjectPool(const ObjectPool&) = delete;
+  ObjectPool& operator=(const ObjectPool&) = delete;
+
+  ~ObjectPool() {
+    // Delete all objects ever allocated from this pool
+    ListNode *node = alloc_list_head_;
+    while (node != NULL) {
+      ListNode *tmp = node;
+      node = node->next_on_alloc_list;
+      if (!tmp->is_on_freelist) {
+        // Have to run the actual destructor if the user forgot to free it.
+        tmp->Destroy();
+      }
+      delete tmp;
+    }
+  }
+
+  // Construct a new object instance from the pool.
+  T *Construct() {
+    base::ManualConstructor<T> *obj = GetObject();
+    obj->Init();
+    return obj->get();
+  }
+
+  // Same as above, forwarding one argument to T's constructor.
+  template<class Arg1>
+  T *Construct(Arg1 arg1) {
+    base::ManualConstructor<T> *obj = GetObject();
+    obj->Init(arg1);
+    return obj->get();
+  }
+
+  // Destroy an object, running its destructor and returning it to the
+  // free-list. 't' must have come from this pool and must not have been
+  // destroyed already.
+  void Destroy(T *t) {
+    CHECK_NOTNULL(t);
+    ListNode *node = static_cast<ListNode *>(
+        reinterpret_cast<base::ManualConstructor<T> *>(t));
+
+    // Catch a double Destroy() before the destructor runs a second time.
+    DCHECK(!node->is_on_freelist);
+    node->Destroy();
+
+    node->is_on_freelist = true;
+    node->next_on_free_list = free_list_head_;
+    free_list_head_ = node;
+  }
+
+  // Create a scoped_ptr wrapper around the given pointer which came from this
+  // pool.
+  // When the scoped_ptr goes out of scope, the object will get released back
+  // to the pool.
+  scoped_ptr make_scoped_ptr(T *ptr) {
+    return scoped_ptr(ptr, deleter_);
+  }
+
+ private:
+  // Storage for one T plus the links needed to track it. Each node stays on
+  // the allocation list for the lifetime of the pool and is additionally on
+  // the free list whenever it holds no live object.
+  class ListNode : base::ManualConstructor<T> {
+    friend class ObjectPool<T>;
+
+    ListNode *next_on_free_list;
+    ListNode *next_on_alloc_list;
+
+    bool is_on_freelist;
+  };
+
+
+  // Returns unconstructed storage for one object: a recycled node from the
+  // free list when one is available, otherwise a freshly allocated node.
+  base::ManualConstructor<T> *GetObject() {
+    if (free_list_head_ != NULL) {
+      ListNode *tmp = free_list_head_;
+      free_list_head_ = tmp->next_on_free_list;
+      tmp->next_on_free_list = NULL;
+      DCHECK(tmp->is_on_freelist);
+      tmp->is_on_freelist = false;
+
+      return static_cast<base::ManualConstructor<T> *>(tmp);
+    }
+    auto new_node = new ListNode();
+    new_node->next_on_free_list = NULL;
+    new_node->next_on_alloc_list = alloc_list_head_;
+    new_node->is_on_freelist = false;
+    alloc_list_head_ = new_node;
+    return new_node;
+  }
+
+  // Keeps track of free objects in this pool.
+  ListNode *free_list_head_;
+
+  // Keeps track of all objects ever allocated by this pool.
+  ListNode *alloc_list_head_;
+
+  deleter_type deleter_;
+};
+
+// Deleter functor bound to one ObjectPool: invoking it on a pointer hands the
+// object to that pool's Destroy(). Combined with scoped_ptr it releases an
+// object back to its pool automatically when the pointer goes out of scope.
+template<class T>
+class ReturnToPool {
+ public:
+  explicit ReturnToPool(ObjectPool<T>* pool) : pool_(pool) {}
+
+  inline void operator()(T* obj) const { pool_->Destroy(obj); }
+
+ private:
+  ObjectPool<T>* pool_;
+};
+
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/oid_generator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/oid_generator-test.cc b/be/src/kudu/util/oid_generator-test.cc
new file mode 100644
index 0000000..be88061
--- /dev/null
+++ b/be/src/kudu/util/oid_generator-test.cc
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/oid_generator.h"
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+
+namespace kudu {
+
+// Checks that Canonicalize() rejects malformed input and maps the dashed,
+// plain and mixed-case spellings of a UUID to the same canonical string.
+TEST(ObjectIdGeneratorTest, TestCanoicalizeUuid) {
+  ObjectIdGenerator gen;
+  const string kExpectedCanonicalized = "0123456789abcdef0123456789abcdef";
+  string canonicalized;
+
+  // Malformed input must be reported as InvalidArgument.
+  Status s = gen.Canonicalize("not_a_uuid", &canonicalized);
+  {
+    SCOPED_TRACE(s.ToString());
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(), "invalid uuid");
+  }
+
+  // Every accepted spelling must canonicalize to the same value.
+  const char* const kAcceptedForms[] = {
+    "01234567-89ab-cdef-0123-456789abcdef",
+    "0123456789abcdef0123456789abcdef",
+    "0123456789AbCdEf0123456789aBcDeF",
+  };
+  for (const char* form : kAcceptedForms) {
+    ASSERT_OK(gen.Canonicalize(form, &canonicalized));
+    ASSERT_EQ(kExpectedCanonicalized, canonicalized);
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/oid_generator.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/oid_generator.cc b/be/src/kudu/util/oid_generator.cc
new file mode 100644
index 0000000..eee7316
--- /dev/null
+++ b/be/src/kudu/util/oid_generator.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/oid_generator.h"
+
+#include <cstdint>
+#include <exception>
+#include <mutex>
+#include <string>
+
+#include <boost/uuid/uuid.hpp>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+// Formats the 16 bytes of 'to_convert' as 32 lowercase hexadecimal
+// characters, without dashes.
+string ConvertUuidToString(const boost::uuids::uuid& to_convert) {
+  const uint8_t* b = to_convert.data;
+  return StringPrintf(
+      "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
+      b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
+      b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]);
+}
+
+} // anonymous namespace
+
+// Draws a new UUID from the random generator while holding oid_lock_ and
+// returns it in string form.
+string ObjectIdGenerator::Next() {
+  std::lock_guard<LockType> guard(oid_lock_);
+  return ConvertUuidToString(oid_generator_());
+}
+
+// Parses 'input' with the UUID string parser and, on success, stores its
+// dash-free lowercase hexadecimal form in '*output' and returns OK.
+// If the parser throws, returns InvalidArgument carrying the offending input
+// and the exception text; '*output' is left untouched in that case.
+Status ObjectIdGenerator::Canonicalize(const string& input,
+                                       string* output) const {
+  try {
+    boost::uuids::uuid uuid = oid_validator_(input);
+    *output = ConvertUuidToString(uuid);
+    return Status::OK();
+  } catch (const std::exception& e) {
+    // Caught by const reference: the handler only reads the exception.
+    return Status::InvalidArgument(Substitute("invalid uuid $0: $1",
+                                              input, e.what()));
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/oid_generator.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/oid_generator.h b/be/src/kudu/util/oid_generator.h
new file mode 100644
index 0000000..c1cc88f
--- /dev/null
+++ b/be/src/kudu/util/oid_generator.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_OID_GENERATOR_H
+#define KUDU_UTIL_OID_GENERATOR_H
+
+#include <string>
+
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/string_generator.hpp>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Generates unique ids based on uuid v4. Each id is returned as a string of
+// 32 hexadecimal characters (the 16 bytes of the UUID, without dashes).
+// This class is thread safe: Next() serializes access to the random
+// generator with 'oid_lock_'.
+class ObjectIdGenerator {
+ public:
+ ObjectIdGenerator() {}
+ ~ObjectIdGenerator() {}
+
+ // Generates and returns a new UUID in the 32-character hexadecimal form
+ // described above.
+ std::string Next();
+
+ // Validates an existing UUID and converts it into the format used by Kudu
+ // (that is, 16 bytes rendered as 32 hexadecimal characters without any
+ // dashes). The result is stored in '*output'.
+ //
+ // Returns InvalidArgument if 'input' cannot be parsed as a UUID.
+ Status Canonicalize(const std::string& input, std::string* output) const;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ObjectIdGenerator);
+
+ typedef simple_spinlock LockType;
+
+ // Protects 'oid_generator_'.
+ LockType oid_lock_;
+
+ // Generates new UUIDs.
+ boost::uuids::random_generator oid_generator_;
+
+ // Validates provided UUIDs. Used from the const method Canonicalize()
+ // without taking 'oid_lock_'.
+ boost::uuids::string_generator oid_validator_;
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/once-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/once-test.cc b/be/src/kudu/util/once-test.cc
new file mode 100644
index 0000000..c0a79b1
--- /dev/null
+++ b/be/src/kudu/util/once-test.cc
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/once.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/thread.h"
+
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+// Test fixture wrapping a KuduOnceDynamic. Init() funnels into InitOnce(),
+// which either fails with IllegalState or sets 'value_' to 1, depending on
+// the flag given at construction.
+struct Thing {
+  explicit Thing(bool should_fail) : should_fail_(should_fail), value_(0) {}
+
+  // Runs InitOnce() through 'once_' and returns the resulting status.
+  Status Init() {
+    return once_.Init(&Thing::InitOnce, this);
+  }
+
+  // The initialization body guarded by 'once_'.
+  Status InitOnce() {
+    if (!should_fail_) {
+      value_ = 1;
+      return Status::OK();
+    }
+    return Status::IllegalState("Whoops!");
+  }
+
+  const bool should_fail_;
+  int value_;
+  KuduOnceDynamic once_;
+};
+
+} // anonymous namespace
+
+// Exercises both outcomes of KuduOnceDynamic::Init() and checks that a
+// second call reports exactly the same result as the first.
+TEST(TestOnce, KuduOnceDynamicTest) {
+  // Initialization that succeeds.
+  {
+    Thing good(false);
+    ASSERT_EQ(0, good.value_);
+    ASSERT_FALSE(good.once_.init_succeeded());
+
+    for (int attempt = 0; attempt < 2; ++attempt) {
+      ASSERT_OK(good.Init());
+      ASSERT_EQ(1, good.value_);
+      ASSERT_TRUE(good.once_.init_succeeded());
+    }
+  }
+
+  // Initialization that fails.
+  {
+    Thing bad(true);
+    for (int attempt = 0; attempt < 2; ++attempt) {
+      ASSERT_TRUE(bad.Init().IsIllegalState());
+      ASSERT_EQ(0, bad.value_);
+      ASSERT_FALSE(bad.once_.init_succeeded());
+    }
+  }
+}
+
+// Thread body for the thread-safety test: even-numbered threads call Init(),
+// odd-numbered threads only read the init_succeeded() flag. The status
+// returned by Init() is not inspected here.
+static void InitOrGetInitted(Thing* t, int i) {
+  const bool reader = (i % 2 != 0);
+  if (reader) {
+    LOG(INFO) << "Thread " << i << " value: " << t->once_.init_succeeded();
+    return;
+  }
+  LOG(INFO) << "Thread " << i << " initting";
+  t->Init();
+}
+
+TEST(TestOnce, KuduOnceDynamicThreadSafeTest) {
+  Thing thing(false);
+
+  // The threads concurrently initialize 'thing' and read
+  // thing.once_.init_succeeded(). If access to the underlying flag is not
+  // synchronized, TSAN will flag the accesses as data races.
+  const int kNumThreads = 10;
+  vector<scoped_refptr<Thread> > workers;
+  for (int i = 0; i < kNumThreads; i++) {
+    scoped_refptr<Thread> worker;
+    ASSERT_OK(Thread::Create("test", Substitute("thread $0", i),
+                             &InitOrGetInitted, &thing, i, &worker));
+    workers.push_back(worker);
+  }
+
+  for (const auto& worker : workers) {
+    worker->Join();
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/once.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/once.cc b/be/src/kudu/util/once.cc
new file mode 100644
index 0000000..fada777
--- /dev/null
+++ b/be/src/kudu/util/once.cc
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/once.h"
+
+#include "kudu/util/malloc.h"
+
+namespace kudu {
+
+// Reports the memory attributed to this object beyond sizeof(*this); only
+// the stored Status is accounted for.
+size_t KuduOnceDynamic::memory_footprint_excluding_this() const {
+  const size_t status_bytes = status_.memory_footprint_excluding_this();
+  return status_bytes;
+}
+
+// Reports the usable size of the allocation holding this object plus the
+// value of memory_footprint_excluding_this().
+size_t KuduOnceDynamic::memory_footprint_including_this() const {
+  const size_t self_bytes = kudu_malloc_usable_size(this);
+  return self_bytes + memory_footprint_excluding_this();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/once.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/once.h b/be/src/kudu/util/once.h
new file mode 100644
index 0000000..0f43064
--- /dev/null
+++ b/be/src/kudu/util/once.h
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_ONCE_H
+#define KUDU_UTIL_ONCE_H
+
+#include <stddef.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class KuduOnceDynamic;
+
+namespace internal {
+
+// Cheap, single-arg "bound callback" (similar to kudu::Callback) for use
+// in KuduOnceDynamic.
+//
+// Bundles everything InitCb() needs behind a single void* argument. The
+// field order matters: KuduOnceDynamic::Init() fills the struct with a
+// positional brace initializer.
+template<typename T>
+struct MemberFunc {
+ // The once-object whose status and success flag InitCb() updates.
+ KuduOnceDynamic* once;
+ // The object on which 'member_func' is invoked.
+ T* instance;
+ // The initialization routine to run.
+ Status (T::*member_func)();
+};
+
+// Trampoline with a plain 'void (*)(void*)' signature. 'arg' must point to a
+// MemberFunc<T>. Invokes the bound member function, stores its Status in the
+// owning KuduOnceDynamic, and marks initialization as succeeded only when
+// that status is OK.
+template<typename T>
+void InitCb(void* arg) {
+  auto* bound = static_cast<MemberFunc<T>*>(arg);
+  bound->once->status_ = (bound->instance->*(bound->member_func))();
+  if (PREDICT_TRUE(bound->once->status_.ok())) {
+    bound->once->set_init_succeeded();
+  }
+}
+
+} // namespace internal
+
+// More versatile version of GoogleOnceDynamic, including the following:
+// - Non-static member functions are registered and run via Init().
+// - The first time Init() is called, the registered function is run and the
+// resulting status is stored.
+// - Regardless of whether Init() succeeded, the function will cease to run on
+// subsequent calls to Init(), and the stored result will be returned instead.
+// - Access to initialization state is safe for concurrent use.
+class KuduOnceDynamic {
+ public:
+ // Starts out in the "not yet successfully initialized" state.
+ KuduOnceDynamic()
+ : init_succeeded_(false) {
+ }
+
+ // If the underlying GoogleOnceDynamic has yet to be invoked, invokes the
+ // provided member function and stores its return value. Otherwise,
+ // returns the stored Status.
+ //
+ // T: the class whose member function is passed in.
+ // member_func: the initialization routine; takes no arguments and returns
+ // a Status.
+ // instance: the object on which 'member_func' is invoked.
+ template<typename T>
+ Status Init(Status (T::*member_func)(), T* instance) {
+ // 'mf' lives on this stack frame; its address is handed to once_.Init()
+ // below and is not used after that call returns.
+ internal::MemberFunc<T> mf = { this, instance, member_func };
+
+ // Clang UBSAN doesn't like it when GoogleOnceDynamic handles the cast
+ // of the argument:
+ //
+ // runtime error: call to function
+ // kudu::cfile::BloomFileReader::InitOnceCb(kudu::cfile::BloomFileReader*)
+ // through pointer to incorrect function type 'void (*)(void *)'
+ //
+ // So let's do the cast ourselves, to void* here and back in InitCb().
+ once_.Init(&internal::InitCb<T>, reinterpret_cast<void*>(&mf));
+ // Returns a copy of the status recorded by InitCb().
+ return status_;
+ }
+
+ // kMemOrderAcquire ensures that loads/stores that come after init_succeeded()
+ // aren't reordered to come before it instead. kMemOrderRelease ensures
+ // the opposite (i.e. loads/stores before set_init_succeeded() aren't reordered
+ // to come after it).
+ //
+ // Taken together, threads can safely synchronize on init_succeeded_.
+ bool init_succeeded() const { return init_succeeded_.Load(kMemOrderAcquire); }
+
+ // Returns the memory usage of this object without the object itself. Should
+ // be used when embedded inside another object.
+ size_t memory_footprint_excluding_this() const;
+
+ // Returns the memory usage of this object including the object itself.
+ // Should be used when allocated on the heap.
+ size_t memory_footprint_including_this() const;
+
+ private:
+ // InitCb() writes 'status_' and calls set_init_succeeded().
+ template<typename T>
+ friend void internal::InitCb(void* arg);
+
+ // Publishes successful initialization; paired with the acquire load in
+ // init_succeeded().
+ void set_init_succeeded() { init_succeeded_.Store(true, kMemOrderRelease); }
+
+ // True only once the registered function has returned an OK status.
+ AtomicBool init_succeeded_;
+ // Runs InitCb() for the first Init() call.
+ GoogleOnceDynamic once_;
+ // Result of the registered function, as stored by InitCb().
+ Status status_;
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/os-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/os-util-test.cc b/be/src/kudu/util/os-util-test.cc
new file mode 100644
index 0000000..a96e69d
--- /dev/null
+++ b/be/src/kudu/util/os-util-test.cc
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/os-util.h"
+
+#include <unistd.h>
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+
+namespace kudu {
+
+// Builds a synthetic stat line whose thread name is 'name' and whose user,
+// kernel and I/O-wait tick fields hold the given values, then verifies that
+// ParseStat() recovers the name and converts each tick count to nanoseconds.
+void RunTest(const string& name, int user_ticks, int kernel_ticks, int io_wait) {
+  const string format = string("0 ($0) S 0 0 0 0 0 0 0") +
+                        " 0 0 0 $1 $2 0 0 0 0 0" +
+                        " 0 0 0 0 0 0 0 0 0 0 " +
+                        " 0 0 0 0 0 0 0 0 0 0 " +
+                        " 0 $3 0 0 0 0 0 0 0 0 " +
+                        " 0 0";
+  const string buf = strings::Substitute(format, name, user_ticks, kernel_ticks, io_wait);
+
+  ThreadStats stats;
+  string extracted_name;
+  ASSERT_OK(ParseStat(buf, &extracted_name, &stats));
+  ASSERT_EQ(name, extracted_name);
+
+  // Expected conversion factor from clock ticks to nanoseconds.
+  const double ns_per_tick = 1e9 / sysconf(_SC_CLK_TCK);
+  ASSERT_EQ(user_ticks * ns_per_tick, stats.user_ns);
+  ASSERT_EQ(kernel_ticks * ns_per_tick, stats.kernel_ns);
+  ASSERT_EQ(io_wait * ns_per_tick, stats.iowait_ns);
+}
+
+// Plain thread name with no special characters.
+TEST(OsUtilTest, TestSelf) {
+ RunTest("test", 111, 222, 333);
+}
+
+// A space inside the name must not be treated as a field separator.
+TEST(OsUtilTest, TestSelfNameWithSpace) {
+ RunTest("a space", 111, 222, 333);
+}
+
+// Parentheses inside the name, including unbalanced ones, must be kept as
+// part of the name.
+TEST(OsUtilTest, TestSelfNameWithParens) {
+ RunTest("a(b(c((d))e)", 111, 222, 333);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/os-util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/os-util.cc b/be/src/kudu/util/os-util.cc
new file mode 100644
index 0000000..df7761f
--- /dev/null
+++ b/be/src/kudu/util/os-util.cc
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Imported from Impala. Changes include:
+// - Namespace and imports.
+// - Replaced GetStrErrMsg with ErrnoToString.
+// - Replaced StringParser with strings/numbers.
+// - Fixes for cpplint.
+// - Fixed parsing when thread names have spaces.
+
+#include "kudu/util/os-util.h"
+
+#include <fcntl.h>
+#include <sys/resource.h>
+#include <unistd.h>
+
+#include <cstddef>
+#include <fstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/logging.h"
+
+using std::ifstream;
+using std::istreambuf_iterator;
+using std::ostringstream;
+using std::string;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+
+namespace kudu {
+
+// Ensure that Impala compiles on earlier kernels. If the target kernel does not support
+// _SC_CLK_TCK, sysconf(_SC_CLK_TCK) will return -1.
+#ifndef _SC_CLK_TCK
+#define _SC_CLK_TCK 2
+#endif
+
+// Number of clock ticks per second as reported by sysconf(). Queried once,
+// when this file's static variables are initialized. GetThreadStats() treats
+// a value <= 0 as "not supported".
+static const int64_t kTicksPerSec = sysconf(_SC_CLK_TCK);
+
+// Offsets into the ../stat file array of per-thread statistics.
+//
+// They are themselves offset by two because the pid and comm fields of the
+// file are parsed separately.
+//
+// NOTE(review): 13, 14 and 41 look like the zero-based positions of the
+// utime, stime and delayacct_blkio_ticks fields described in proc(5) --
+// confirm against the man page.
+static const int64_t kUserTicks = 13 - 2;
+static const int64_t kKernelTicks = 14 - 2;
+static const int64_t kIoWait = 41 - 2;
+
+// Largest offset we are interested in, to check we get a well formed stat file.
+// This is an index, so a well formed line has at least kMaxOffset + 1 fields
+// after the comm field.
+static const int64_t kMaxOffset = kIoWait;
+
+// Parses the contents of a per-thread stat file held in 'buffer'.
+//
+// On success, converts the user, kernel and I/O-wait tick counts to
+// nanoseconds and stores them in 'stats'. If 'name' is non-null, it receives
+// the thread name, i.e. the text between the first '(' and the last ')'.
+// A field that cannot be parsed as a 64-bit integer leaves the corresponding
+// member of 'stats' untouched.
+//
+// Returns IOError if 'buffer' is not well formed: the parentheses are missing
+// or out of order, nothing follows the name, or there are too few fields.
+Status ParseStat(const std::string& buffer, std::string* name, ThreadStats* stats) {
+  DCHECK(stats != nullptr);
+
+  // The thread name should be the only field with parentheses. But the name
+  // itself may contain parentheses, so match the first '(' with the last ')'.
+  const size_t open_paren = buffer.find('(');
+  const size_t close_paren = buffer.rfind(')');
+  if (open_paren == string::npos ||        // '(' must exist
+      close_paren == string::npos ||       // ')' must exist
+      open_paren >= close_paren ||         // '(' must come before ')'
+      close_paren + 2 >= buffer.size()) {  // there must be at least two chars after ')'
+    // Using '>=' (not '==') also rejects a buffer that ends right at ')';
+    // otherwise substr(close_paren + 2) below would throw std::out_of_range.
+    return Status::IOError("Unrecognised /proc format");
+  }
+  const string extracted_name =
+      buffer.substr(open_paren + 1, close_paren - (open_paren + 1));
+  const string rest = buffer.substr(close_paren + 2);
+  const vector<string> splits = Split(rest, " ", strings::SkipEmpty());
+  // kMaxOffset is the largest index read below, so at least kMaxOffset + 1
+  // fields are required.
+  if (splits.size() <= static_cast<size_t>(kMaxOffset)) {
+    return Status::IOError("Unrecognised /proc format");
+  }
+
+  // Conversion factor from clock ticks to nanoseconds.
+  const double ns_per_tick = 1e9 / kTicksPerSec;
+  int64_t tmp;
+  if (safe_strto64(splits[kUserTicks], &tmp)) {
+    stats->user_ns = tmp * ns_per_tick;
+  }
+  if (safe_strto64(splits[kKernelTicks], &tmp)) {
+    stats->kernel_ns = tmp * ns_per_tick;
+  }
+  if (safe_strto64(splits[kIoWait], &tmp)) {
+    stats->iowait_ns = tmp * ns_per_tick;
+  }
+  if (name != nullptr) {
+    *name = extracted_name;
+  }
+  return Status::OK();
+}
+
+// Reads /proc/self/task/<tid>/stat and parses it into 'stats'.
+//
+// Returns NotSupported if the clock tick rate is unavailable, IOError if the
+// file cannot be opened, and otherwise whatever ParseStat() returns.
+Status GetThreadStats(int64_t tid, ThreadStats* stats) {
+  DCHECK(stats != nullptr);
+  if (kTicksPerSec <= 0) {
+    return Status::NotSupported("ThreadStats not supported");
+  }
+
+  ostringstream path_builder;
+  path_builder << "/proc/self/task/" << tid << "/stat";
+  const string stat_path = path_builder.str();
+
+  ifstream stat_file(stat_path.c_str());
+  if (!stat_file.is_open()) {
+    return Status::IOError("Could not open ifstream");
+  }
+
+  // Slurp the whole file; the thread name is not needed here.
+  string contents;
+  contents.assign(istreambuf_iterator<char>(stat_file),
+                  istreambuf_iterator<char>());
+  return ParseStat(contents, nullptr, stats);
+}
+
+// Disables core dumps for the current process.
+//
+// Lowers the soft RLIMIT_CORE limit to zero (the hard limit is left as-is)
+// and then, on a best-effort basis, writes an all-zero mask to
+// /proc/self/coredump_filter. A failing getrlimit()/setrlimit() trips the
+// PCHECK; failures to open, write or close the filter file are ignored.
+void DisableCoreDumps() {
+ struct rlimit lim;
+ PCHECK(getrlimit(RLIMIT_CORE, &lim) == 0);
+ lim.rlim_cur = 0;
+ PCHECK(setrlimit(RLIMIT_CORE, &lim) == 0);
+
+ // Set coredump_filter to not dump any parts of the address space.
+ // Although the above disables core dumps to files, if core_pattern
+ // is set to a pipe rather than a file, it's not sufficient. Setting
+ // this pattern results in piping a very minimal dump into the core
+ // processor (eg abrtd), thus speeding up the crash.
+ int f;
+ RETRY_ON_EINTR(f, open("/proc/self/coredump_filter", O_WRONLY));
+ if (f >= 0) {
+ // 'ret' and 'close_ret' only receive the results of the retried calls;
+ // neither is checked afterwards.
+ ssize_t ret;
+ RETRY_ON_EINTR(ret, write(f, "00000000", 8));
+ int close_ret;
+ RETRY_ON_EINTR(close_ret, close(f));
+ }
+}
+
+// Reports whether a tracer (e.g. gdb or strace) is attached to this process.
+//
+// On Linux this looks for the "TracerPid:" line in /proc/self/status and
+// returns true when its value is non-zero. Returns false on other platforms
+// and whenever the file cannot be read or parsed (a warning is logged the
+// first time each such problem is hit).
+bool IsBeingDebugged() {
+#ifdef __linux__
+  faststring contents;
+  const Status read_status =
+      ReadFileToString(Env::Default(), "/proc/self/status", &contents);
+  if (!read_status.ok()) {
+    KLOG_FIRST_N(WARNING, 1) << "could not read /proc/self/status: " << read_status.ToString();
+    return false;
+  }
+
+  const StringPiece contents_sp(reinterpret_cast<const char*>(contents.data()),
+                                contents.size());
+  const vector<StringPiece> lines = Split(contents_sp, "\n");
+  for (const StringPiece& line : lines) {
+    if (HasPrefixString(line, "TracerPid:")) {
+      // The line has the form "TracerPid:<TAB><pid>".
+      const std::pair<StringPiece, StringPiece> key_val = Split(line, "\t");
+      int64_t tracer_pid = -1;
+      if (!safe_strto64(key_val.second.data(), key_val.second.size(), &tracer_pid)) {
+        KLOG_FIRST_N(WARNING, 1) << "Invalid line in /proc/self/status: " << line;
+        return false;
+      }
+      // A non-zero pid means we are being ptraced.
+      return tracer_pid != 0;
+    }
+  }
+  KLOG_FIRST_N(WARNING, 1) << "Could not find TracerPid line in /proc/self/status";
+  return false;
+#else
+  return false;
+#endif  // __linux__
+}
+
+} // namespace kudu