You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:23 UTC

[11/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/sockaddr.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.cc b/be/src/kudu/util/net/sockaddr.cc
new file mode 100644
index 0000000..60905b2
--- /dev/null
+++ b/be/src/kudu/util/net/sockaddr.cc
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/sockaddr.h"
+
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/socket.h>
+
+#include <cerrno>
+#include <cstring>
+#include <string>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/hash/builtin_type_hash.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/stopwatch.h"
+
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+///
+/// Sockaddr
+///
+// Builds the IPv4 wildcard endpoint: every byte of the underlying sockaddr_in
+// is cleared first, then the family and the INADDR_ANY address are filled in.
+Sockaddr::Sockaddr() {
+  memset(&addr_, 0, sizeof(addr_));
+  addr_.sin_addr.s_addr = INADDR_ANY;
+  addr_.sin_family = AF_INET;
+}
+
+// Wraps an existing IPv4 socket address by copying it byte-for-byte.
+Sockaddr::Sockaddr(const struct sockaddr_in& addr) {
+  memcpy(&addr_, &addr, sizeof(addr_));
+}
+
+// Parses 's' (an "A.B.C.D" literal with an optional ":port") into this object.
+// The string is split by HostPort, which is also given 'default_port'; the host
+// part must then be a valid IPv4 literal -- hostnames are not resolved here.
+//
+// Returns InvalidArgument if the host part is not an IPv4 literal, or whatever
+// error HostPort::ParseString() reports.
+Status Sockaddr::ParseString(const std::string& s, uint16_t default_port) {
+  HostPort host_port;
+  RETURN_NOT_OK(host_port.ParseString(s, default_port));
+
+  // inet_pton() returns 1 only when the text was converted successfully.
+  const int converted = inet_pton(AF_INET, host_port.host().c_str(), &addr_.sin_addr);
+  if (converted != 1) {
+    return Status::InvalidArgument("Invalid IP address", host_port.host());
+  }
+  set_port(host_port.port());
+  return Status::OK();
+}
+
+// Replaces the stored address with a byte-for-byte copy of 'addr'.
+Sockaddr& Sockaddr::operator=(const struct sockaddr_in &addr) {
+  memcpy(&addr_, &addr, sizeof(addr_));
+  return *this;
+}
+
+// Two Sockaddrs are equal when their address family, IPv4 address and port all
+// match.
+//
+// The meaningful fields are compared individually rather than memcmp()-ing the
+// whole sockaddr_in: the structure also contains padding (sin_zero) that is not
+// part of the endpoint and that is not necessarily zeroed when the structure is
+// filled in from a stack buffer, so a raw byte comparison could report two
+// identical endpoints as different.
+bool Sockaddr::operator==(const Sockaddr& other) const {
+  return addr_.sin_family == other.addr_.sin_family &&
+         addr_.sin_addr.s_addr == other.addr_.sin_addr.s_addr &&
+         addr_.sin_port == other.addr_.sin_port;
+}
+
+// Orders by the raw (network byte order) 32-bit address only; the port does not
+// take part in the comparison.
+bool Sockaddr::operator<(const Sockaddr &rhs) const {
+  const in_addr_t mine = addr_.sin_addr.s_addr;
+  const in_addr_t theirs = rhs.addr_.sin_addr.s_addr;
+  return mine < theirs;
+}
+
+// Hashes the address with seed 0, then folds the port in by using the address
+// hash as the seed of a second round.
+uint32_t Sockaddr::HashCode() const {
+  const uint32_t addr_hash = Hash32NumWithSeed(addr_.sin_addr.s_addr, 0);
+  const uint32_t combined = Hash32NumWithSeed(addr_.sin_port, addr_hash);
+  return combined;
+}
+
+// Stores 'port' (given in host byte order) in network byte order.
+void Sockaddr::set_port(int port) {
+  const auto network_order = htons(port);
+  addr_.sin_port = network_order;
+}
+
+// Returns the stored port converted back to host byte order.
+int Sockaddr::port() const {
+  const auto host_order = ntohs(addr_.sin_port);
+  return host_order;
+}
+
+// Formats the IPv4 address as dotted-decimal text.
+std::string Sockaddr::host() const {
+  // INET_ADDRSTRLEN is the buffer size required for any IPv4 textual address.
+  char dotted_quad[INET_ADDRSTRLEN];
+  ::inet_ntop(AF_INET, &addr_.sin_addr, dotted_quad, sizeof(dotted_quad));
+  return std::string(dotted_quad);
+}
+
+// Read-only access to the underlying sockaddr_in.
+const struct sockaddr_in& Sockaddr::addr() const {
+  const struct sockaddr_in& raw = addr_;
+  return raw;
+}
+
+// Renders the endpoint as "<host>:<port>".
+std::string Sockaddr::ToString() const {
+  const std::string host_part = host();
+  const int port_part = port();
+  return Substitute("$0:$1", host_part, port_part);
+}
+
+// True when the address part is all zeroes (0.0.0.0); the port is not examined.
+bool Sockaddr::IsWildcard() const {
+  const in_addr_t raw_addr = addr_.sin_addr.s_addr;
+  return raw_addr == 0;
+}
+
+// True when the first octet of the address is 127, i.e. the address is 127.*.*.*.
+bool Sockaddr::IsAnyLocalAddress() const {
+  // After the byte-order conversion the first octet sits in the top 8 bits.
+  const auto converted = NetworkByteOrder::FromHost32(addr_.sin_addr.s_addr);
+  return (converted >> 24) == 127;
+}
+
+// Performs a reverse DNS lookup of this address via getnameinfo() and stores the
+// resulting name in '*hostname'. The call is wrapped in LOG_SLOW_EXECUTION so
+// that slow lookups are reported.
+//
+// Returns OK on success. On failure returns a NetworkError: for EAI_SYSTEM the
+// error carries the errno left behind by getnameinfo(), otherwise it carries the
+// getnameinfo() return code together with its gai_strerror() text.
+// '*hostname' is left untouched on failure.
+Status Sockaddr::LookupHostname(string* hostname) const {
+  char host[NI_MAXHOST];
+  const int flags = 0;
+
+  int rc;
+  // errno is captured right after getnameinfo(), inside the timed scope, so that
+  // whatever LOG_SLOW_EXECUTION does when the scope ends cannot overwrite the
+  // value reported for EAI_SYSTEM failures.
+  int errno_saved = 0;
+  LOG_SLOW_EXECUTION(WARNING, 200,
+                     Substitute("DNS reverse-lookup for $0", ToString())) {
+    rc = getnameinfo(reinterpret_cast<const struct sockaddr*>(&addr_), sizeof(addr_),
+                     host, NI_MAXHOST,
+                     nullptr, 0, flags);
+    errno_saved = errno;
+  }
+  if (PREDICT_FALSE(rc != 0)) {
+    if (rc == EAI_SYSTEM) {
+      return Status::NetworkError(Substitute("getnameinfo: $0", gai_strerror(rc)),
+                                  strerror(errno_saved), errno_saved);
+    }
+    return Status::NetworkError("getnameinfo", gai_strerror(rc), rc);
+  }
+  *hostname = host;
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/sockaddr.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/sockaddr.h b/be/src/kudu/util/net/sockaddr.h
new file mode 100644
index 0000000..dffd151
--- /dev/null
+++ b/be/src/kudu/util/net/sockaddr.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_SOCKADDR_H
+#define KUDU_UTIL_NET_SOCKADDR_H
+
+#include <netinet/in.h>
+
+#include <cstdint>
+#include <functional>
+#include <string>
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+///
+/// Represents a sockaddr.
+///
+/// Currently only IPv4 is implemented.  When IPv6 and UNIX domain sockets are
+/// implemented, this should become an abstract base class with one concrete
+/// implementation per address family.
+///
+class Sockaddr {
+ public:
+  // Constructs the IPv4 wildcard address (INADDR_ANY) with port 0.
+  Sockaddr();
+  // Constructs a Sockaddr holding a copy of 'addr'.
+  explicit Sockaddr(const struct sockaddr_in &addr);
+
+  // Parse a string IP address of the form "A.B.C.D:port", storing the result
+  // in this Sockaddr object. If no ':port' is specified, uses 'default_port'.
+  // Note that this function will not handle resolving hostnames.
+  //
+  // Returns a bad Status if the input is malformed.
+  Status ParseString(const std::string& s, uint16_t default_port);
+
+  // Replaces the stored address with a copy of 'addr'.
+  Sockaddr& operator=(const struct sockaddr_in &addr);
+
+  // Equality comparison. Unlike operator<, the port takes part in it.
+  bool operator==(const Sockaddr& other) const;
+
+  // Compare the endpoints of two sockaddrs.
+  // The port number is ignored in this comparison.
+  bool operator<(const Sockaddr &rhs) const;
+
+  // Returns a 32-bit hash computed from the address and the port.
+  uint32_t HashCode() const;
+
+  // Returns the dotted-decimal string '1.2.3.4' of the host component of this address.
+  std::string host() const;
+
+  // Sets / returns the port. Both use host byte order; the conversion to and
+  // from network byte order happens internally.
+  void set_port(int port);
+  int port() const;
+  // Returns the underlying sockaddr_in.
+  const struct sockaddr_in& addr() const;
+
+  // Returns the stringified address in '1.2.3.4:<port>' format.
+  std::string ToString() const;
+
+  // Returns true if the address is 0.0.0.0
+  bool IsWildcard() const;
+
+  // Returns true if the address is 127.*.*.*
+  bool IsAnyLocalAddress() const;
+
+  // Does reverse DNS lookup of the address and stores it in hostname.
+  Status LookupHostname(std::string* hostname) const;
+
+  // the default auto-generated copy constructor is fine here
+ private:
+  // The wrapped IPv4 socket address.
+  struct sockaddr_in addr_;
+};
+
+} // namespace kudu
+
+// Specialize std::hash for Sockaddr so that it can be used as the key of the
+// standard unordered containers.
+namespace std {
+template<>
+struct hash<kudu::Sockaddr> {
+  // Returns size_t, the result type the standard requires of a hash function
+  // object. The 32-bit value from HashCode() is unsigned, so it is widened
+  // without passing through a signed int.
+  size_t operator()(const kudu::Sockaddr& addr) const {
+    return addr.HashCode();
+  }
+};
+} // namespace std
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/socket-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket-test.cc b/be/src/kudu/util/net/socket-test.cc
new file mode 100644
index 0000000..8ecea4e
--- /dev/null
+++ b/be/src/kudu/util/net/socket-test.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/socket.h"
+
+#include <thread>
+
+#include <cstdint>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <stddef.h>
+#include <string>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+
+namespace kudu {
+
+// Size in bytes (32 MiB) of the buffer that DoTest() allocates and passes to Recv().
+constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
+
+class SocketTest : public KuduTest {
+ protected:
+  void DoTest(bool accept, const std::string &message) {
+    Sockaddr address;
+    address.ParseString("0.0.0.0", 0);
+    Socket listener_;
+
+    CHECK_OK(listener_.Init(0));
+    CHECK_OK(listener_.BindAndListen(address, 0));
+    Sockaddr listen_address;
+    CHECK_OK(listener_.GetSocketAddress(&listen_address));
+
+    std::thread t([&]{
+      if (accept) {
+        Sockaddr new_addr;
+        Socket sock;
+        CHECK_OK(listener_.Accept(&sock, &new_addr, 0));
+        CHECK_OK(sock.Close());
+      } else {
+        SleepFor(MonoDelta::FromMilliseconds(200));
+        CHECK_OK(listener_.Close());
+      }
+    });
+
+    Socket client;
+    ASSERT_OK(client.Init(0));
+    ASSERT_OK(client.Connect(listen_address));
+    CHECK_OK(client.SetRecvTimeout(MonoDelta::FromMilliseconds(100)));
+
+    int n;
+    std::unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+    Status s = client.Recv(buf.get(), kEchoChunkSize, &n);
+
+    ASSERT_TRUE(!s.ok());
+    ASSERT_TRUE(s.IsNetworkError());
+    ASSERT_STR_MATCHES(s.message().ToString(), message);
+
+    t.join();
+  }
+};
+
+// The listener never accepts the connection, so the client's Recv() is expected
+// to fail with the "Resource temporarily unavailable" error text.
+// NOTE(review): despite the test name, the expected message is the EAGAIN text,
+// presumably produced by the 100ms receive timeout rather than a reset -- confirm.
+TEST_F(SocketTest, TestRecvReset) {
+  DoTest(false, "recv error from 127.0.0.1:[0-9]+: Resource temporarily unavailable");
+}
+
+// The server side accepts the connection and closes it immediately, so the
+// client's Recv() is expected to report EOF.
+TEST_F(SocketTest, TestRecvEOF) {
+  DoTest(true, "recv got EOF from 127.0.0.1:[0-9]+");
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.cc b/be/src/kudu/util/net/socket.cc
new file mode 100644
index 0000000..cc14702
--- /dev/null
+++ b/be/src/kudu/util/net/socket.cc
@@ -0,0 +1,590 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/socket.h"
+
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <cinttypes>
+#include <cstring>
+#include <limits>
+#include <ostream>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/slice.h"
+
+// When non-empty, Socket::Connect() binds the socket to this local IP before
+// connecting (see Socket::BindForOutgoingConnection()).
+DEFINE_string(local_ip_for_outbound_sockets, "",
+              "IP to bind to when making outgoing socket connections. "
+              "This must be an IP address of the form A.B.C.D, not a hostname. "
+              "Advanced parameter, subject to change.");
+TAG_FLAG(local_ip_for_outbound_sockets, experimental);
+
+// Test hook: when set, Socket::Recv() randomly shrinks the requested read size.
+DEFINE_bool(socket_inject_short_recvs, false,
+            "Inject short recv() responses which return less data than "
+            "requested");
+TAG_FLAG(socket_inject_short_recvs, hidden);
+TAG_FLAG(socket_inject_short_recvs, unsafe);
+
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+// Creates a Socket that manages no descriptor yet (fd_ == -1).
+Socket::Socket() : fd_(-1) {}
+
+// Creates a Socket that starts managing the already-open descriptor 'fd'.
+Socket::Socket(int fd) : fd_(fd) {}
+
+// Closes whatever descriptor is currently managed (errors from that close are
+// deliberately ignored) and starts managing 'fd' instead.
+void Socket::Reset(int fd) {
+  Status close_status = Close();
+  ignore_result(close_status);
+  fd_ = fd;
+}
+
+// Gives up ownership of the descriptor without closing it and returns it to the
+// caller; this Socket manages nothing afterwards.
+int Socket::Release() {
+  const int released = fd_;
+  fd_ = -1;
+  return released;
+}
+
+// Closes the managed descriptor, if any. A failure to close cannot be reported
+// from a destructor, so it is ignored.
+Socket::~Socket() {
+  Status close_status = Close();
+  ignore_result(close_status);
+}
+
+// Closes the managed descriptor, if any, and reports close(2) failures.
+//
+// Ownership of the descriptor is dropped *before* close(2) is invoked, and the
+// call is issued exactly once. On Linux the descriptor is released even when
+// close(2) fails (including with EINTR), so retrying the call -- or leaving
+// 'fd_' set so that the destructor closes it a second time -- could close an
+// unrelated descriptor that another thread has since been given the same
+// number for.
+//
+// Returns OK when no descriptor is managed or close(2) succeeds, otherwise a
+// NetworkError carrying the errno value. In every case the Socket manages no
+// descriptor afterwards.
+Status Socket::Close() {
+  if (fd_ < 0) {
+    return Status::OK();
+  }
+  const int fd = fd_;
+  fd_ = -1;
+  if (::close(fd) < 0) {
+    int err = errno;
+    return Status::NetworkError("close error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Calls shutdown(2) on the socket for the requested direction(s).
+//
+// 'shut_read' / 'shut_write' select which halves of the connection to shut
+// down. When neither is requested there is nothing to do and OK is returned
+// without touching the socket; previously that case fell through to
+// shutdown(fd, 0), and 0 is the value of SHUT_RD on Linux, so the read side was
+// shut down even though the caller had not asked for it.
+//
+// Returns a NetworkError carrying errno if shutdown(2) fails.
+Status Socket::Shutdown(bool shut_read, bool shut_write) {
+  DCHECK_GE(fd_, 0);
+  if (!shut_read && !shut_write) {
+    return Status::OK();
+  }
+
+  // The SHUT_* values are distinct constants, not combinable bit flags.
+  int how;
+  if (shut_read && shut_write) {
+    how = SHUT_RDWR;
+  } else if (shut_read) {
+    how = SHUT_RD;
+  } else {
+    how = SHUT_WR;
+  }
+  if (::shutdown(fd_, how) < 0) {
+    int err = errno;
+    return Status::NetworkError("shutdown error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Returns the raw descriptor being managed, or -1 when there is none.
+int Socket::GetFd() const {
+  const int current = fd_;
+  return current;
+}
+
+// Returns true for the errno values EAGAIN, EWOULDBLOCK and EINTR, false for
+// anything else.
+bool Socket::IsTemporarySocketError(int err) {
+  if (err == EAGAIN) {
+    return true;
+  }
+  if (err == EWOULDBLOCK) {
+    return true;
+  }
+  return err == EINTR;
+}
+
+#if defined(__linux__)
+
+// Creates a new IPv4 TCP socket and starts managing it, closing any descriptor
+// that was managed before. The socket is created close-on-exec, and
+// non-blocking when 'flags' contains FLAG_NONBLOCKING.
+//
+// Returns a NetworkError carrying errno if socket(2) fails.
+Status Socket::Init(int flags) {
+  int nonblocking_flag = (flags & FLAG_NONBLOCKING) ? SOCK_NONBLOCK : 0;
+  const int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | nonblocking_flag, 0);
+  // Capture errno before Reset(): Reset() closes the previously managed
+  // descriptor, and that close() may overwrite errno.
+  const int err = errno;
+  Reset(fd);
+  if (fd_ < 0) {
+    return Status::NetworkError("error opening socket", ErrnoToString(err), err);
+  }
+
+  return Status::OK();
+}
+
+#else
+
+// Creates a new IPv4 TCP socket and starts managing it, closing any descriptor
+// that was managed before. This variant is compiled on non-Linux platforms,
+// where the non-blocking and close-on-exec properties are applied with separate
+// calls after creation and SIGPIPE is suppressed through SO_NOSIGPIPE.
+//
+// Returns a NetworkError carrying errno if socket(2) fails, or the error from
+// whichever configuration step fails.
+Status Socket::Init(int flags) {
+  const int fd = ::socket(AF_INET, SOCK_STREAM, 0);
+  // Capture errno before Reset(): Reset() closes the previously managed
+  // descriptor, and that close() may overwrite errno.
+  const int err = errno;
+  Reset(fd);
+  if (fd_ < 0) {
+    return Status::NetworkError("error opening socket", ErrnoToString(err), err);
+  }
+  RETURN_NOT_OK(SetNonBlocking(flags & FLAG_NONBLOCKING));
+  RETURN_NOT_OK(SetCloseOnExec());
+
+  // Disable SIGPIPE.
+  int set = 1;
+  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_NOSIGPIPE, set),
+                        "failed to set SO_NOSIGPIPE");
+  return Status::OK();
+}
+
+#endif // defined(__linux__)
+
+// Sets (enabled == true) or clears the TCP_NODELAY option on the socket.
+Status Socket::SetNoDelay(bool enabled) {
+  const int no_delay = enabled ? 1 : 0;
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_NODELAY, no_delay),
+                        "failed to set TCP_NODELAY");
+  return Status::OK();
+}
+
+// Sets (enabled == true) or clears the TCP_CORK option. Only Linux builds touch
+// the socket; elsewhere this is a no-op that returns OK.
+Status Socket::SetTcpCork(bool enabled) {
+#if defined(__linux__)
+  const int cork = enabled ? 1 : 0;
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_CORK, cork),
+                        "failed to set TCP_CORK");
+#endif // defined(__linux__)
+  // TODO(unknown): Use TCP_NOPUSH for OSX if perf becomes an issue.
+  return Status::OK();
+}
+
+// Turns O_NONBLOCK on ('enabled' == true) or off for the descriptor, leaving
+// the other file status flags as they were.
+//
+// Returns a NetworkError carrying errno if either fcntl() call fails.
+Status Socket::SetNonBlocking(bool enabled) {
+  const int old_flags = ::fcntl(fd_, F_GETFL, 0);
+  if (old_flags == -1) {
+    int err = errno;
+    return Status::NetworkError(
+        StringPrintf("Failed to get file status flags on fd %d", fd_),
+        ErrnoToString(err), err);
+  }
+
+  int wanted_flags = old_flags;
+  if (enabled) {
+    wanted_flags |= O_NONBLOCK;
+  } else {
+    wanted_flags &= ~O_NONBLOCK;
+  }
+  if (::fcntl(fd_, F_SETFL, wanted_flags) != -1) {
+    return Status::OK();
+  }
+
+  // The update failed; the message says which direction was attempted.
+  int err = errno;
+  if (!enabled) {
+    return Status::NetworkError(
+        StringPrintf("Failed to clear O_NONBLOCK on fd %d", fd_),
+        ErrnoToString(err), err);
+  }
+  return Status::NetworkError(
+      StringPrintf("Failed to set O_NONBLOCK on fd %d", fd_),
+      ErrnoToString(err), err);
+}
+
+// Reports through '*is_nonblock' whether O_NONBLOCK is currently set on the
+// descriptor. On fcntl() failure returns a NetworkError carrying errno and
+// leaves '*is_nonblock' untouched.
+Status Socket::IsNonBlocking(bool* is_nonblock) const {
+  const int status_flags = ::fcntl(fd_, F_GETFL, 0);
+  if (status_flags != -1) {
+    *is_nonblock = ((status_flags & O_NONBLOCK) != 0);
+    return Status::OK();
+  }
+  int err = errno;
+  return Status::NetworkError(
+      StringPrintf("Failed to get file status flags on fd %d", fd_),
+      ErrnoToString(err), err);
+}
+
+// Adds FD_CLOEXEC to the descriptor flags.
+//
+// If either fcntl() call fails, the socket is closed (via Reset(-1)) and a
+// NetworkError carrying the fcntl() errno is returned.
+Status Socket::SetCloseOnExec() {
+  const int fd_flags = fcntl(fd_, F_GETFD, 0);
+  if (fd_flags == -1) {
+    // Save errno first: Reset() closes the descriptor and may change it.
+    int err = errno;
+    Reset(-1);
+    return Status::NetworkError("fcntl(F_GETFD) error", ErrnoToString(err), err);
+  }
+  const int rc = fcntl(fd_, F_SETFD, fd_flags | FD_CLOEXEC);
+  if (rc == -1) {
+    int err = errno;
+    Reset(-1);
+    return Status::NetworkError("fcntl(F_SETFD) error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Applies 'timeout' to the socket through the SO_SNDTIMEO option.
+Status Socket::SetSendTimeout(const MonoDelta& timeout) {
+  Status s = SetTimeout(SO_SNDTIMEO, "SO_SNDTIMEO", timeout);
+  return s;
+}
+
+// Applies 'timeout' to the socket through the SO_RCVTIMEO option.
+Status Socket::SetRecvTimeout(const MonoDelta& timeout) {
+  Status s = SetTimeout(SO_RCVTIMEO, "SO_RCVTIMEO", timeout);
+  return s;
+}
+
+// Sets (flag == true) or clears the SO_REUSEADDR option on the socket.
+Status Socket::SetReuseAddr(bool flag) {
+  const int reuse_addr = flag ? 1 : 0;
+  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_REUSEADDR, reuse_addr),
+                        "failed to set SO_REUSEADDR");
+  return Status::OK();
+}
+
+// Sets (flag == true) or clears the SO_REUSEPORT option on the socket.
+Status Socket::SetReusePort(bool flag) {
+  const int reuse_port = flag ? 1 : 0;
+  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_REUSEPORT, reuse_port),
+                        "failed to set SO_REUSEPORT");
+  return Status::OK();
+}
+
+// Convenience wrapper: enables SO_REUSEADDR, binds to 'sockaddr' and then starts
+// listening with a backlog of 'listen_queue_size'. Stops at, and returns, the
+// first step that fails.
+Status Socket::BindAndListen(const Sockaddr &sockaddr,
+                             int listen_queue_size) {
+  RETURN_NOT_OK(SetReuseAddr(true));
+  RETURN_NOT_OK(Bind(sockaddr));
+  return Listen(listen_queue_size);
+}
+
+// Marks the socket as a listening socket with a backlog of 'listen_queue_size'.
+//
+// Returns a NetworkError if listen(2) fails. The errno value is attached to the
+// Status, as every other syscall wrapper in this file does, so that callers can
+// inspect posix_code().
+Status Socket::Listen(int listen_queue_size) {
+  if (listen(fd_, listen_queue_size)) {
+    int err = errno;
+    return Status::NetworkError("listen() error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Stores the local address the socket is bound to (as reported by
+// getsockname()) in '*cur_addr'. Returns a NetworkError carrying errno on
+// failure, in which case '*cur_addr' is not modified.
+Status Socket::GetSocketAddress(Sockaddr *cur_addr) const {
+  struct sockaddr_in local;
+  socklen_t local_len = sizeof(local);
+  DCHECK_GE(fd_, 0);
+  const int rc = ::getsockname(fd_, reinterpret_cast<struct sockaddr*>(&local), &local_len);
+  if (rc == -1) {
+    int err = errno;
+    return Status::NetworkError("getsockname error", ErrnoToString(err), err);
+  }
+  *cur_addr = local;
+  return Status::OK();
+}
+
+// Stores the address of the connected peer (as reported by getpeername()) in
+// '*cur_addr'. Returns a NetworkError carrying errno on failure, in which case
+// '*cur_addr' is not modified.
+Status Socket::GetPeerAddress(Sockaddr *cur_addr) const {
+  struct sockaddr_in peer;
+  socklen_t peer_len = sizeof(peer);
+  DCHECK_GE(fd_, 0);
+  const int rc = ::getpeername(fd_, reinterpret_cast<struct sockaddr*>(&peer), &peer_len);
+  if (rc == -1) {
+    int err = errno;
+    return Status::NetworkError("getpeername error", ErrnoToString(err), err);
+  }
+  *cur_addr = peer;
+  return Status::OK();
+}
+
+// Returns true when the local and the remote end of the connection compare
+// equal once their ports are disregarded. Returns false if either address
+// cannot be obtained.
+bool Socket::IsLoopbackConnection() const {
+  Sockaddr local, remote;
+  const bool have_both = GetSocketAddress(&local).ok() && GetPeerAddress(&remote).ok();
+  if (!have_both) {
+    return false;
+  }
+
+  // Zero both ports so that they do not influence the comparison.
+  local.set_port(0);
+  remote.set_port(0);
+  return local == remote;
+}
+
+// Binds the socket to 'bind_addr'.
+//
+// On failure returns a NetworkError that names the address and carries errno.
+// When the failure is EADDRINUSE for an explicitly chosen (non-zero) port,
+// TryRunLsof() is invoked for the address before returning.
+Status Socket::Bind(const Sockaddr& bind_addr) {
+  struct sockaddr_in raw_addr = bind_addr.addr();
+
+  DCHECK_GE(fd_, 0);
+  const int rc = ::bind(fd_, reinterpret_cast<struct sockaddr*>(&raw_addr), sizeof(raw_addr));
+  if (PREDICT_FALSE(rc != 0)) {
+    int err = errno;
+    const string description = strings::Substitute("error binding socket to $0: $1",
+                                                   bind_addr.ToString(), ErrnoToString(err));
+    Status s = Status::NetworkError(description, Slice(), err);
+
+    const bool port_in_use = s.IsNetworkError() && s.posix_code() == EADDRINUSE;
+    if (port_in_use && bind_addr.port() != 0) {
+      TryRunLsof(bind_addr);
+    }
+    return s;
+  }
+
+  return Status::OK();
+}
+
+// Accepts one pending connection on this (listening) socket.
+//
+// On success '*new_conn' starts managing the accepted descriptor (any
+// descriptor it managed before is closed by Reset()) and '*remote' receives the
+// peer address. The accepted socket is made close-on-exec, and non-blocking when
+// 'flags' contains FLAG_NONBLOCKING.
+//
+// Returns a NetworkError carrying errno if the accept call fails.
+Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
+  TRACE_EVENT0("net", "Socket::Accept");
+  struct sockaddr_in addr;
+  socklen_t olen = sizeof(addr);
+  DCHECK_GE(fd_, 0);
+#if defined(__linux__)
+  // On Linux, accept4() applies the close-on-exec and non-blocking properties
+  // as part of the accept call itself.
+  int accept_flags = SOCK_CLOEXEC;
+  if (flags & FLAG_NONBLOCKING) {
+    accept_flags |= SOCK_NONBLOCK;
+  }
+  int fd = -1;
+  RETRY_ON_EINTR(fd, accept4(fd_, (struct sockaddr*)&addr,
+                             &olen, accept_flags));
+  if (fd < 0) {
+    int err = errno;
+    return Status::NetworkError("accept4(2) error", ErrnoToString(err), err);
+  }
+  new_conn->Reset(fd);
+
+#else
+  // Elsewhere, plain accept() is used and the two properties are applied with
+  // separate calls afterwards.
+  int fd = -1;
+  RETRY_ON_EINTR(fd, accept(fd_, (struct sockaddr*)&addr, &olen));
+  if (fd < 0) {
+    int err = errno;
+    return Status::NetworkError("accept(2) error", ErrnoToString(err), err);
+  }
+  new_conn->Reset(fd);
+  RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
+  RETURN_NOT_OK(new_conn->SetCloseOnExec());
+#endif // defined(__linux__)
+
+  *remote = addr;
+  TRACE_EVENT_INSTANT1("net", "Accepted", TRACE_EVENT_SCOPE_THREAD,
+                       "remote", remote->ToString());
+  return Status::OK();
+}
+
+// Binds the socket to the local IP given by --local_ip_for_outbound_sockets
+// (with port 0). The process aborts via CHECK if the flag value does not parse
+// as an IP address or specifies a non-zero port.
+Status Socket::BindForOutgoingConnection() {
+  Sockaddr local_addr;
+  Status s = local_addr.ParseString(FLAGS_local_ip_for_outbound_sockets, 0);
+  CHECK(s.ok() && local_addr.port() == 0)
+    << "Invalid local IP set for 'local_ip_for_outbound_sockets': '"
+    << FLAGS_local_ip_for_outbound_sockets << "': " << s.ToString();
+
+  return Bind(local_addr);
+}
+
+// Connects the socket to 'remote'. When --local_ip_for_outbound_sockets is set,
+// the socket is first bound to that local address.
+//
+// Returns the bind error, or a NetworkError carrying errno if connect(2) fails.
+Status Socket::Connect(const Sockaddr &remote) {
+  TRACE_EVENT1("net", "Socket::Connect",
+               "remote", remote.ToString());
+  if (PREDICT_FALSE(!FLAGS_local_ip_for_outbound_sockets.empty())) {
+    RETURN_NOT_OK(BindForOutgoingConnection());
+  }
+
+  struct sockaddr_in peer;
+  memcpy(&peer, &remote.addr(), sizeof(peer));
+  DCHECK_GE(fd_, 0);
+  int rc;
+  RETRY_ON_EINTR(rc, ::connect(
+      fd_, reinterpret_cast<const struct sockaddr*>(&peer), sizeof(peer)));
+  if (rc >= 0) {
+    return Status::OK();
+  }
+  int err = errno;
+  return Status::NetworkError("connect(2) error", ErrnoToString(err), err);
+}
+
+// Reads the socket's SO_ERROR value and converts it to a Status: OK when the
+// value is zero, otherwise a NetworkError carrying that error code. A failure
+// of getsockopt() itself is reported as a NetworkError carrying errno.
+Status Socket::GetSockError() const {
+  int pending_error = 0;
+  socklen_t pending_len = sizeof(pending_error);
+  DCHECK_GE(fd_, 0);
+  const int rc = ::getsockopt(fd_, SOL_SOCKET, SO_ERROR, &pending_error, &pending_len);
+  if (rc != 0) {
+    int err = errno;
+    return Status::NetworkError("getsockopt(SO_ERROR) failed", ErrnoToString(err), err);
+  }
+  if (pending_error == 0) {
+    return Status::OK();
+  }
+  return Status::NetworkError(ErrnoToString(pending_error), Slice(), pending_error);
+}
+
+// Issues one send() of up to 'amt' bytes from 'buf' (with MSG_NOSIGNAL) and
+// stores the number of bytes accepted in '*nwritten'.
+//
+// Returns a NetworkError with EINVAL when 'amt' is not positive, or a
+// NetworkError carrying errno when send() fails; '*nwritten' is not modified in
+// either case.
+Status Socket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
+  if (amt <= 0) {
+    return Status::NetworkError(
+              StringPrintf("invalid send of %" PRId32 " bytes",
+                           amt), Slice(), EINVAL);
+  }
+  DCHECK_GE(fd_, 0);
+  int sent;
+  RETRY_ON_EINTR(sent, ::send(fd_, buf, amt, MSG_NOSIGNAL));
+  if (sent >= 0) {
+    *nwritten = sent;
+    return Status::OK();
+  }
+  int err = errno;
+  return Status::NetworkError("write error", ErrnoToString(err), err);
+}
+
+// Issues one gathered write of the 'iov_len' buffers in 'iov' using sendmsg()
+// (with MSG_NOSIGNAL) and stores the number of bytes accepted in '*nwritten'.
+//
+// Returns a NetworkError with EINVAL when 'iov_len' is not positive, or a
+// NetworkError carrying errno when sendmsg() fails; '*nwritten' is not modified
+// in either case.
+Status Socket::Writev(const struct ::iovec *iov, int iov_len,
+                      int64_t *nwritten) {
+  if (PREDICT_FALSE(iov_len <= 0)) {
+    return Status::NetworkError(
+                StringPrintf("writev: invalid io vector length of %d",
+                             iov_len),
+                Slice(), EINVAL);
+  }
+  DCHECK_GE(fd_, 0);
+
+  // Only the io-vector fields of the message header are used; the rest stays zeroed.
+  struct msghdr header;
+  memset(&header, 0, sizeof(header));
+  header.msg_iov = const_cast<iovec *>(iov);
+  header.msg_iovlen = iov_len;
+
+  ssize_t sent;
+  RETRY_ON_EINTR(sent, ::sendmsg(fd_, &header, MSG_NOSIGNAL));
+  if (PREDICT_FALSE(sent < 0)) {
+    int err = errno;
+    return Status::NetworkError("sendmsg error", ErrnoToString(err), err);
+  }
+
+  *nwritten = sent;
+  return Status::OK();
+}
+
+// Mostly follows writen() from Stevens (2004) or Kerrisk (2010).
+//
+// Writes all 'buflen' bytes of 'buf', looping over Write() until everything has
+// been sent, an error occurs or 'deadline' passes. '*nwritten' is updated after
+// every Write() attempt with the total number of bytes sent so far.
+//
+// Returns OK only when all bytes were written; TimedOut when the deadline has
+// passed or a write fails with EAGAIN; IOError when a write reports zero bytes
+// before the buffer is exhausted; otherwise the Write() error prefixed with
+// "BlockingWrite error".
+Status Socket::BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+    const MonoTime& deadline) {
+  DCHECK_LE(buflen, std::numeric_limits<int32_t>::max()) << "Writes > INT32_MAX not supported";
+  DCHECK(nwritten);
+
+  size_t tot_written = 0;
+  while (tot_written < buflen) {
+    int32_t inc_num_written = 0;
+    int32_t num_to_write = buflen - tot_written;
+    // Re-arm the send timeout on every iteration with the time remaining until
+    // the deadline.
+    MonoDelta timeout = deadline - MonoTime::Now();
+    if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
+      return Status::TimedOut("BlockingWrite timed out");
+    }
+    RETURN_NOT_OK(SetSendTimeout(timeout));
+    Status s = Write(buf, num_to_write, &inc_num_written);
+    // When Write() fails it leaves inc_num_written at 0, so the bookkeeping
+    // below is a no-op in that case.
+    tot_written += inc_num_written;
+    buf += inc_num_written;
+    *nwritten = tot_written;
+
+    if (PREDICT_FALSE(!s.ok())) {
+      // Continue silently when the syscall is interrupted.
+      if (s.posix_code() == EINTR) {
+        continue;
+      }
+      // EAGAIN is treated as expiry of the send timeout configured above.
+      if (s.posix_code() == EAGAIN) {
+        return Status::TimedOut("");
+      }
+      return s.CloneAndPrepend("BlockingWrite error");
+    }
+    if (PREDICT_FALSE(inc_num_written == 0)) {
+      // Shouldn't happen on Linux with a blocking socket. Maybe other Unices.
+      break;
+    }
+  }
+
+  // Only reachable with a short count via the zero-byte 'break' above.
+  if (tot_written < buflen) {
+    return Status::IOError("Wrote zero bytes on a BlockingWrite() call",
+        StringPrintf("Transferred %zu of %zu bytes", tot_written, buflen));
+  }
+  return Status::OK();
+}
+
+// Issues one recv() for up to 'amt' bytes into 'buf' and stores the number of
+// bytes received in '*nread'.
+//
+// Returns:
+//  - NetworkError with EINVAL when 'amt' is not positive;
+//  - NetworkError with ESHUTDOWN ("recv got EOF from <peer>") when the peer has
+//    closed the connection;
+//  - NetworkError carrying the recv() errno ("recv error from <peer>") when the
+//    call fails.
+// '*nread' is not modified on any error.
+Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
+  if (amt <= 0) {
+    return Status::NetworkError(
+          StringPrintf("invalid recv of %d bytes", amt), Slice(), EINVAL);
+  }
+
+  // The recv() call can return fewer than the requested number of bytes.
+  // Especially when 'amt' is small, this is very unlikely to happen in
+  // the context of unit tests. So, we provide an injection hook which
+  // simulates the same behavior.
+  if (PREDICT_FALSE(FLAGS_socket_inject_short_recvs && amt > 1)) {
+    Random r(GetRandomSeed32());
+    amt = 1 + r.Uniform(amt - 1);
+  }
+
+  DCHECK_GE(fd_, 0);
+  int res;
+  RETRY_ON_EINTR(res, recv(fd_, buf, amt, 0));
+  if (res <= 0) {
+    // Save errno before anything else: looking up the peer address below issues
+    // another syscall (getpeername), which may overwrite the recv() error.
+    const int err = errno;
+    Sockaddr remote;
+    // Best effort: if the peer address cannot be determined, the message simply
+    // shows the default-constructed address.
+    ignore_result(GetPeerAddress(&remote));
+    if (res == 0) {
+      string error_message = Substitute("recv got EOF from $0", remote.ToString());
+      return Status::NetworkError(error_message, Slice(), ESHUTDOWN);
+    }
+    string error_message = Substitute("recv error from $0", remote.ToString());
+    return Status::NetworkError(error_message, ErrnoToString(err), err);
+  }
+  *nread = res;
+  return Status::OK();
+}
+
+// Mostly follows readn() from Stevens (2004) or Kerrisk (2010).
+// One place where we deviate: we consider EOF a failure if < amt bytes are read.
+//
+// Reads exactly 'amt' bytes into 'buf', looping over Recv() until everything has
+// arrived, an error occurs or 'deadline' passes. '*nread' is updated after every
+// Recv() attempt with the total number of bytes received so far.
+//
+// Returns OK only when all bytes were read; TimedOut when the deadline has
+// passed or a read fails with EAGAIN; otherwise the Recv() error prefixed with
+// "BlockingRecv error".
+Status Socket::BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline) {
+  DCHECK_LE(amt, std::numeric_limits<int32_t>::max()) << "Reads > INT32_MAX not supported";
+  DCHECK(nread);
+  size_t tot_read = 0;
+  while (tot_read < amt) {
+    int32_t inc_num_read = 0;
+    int32_t num_to_read = amt - tot_read;
+    // Re-arm the receive timeout on every iteration with the time remaining
+    // until the deadline.
+    MonoDelta timeout = deadline - MonoTime::Now();
+    if (PREDICT_FALSE(timeout.ToNanoseconds() <= 0)) {
+      return Status::TimedOut("");
+    }
+    RETURN_NOT_OK(SetRecvTimeout(timeout));
+    Status s = Recv(buf, num_to_read, &inc_num_read);
+    // When Recv() fails it leaves inc_num_read at 0, so the bookkeeping below is
+    // a no-op in that case.
+    tot_read += inc_num_read;
+    buf += inc_num_read;
+    *nread = tot_read;
+
+    if (PREDICT_FALSE(!s.ok())) {
+      // Continue silently when the syscall is interrupted.
+      if (s.posix_code() == EINTR) {
+        continue;
+      }
+      // EAGAIN is treated as expiry of the receive timeout configured above.
+      if (s.posix_code() == EAGAIN) {
+        return Status::TimedOut("");
+      }
+      return s.CloneAndPrepend("BlockingRecv error");
+    }
+    // NOTE(review): Recv() as defined in this file reports EOF as an error
+    // Status, so this zero-byte branch looks unreachable -- confirm.
+    if (PREDICT_FALSE(inc_num_read == 0)) {
+      // EOF.
+      break;
+    }
+  }
+
+  if (PREDICT_FALSE(tot_read < amt)) {
+    return Status::IOError("Read zero bytes on a blocking Recv() call",
+        StringPrintf("Transferred %zu of %zu bytes", tot_read, amt));
+  }
+  return Status::OK();
+}
+
+// Shared implementation of SetSendTimeout() / SetRecvTimeout(): converts
+// 'timeout' to a timeval and installs it as SOL_SOCKET option 'opt'. 'optname'
+// is only used to build the error message.
+//
+// Returns InvalidArgument for a negative timeout, or the setsockopt() error
+// prefixed with a description of the attempted change.
+Status Socket::SetTimeout(int opt, const char* optname, const MonoDelta& timeout) {
+  const bool is_negative = timeout.ToNanoseconds() < 0;
+  if (PREDICT_FALSE(is_negative)) {
+    return Status::InvalidArgument("Timeout specified as negative to SetTimeout",
+                                   timeout.ToString());
+  }
+  struct timeval as_timeval;
+  timeout.ToTimeVal(&as_timeval);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, opt, as_timeval),
+                        Substitute("failed to set socket option $0 to $1",
+                                   optname, timeout.ToString()));
+  return Status::OK();
+}
+
+// Thin typed wrapper around setsockopt(): passes 'value' by address together
+// with its size. Returns a NetworkError carrying errno on failure.
+template<typename T>
+Status Socket::SetSockOpt(int level, int option, const T& value) {
+  const int rc = ::setsockopt(fd_, level, option, &value, sizeof(value));
+  if (rc != -1) {
+    return Status::OK();
+  }
+  int err = errno;
+  return Status::NetworkError(ErrnoToString(err), Slice(), err);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/net/socket.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.h b/be/src/kudu/util/net/socket.h
new file mode 100644
index 0000000..992c44a
--- /dev/null
+++ b/be/src/kudu/util/net/socket.h
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NET_SOCKET_H
+#define KUDU_UTIL_NET_SOCKET_H
+
+#include <cstddef>
+#include <cstdint>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+struct iovec;
+
+namespace kudu {
+
+class MonoDelta;
+class MonoTime;
+class Sockaddr;
+
+class Socket {
+ public:
+  static const int FLAG_NONBLOCKING = 0x1;  // bit for the 'flags' argument of Init()
+
+  // Create a new invalid Socket object.
+  Socket();
+
+  // Start managing a socket.
+  explicit Socket(int fd);
+
+  // Close the socket.  Errors will be ignored.
+  virtual ~Socket();
+
+  // Close the Socket, checking for errors.
+  virtual Status Close();
+
+  // Call shutdown() on the socket; the two flags presumably pick the direction(s).
+  Status Shutdown(bool shut_read, bool shut_write);
+
+  // Start managing a socket.
+  void Reset(int fd);
+
+  // Stop managing the socket and return it.
+  int Release();
+
+  // Get the raw file descriptor, or -1 if there is no file descriptor being
+  // managed.
+  int GetFd() const;
+
+  // Returns true if the error is temporary and will go away if we retry on
+  // the socket.
+  static bool IsTemporarySocketError(int err);
+
+  Status Init(int flags); // See FLAG_NONBLOCKING
+
+  // Set or clear TCP_NODELAY
+  Status SetNoDelay(bool enabled);
+
+  // Set or clear TCP_CORK
+  Status SetTcpCork(bool enabled);
+
+  // Set or clear O_NONBLOCK
+  Status SetNonBlocking(bool enabled);
+  Status IsNonBlocking(bool* is_nonblock) const;  // presumably reports O_NONBLOCK via 'is_nonblock'
+
+  // Set SO_SENDTIMEO to the specified value. Should only be used for blocking sockets.
+  Status SetSendTimeout(const MonoDelta& timeout);
+
+  // Set SO_RCVTIMEO to the specified value. Should only be used for blocking sockets.
+  Status SetRecvTimeout(const MonoDelta& timeout);
+
+  // Sets SO_REUSEADDR to 'flag'. Should be used prior to Bind().
+  Status SetReuseAddr(bool flag);
+
+  // Sets SO_REUSEPORT to 'flag'. Should be used prior to Bind().
+  Status SetReusePort(bool flag);
+
+  // Convenience method to invoke the common sequence:
+  // 1) SetReuseAddr(true)
+  // 2) Bind()
+  // 3) Listen()
+  Status BindAndListen(const Sockaddr &sockaddr, int listen_queue_size);
+
+  // Start listening for new connections, with the given backlog size.
+  // Requires that the socket has already been bound using Bind().
+  Status Listen(int listen_queue_size);
+
+  // Call getsockname to get the address of this socket.
+  Status GetSocketAddress(Sockaddr *cur_addr) const;
+
+  // Call getpeername to get the address of the connected peer.
+  // It is virtual so that tests can override.
+  virtual Status GetPeerAddress(Sockaddr *cur_addr) const;
+
+  // Return true if this socket is determined to be a loopback connection
+  // (i.e. the local and remote peer share an IP address).
+  //
+  // If any error occurs while determining this, returns false.
+  bool IsLoopbackConnection() const;
+
+  // Call bind() to bind the socket to a given address.
+  // If bind() fails and indicates that the requested port is already in use,
+  // generates an informative log message by calling 'lsof' if available.
+  Status Bind(const Sockaddr& bind_addr);
+
+  // Call accept(2) to get a new connection ('flags': presumably FLAG_NONBLOCKING -- TODO confirm).
+  Status Accept(Socket *new_conn, Sockaddr *remote, int flags);
+
+  // Start connecting this socket to a remote address.
+  Status Connect(const Sockaddr &remote);
+
+  // Get the error status using getsockopt(2).
+  Status GetSockError() const;
+
+  // Write up to 'amt' bytes from 'buf' to the socket. The number of bytes
+  // actually written will be stored in 'nwritten'. If an error is returned,
+  // the value of 'nwritten' is undefined.
+  virtual Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten);
+
+  // Vectorized Write.
+  // If there is an error, that error needs to be resolved before calling again.
+  // If there was no error, but not all the bytes were written, the unwritten
+  // bytes must be retried. See writev(2) for more information.
+  virtual Status Writev(const struct ::iovec *iov, int iov_len, int64_t *nwritten);
+
+  // Blocking Write call, returns IOError unless full buffer is sent.
+  // Underlying Socket expected to be in blocking mode. Fails if any Write() sends 0 bytes.
+  // Returns OK if buflen bytes were sent, otherwise IOError.
+  // Upon return, nwritten will contain the number of bytes actually written.
+  // See also writen() from Stevens (2004) or Kerrisk (2010)
+  Status BlockingWrite(const uint8_t *buf, size_t buflen, size_t *nwritten,
+      const MonoTime& deadline);
+
+  virtual Status Recv(uint8_t *buf, int32_t amt, int32_t *nread);  // BlockingRecv() loops over this
+
+  // Blocking Recv call, returns IOError unless requested amt bytes are read.
+  // Underlying Socket expected to be in blocking mode. Fails if any Recv() reads 0 bytes.
+  // Returns OK if amt bytes were read, otherwise IOError.
+  // Upon return, nread will contain the number of bytes actually read.
+  // See also readn() from Stevens (2004) or Kerrisk (2010)
+  Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline);
+
+ private:
+  // Called internally from SetSend/RecvTimeout(); rejects negative timeouts.
+  Status SetTimeout(int opt, const char* optname, const MonoDelta& timeout);
+
+  // Called internally during socket setup.
+  Status SetCloseOnExec();
+
+  // Bind the socket to a local address before making an outbound connection,
+  // based on the value of FLAGS_local_ip_for_outbound_sockets.
+  Status BindForOutgoingConnection();
+
+  // Set an option on the socket via setsockopt(2); 'value' is passed by address with sizeof(T).
+  template<typename T>
+  Status SetSockOpt(int level, int option, const T& value);
+
+  int fd_;  // the managed descriptor; see GetFd()
+
+  DISALLOW_COPY_AND_ASSIGN(Socket);
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/nvm_cache.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/nvm_cache.cc b/be/src/kudu/util/nvm_cache.cc
new file mode 100644
index 0000000..ef52ec0
--- /dev/null
+++ b/be/src/kudu/util/nvm_cache.cc
@@ -0,0 +1,577 @@
+// This file is derived from cache.cc in the LevelDB project:
+//
+//   Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
+//   Use of this source code is governed by a BSD-style license that can be
+//   found in the LICENSE file.
+//
+// ------------------------------------------------------------
+// This file implements a cache based on the NVML library (http://pmem.io),
+// specifically its "libvmem" component. This library makes it easy to program
+// against persistent memory hardware by exposing an API which parallels
+// malloc/free, but allocates from persistent memory instead of DRAM.
+//
+// We use this API to implement a cache which treats persistent memory or
+// non-volatile memory as if it were a larger cheaper bank of volatile memory. We
+// currently make no use of its persistence properties.
+//
+// Currently, we only store key/value in NVM. All other data structures such as the
+// ShardedLRUCache instances, hash table, etc are in DRAM. The assumption is that
+// the ratio of data stored vs overhead is quite high.
+
+#include "kudu/util/nvm_cache.h"
+
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <libvmem.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/atomic_refcount.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/hash/city.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/cache_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/slice.h"
+
+DEFINE_string(nvm_cache_path, "/vmem",
+              "The path at which the NVM cache will try to allocate its memory. "
+              "This can be a tmpfs or ramfs for testing purposes.");
+TAG_FLAG(nvm_cache_path, experimental);
+
+DEFINE_int32(nvm_cache_allocation_retry_count, 10,
+             "The number of times that the NVM cache will retry attempts to allocate "
+             "memory for new entries. In between attempts, a cache entry will be "
+             "evicted.");
+TAG_FLAG(nvm_cache_allocation_retry_count, advanced);
+TAG_FLAG(nvm_cache_allocation_retry_count, experimental);
+
+DEFINE_bool(nvm_cache_simulate_allocation_failure, false,
+            "If true, the NVM cache will inject failures in calls to vmem_malloc "
+            "for testing.");
+TAG_FLAG(nvm_cache_simulate_allocation_failure, unsafe);
+
+
+namespace kudu {
+
+namespace {
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+typedef simple_spinlock MutexType;
+
+// LRU cache implementation
+
+// An entry is a variable length heap-allocated structure.  Entries
+// are kept in a circular doubly linked list ordered by access time.
+struct LRUHandle {
+  Cache::EvictionCallback* eviction_callback;  // may be NULL; invoked by FreeEntry()
+  LRUHandle* next_hash;  // next entry in the same HandleTable bucket chain
+  LRUHandle* next;       // LRU list link; also chains evicted entries awaiting FreeEntry()
+  LRUHandle* prev;       // LRU list link
+  size_t charge;      // added to / subtracted from the shard's usage_. TODO(opt): Only allow uint32_t?
+  uint32_t key_length;  // number of key bytes at the start of kv_data
+  uint32_t val_length;  // number of value bytes that follow the key
+  Atomic32 refs;        // reference count; Insert() starts it at 2
+  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
+  uint8_t* kv_data;     // key bytes immediately followed by the value bytes
+
+  Slice key() const {  // the first key_length bytes of kv_data
+    return Slice(kv_data, key_length);
+  }
+
+  Slice value() const {  // the val_length bytes right after the key
+    return Slice(&kv_data[key_length], val_length);
+  }
+
+  uint8_t* val_ptr() {  // writable pointer to the first value byte
+    return &kv_data[key_length];
+  }
+};
+
+// We provide our own simple hash table since it removes a whole bunch
+// of porting hacks and is also faster than some of the built-in hash
+// table implementations in some of the compiler/runtime combinations
+// we have tested.  E.g., readrandom speeds up by ~5% over the g++
+// 4.4.3's builtin hashtable.
+class HandleTable {
+ public:
+  HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }  // starts with 16 buckets
+  ~HandleTable() { delete[] list_; }  // frees the bucket array only, not the entries
+
+  LRUHandle* Lookup(const Slice& key, uint32_t hash) {  // NULL when nothing matches
+    return *FindPointer(key, hash);
+  }
+
+  LRUHandle* Insert(LRUHandle* h) {  // returns the entry that 'h' replaced, or NULL
+    LRUHandle** ptr = FindPointer(h->key(), h->hash);
+    LRUHandle* old = *ptr;
+    h->next_hash = (old == NULL ? NULL : old->next_hash);  // 'h' takes old's place in the chain
+    *ptr = h;
+    if (old == NULL) {
+      ++elems_;
+      if (elems_ > length_) {
+        // Since each cache entry is fairly large, we aim for a small
+        // average linked list length (<= 1).
+        Resize();
+      }
+    }
+    return old;
+  }
+
+  LRUHandle* Remove(const Slice& key, uint32_t hash) {  // returns the unlinked entry, or NULL
+    LRUHandle** ptr = FindPointer(key, hash);
+    LRUHandle* result = *ptr;
+    if (result != NULL) {
+      *ptr = result->next_hash;
+      --elems_;
+    }
+    return result;
+  }
+
+ private:
+  // The table consists of an array of buckets where each bucket is
+  // a linked list of cache entries that hash into the bucket.
+  uint32_t length_;  // bucket count; 16 doubled repeatedly, hence a power of two
+  uint32_t elems_;   // number of entries currently linked into the table
+  LRUHandle** list_;  // array of length_ bucket heads
+
+  // Return a pointer to slot that points to a cache entry that
+  // matches key/hash.  If there is no such cache entry, return a
+  // pointer to the trailing slot in the corresponding linked list.
+  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
+    LRUHandle** ptr = &list_[hash & (length_ - 1)];  // mask works because length_ is a power of two
+    while (*ptr != NULL &&
+           ((*ptr)->hash != hash || key != (*ptr)->key())) {
+      ptr = &(*ptr)->next_hash;
+    }
+    return ptr;
+  }
+
+  void Resize() {  // rebuilds the bucket array and relinks every entry into it
+    uint32_t new_length = 16;
+    while (new_length < elems_ * 1.5) {
+      new_length *= 2;
+    }
+    LRUHandle** new_list = new LRUHandle*[new_length];
+    memset(new_list, 0, sizeof(new_list[0]) * new_length);
+    uint32_t count = 0;
+    for (uint32_t i = 0; i < length_; i++) {
+      LRUHandle* h = list_[i];
+      while (h != NULL) {
+        LRUHandle* next = h->next_hash;  // saved because the link is overwritten below
+        uint32_t hash = h->hash;
+        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
+        h->next_hash = *ptr;  // push onto the front of the new bucket
+        *ptr = h;
+        h = next;
+        count++;
+      }
+    }
+    DCHECK_EQ(elems_, count);
+    delete[] list_;
+    list_ = new_list;
+    length_ = new_length;
+  }
+};
+
+// A single shard of sharded cache.
+class NvmLRUCache {
+ public:
+  explicit NvmLRUCache(VMEM *vmp);
+  ~NvmLRUCache();
+
+  // Separate from constructor so caller can easily make an array of LRUCache
+  void SetCapacity(size_t capacity) { capacity_ = capacity; }
+
+  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }  // stored as-is; NULL skips metrics
+
+  Cache::Handle* Insert(LRUHandle* h, Cache::EvictionCallback* eviction_callback);
+
+  // Like Cache::Lookup, but with an extra "hash" parameter.
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
+  void Release(Cache::Handle* handle);  // drops one reference; frees the entry on the last one
+  void Erase(const Slice& key, uint32_t hash);  // unlinks the entry for 'key', if any
+  void* AllocateAndRetry(size_t size);  // NULL if no memory could be obtained
+
+ private:
+  void NvmLRU_Remove(LRUHandle* e);  // unlink from the LRU list, subtract charge from usage_
+  void NvmLRU_Append(LRUHandle* e);  // link as newest entry, add charge to usage_
+  // Just reduce the reference count by 1.
+  // Return true if last reference
+  bool Unref(LRUHandle* e);
+  void FreeEntry(LRUHandle* e);  // eviction callback, metrics update, then vmem_free()
+
+  // Evict the oldest item; if that drops its last reference it is pushed onto the
+  // list at 'to_remove_head'. Both callers in this file hold mutex_ when calling.
+  void EvictOldestUnlocked(LRUHandle** to_remove_head);
+
+  // Free all of the entries in the linked list that has to_free_head
+  // as its head.
+  void FreeLRUEntries(LRUHandle* to_free_head);
+
+  // Wrapper around vmem_malloc which injects failures based on a flag.
+  void* VmemMalloc(size_t size);
+
+  // Initialized before use (by SetCapacity(); the constructor does not set it).
+  size_t capacity_;
+
+  // mutex_ protects the following state.
+  MutexType mutex_;
+  size_t usage_;  // sum of the charges of the entries on the LRU list
+
+  // Dummy head of LRU list.
+  // lru.prev is newest entry, lru.next is oldest entry.
+  LRUHandle lru_;
+
+  HandleTable table_;
+
+  VMEM* vmp_;  // pool passed to the constructor; used for vmem_malloc()/vmem_free()
+
+  CacheMetrics* metrics_;  // NULL until SetMetrics() is called
+};
+
+// Builds an empty shard backed by 'vmp'; capacity_ is left for SetCapacity() to fill in.
+NvmLRUCache::NvmLRUCache(VMEM* vmp)
+    : usage_(0), vmp_(vmp), metrics_(NULL) {
+  // An empty circular list is just the sentinel linked to itself; the
+  // sentinel's remaining fields are not touched here.
+  lru_.prev = &lru_;
+  lru_.next = &lru_;
+}
+
+NvmLRUCache::~NvmLRUCache() {
+  LRUHandle* cur = lru_.next;
+  while (cur != &lru_) {
+    // Grab the successor first: FreeEntry() may release 'cur'.
+    LRUHandle* const following = cur->next;
+    DCHECK_EQ(cur->refs, 1);  // Error if caller has an unreleased handle
+    if (Unref(cur)) FreeEntry(cur);
+    cur = following;
+  }
+}
+
+void* NvmLRUCache::VmemMalloc(size_t size) {
+  // With the unsafe test flag set, every request fails without touching the pool.
+  const bool inject_failure = FLAGS_nvm_cache_simulate_allocation_failure;
+  if (PREDICT_TRUE(!inject_failure)) return vmem_malloc(vmp_, size);
+  return NULL;
+}
+
+bool NvmLRUCache::Unref(LRUHandle* e) {
+  DCHECK_GT(ANNOTATE_UNPROTECTED_READ(e->refs), 0);  // there must be a reference left to drop
+  return !base::RefCountDec(&e->refs);  // negated so that true means "that was the last reference"
+}
+
+void NvmLRUCache::FreeEntry(LRUHandle* e) {
+  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(e->refs), 0);
+  // Let the owner observe the key/value while the memory is still valid.
+  Cache::EvictionCallback* const on_evict = e->eviction_callback;
+  if (on_evict != NULL) on_evict->EvictedEntry(e->key(), e->value());
+  if (PREDICT_TRUE(metrics_ != NULL)) {
+    metrics_->cache_usage->DecrementBy(e->charge);
+    metrics_->evictions->Increment();
+  }
+  vmem_free(vmp_, e);  // handle and key/value bytes live in one allocation
+}
+
+// Allocate nvm memory. Try until successful or FLAGS_nvm_cache_allocation_retry_count
+// has been exceeded.
+void *NvmLRUCache::AllocateAndRetry(size_t size) {
+  // The NVM pool has a fixed size, so an allocation can fail. When it
+  // does, evict the oldest entry of this shard, release its memory and
+  // try again, up to FLAGS_nvm_cache_allocation_retry_count times or
+  // until the shard is empty. Returns NULL if that still fails, which
+  // causes the caller to not insert anything into the cache.
+  void *tmp = VmemMalloc(size);
+  if (tmp != NULL) {
+    return tmp;
+  }
+
+  std::unique_lock<MutexType> l(mutex_);
+  int retries_remaining = FLAGS_nvm_cache_allocation_retry_count;
+  while (tmp == NULL && retries_remaining-- > 0 && lru_.next != &lru_) {
+    // Stays NULL when the evicted entry is still referenced by a handle;
+    // such an entry is freed by its final Release() instead.
+    LRUHandle *to_remove_head = NULL;
+    EvictOldestUnlocked(&to_remove_head);
+
+    // Unlock for the slow part. The evicted entry must be freed before
+    // retrying: until vmem_free() has run its memory is still taken, so
+    // deferring the free to the end would make the retries unable to
+    // benefit from the evictions they just performed.
+    l.unlock();
+    FreeLRUEntries(to_remove_head);
+    tmp = VmemMalloc(size);
+    l.lock();
+  }
+  return tmp;
+}
+
+void NvmLRUCache::NvmLRU_Remove(LRUHandle* e) {
+  usage_ -= e->charge;       // the entry no longer counts against the shard
+  e->prev->next = e->next;   // predecessor skips over 'e'
+  e->next->prev = e->prev;   // successor points back past 'e'
+}
+
+void NvmLRUCache::NvmLRU_Append(LRUHandle* e) {
+  // The newest entry sits right before the sentinel, i.e. it becomes lru_.prev.
+  LRUHandle* const old_tail = lru_.prev;
+  e->prev = old_tail;
+  e->next = &lru_;
+  old_tail->next = lru_.prev = e;
+  usage_ += e->charge;
+}
+
+// Finds 'key' in this shard. A hit hands the caller a new reference and makes
+// the entry the most recently used one; a miss returns NULL. 'caching' does
+// not affect the lookup itself, it only selects which hit/miss counter pair
+// is incremented when metrics are attached.
+Cache::Handle* NvmLRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) {
+  LRUHandle* found;
+  {
+    std::lock_guard<MutexType> guard(mutex_);
+    found = table_.Lookup(key, hash);
+    if (found != NULL) {
+      // Take the caller's reference, then move the entry to the newest
+      // end of the LRU list by unlinking and re-appending it.
+      base::RefCountInc(&found->refs);
+      NvmLRU_Remove(found);
+      NvmLRU_Append(found);
+    }
+  }
+
+  // Metrics are updated after the shard lock has been dropped.
+  if (metrics_) {
+    metrics_->lookups->Increment();
+    const bool hit = (found != NULL);
+    if (hit && caching) {
+      metrics_->cache_hits_caching->Increment();
+    } else if (hit) {
+      metrics_->cache_hits->Increment();
+    } else if (caching) {
+      metrics_->cache_misses_caching->Increment();
+    } else {
+      metrics_->cache_misses->Increment();
+    }
+  }
+
+  return reinterpret_cast<Cache::Handle*>(found);
+}
+
+// Gives back one reference to the entry behind 'handle'.
+void NvmLRUCache::Release(Cache::Handle* handle) {
+  LRUHandle* const entry = reinterpret_cast<LRUHandle*>(handle);
+  // Whoever drops the final reference is responsible for freeing the entry.
+  if (!Unref(entry)) return;
+  FreeEntry(entry);
+}
+
+void NvmLRUCache::EvictOldestUnlocked(LRUHandle** to_remove_head) {
+  LRUHandle* const victim = lru_.next;  // the sentinel's successor is the oldest entry
+  NvmLRU_Remove(victim);
+  table_.Remove(victim->key(), victim->hash);
+  // An entry that is still referenced is not queued here.
+  if (!Unref(victim)) return;
+  victim->next = *to_remove_head;  // push onto the caller's to-free chain
+  *to_remove_head = victim;
+}
+
+void NvmLRUCache::FreeLRUEntries(LRUHandle* to_free_head) {
+  for (LRUHandle* doomed = to_free_head; doomed != NULL; ) {
+    LRUHandle* const rest = doomed->next;  // read the link before the node is freed
+    FreeEntry(doomed);
+    doomed = rest;
+  }
+}
+
+Cache::Handle* NvmLRUCache::Insert(LRUHandle* e,
+                                   Cache::EvictionCallback* eviction_callback) {
+  DCHECK(e);
+  LRUHandle* to_remove_head = NULL;  // entries to free once the lock is dropped, chained via 'next'
+
+  e->refs = 2;  // One from LRUCache, one for the returned handle
+  e->eviction_callback = eviction_callback;
+  if (PREDICT_TRUE(metrics_)) {
+    metrics_->cache_usage->IncrementBy(e->charge);
+    metrics_->inserts->Increment();
+  }
+
+  {
+    std::lock_guard<MutexType> l(mutex_);
+
+    NvmLRU_Append(e);
+
+    LRUHandle* old = table_.Insert(e);  // non-NULL when an entry with the same key was replaced
+    if (old != NULL) {
+      NvmLRU_Remove(old);
+      if (Unref(old)) {
+        old->next = to_remove_head;
+        to_remove_head = old;
+      }
+    }
+
+    while (usage_ > capacity_ && lru_.next != &lru_) {  // evict oldest-first until within capacity
+      EvictOldestUnlocked(&to_remove_head);
+    }
+  }
+
+  // We free the entries here, outside of the mutex, for
+  // performance reasons.
+  FreeLRUEntries(to_remove_head);
+
+  return reinterpret_cast<Cache::Handle*>(e);
+}
+
+// Removes 'key' from this shard if present. An entry that is still referenced
+// is only unlinked here; it is freed right away only on its last reference.
+void NvmLRUCache::Erase(const Slice& key, uint32_t hash) {
+  LRUHandle* removed = NULL;
+  bool free_now = false;
+  {
+    std::lock_guard<MutexType> guard(mutex_);
+    removed = table_.Remove(key, hash);
+    if (removed != NULL) {
+      NvmLRU_Remove(removed);
+      free_now = Unref(removed);
+    }
+  }
+  // The lock is released by now; 'free_now' can only be set when an entry
+  // was actually found, so 'removed' is non-NULL whenever it is used below.
+  if (free_now) FreeEntry(removed);
+}
+static const int kNumShardBits = 4;  // Shard() uses this many top bits of the 32-bit hash
+static const int kNumShards = 1 << kNumShardBits;  // i.e. 16 shards
+
+class ShardedLRUCache : public Cache {
+ private:
+  gscoped_ptr<CacheMetrics> metrics_;  // created by SetMetrics() and shared with every shard
+  vector<NvmLRUCache*> shards_;  // kNumShards shards; deleted in the destructor
+  VMEM* vmp_;  // pool used by all shards; deleted in the destructor
+
+  static inline uint32_t HashSlice(const Slice& s) {  // CityHash64 result converted to uint32_t
+    return util_hash::CityHash64(
+      reinterpret_cast<const char *>(s.data()), s.size());
+  }
+
+  static uint32_t Shard(uint32_t hash) {  // the top kNumShardBits bits select the shard
+    return hash >> (32 - kNumShardBits);
+  }
+
+ public:
+  explicit ShardedLRUCache(size_t capacity, const string& /*id*/, VMEM* vmp)
+        : vmp_(vmp) {
+
+    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;  // rounded up
+    for (int s = 0; s < kNumShards; s++) {
+      gscoped_ptr<NvmLRUCache> shard(new NvmLRUCache(vmp_));
+      shard->SetCapacity(per_shard);
+      shards_.push_back(shard.release());
+    }
+  }
+
+  virtual ~ShardedLRUCache() {
+    STLDeleteElements(&shards_);
+    // Per the note at the top of this file, our cache is entirely volatile.
+    // Hence, when the cache is destructed, we delete the underlying
+    // VMEM pool.
+    vmem_delete(vmp_);
+  }
+
+  virtual Handle* Insert(PendingHandle* handle,
+                         Cache::EvictionCallback* eviction_callback) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle));
+    return shards_[Shard(h->hash)]->Insert(h, eviction_callback);  // hash was set by Allocate()
+  }
+  virtual Handle* Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    return shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE);
+  }
+  virtual void Release(Handle* handle) OVERRIDE {
+    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
+    shards_[Shard(h->hash)]->Release(handle);
+  }
+  virtual void Erase(const Slice& key) OVERRIDE {
+    const uint32_t hash = HashSlice(key);
+    shards_[Shard(hash)]->Erase(key, hash);
+  }
+  virtual Slice Value(Handle* handle) OVERRIDE {
+    return reinterpret_cast<LRUHandle*>(handle)->value();
+  }
+  virtual uint8_t* MutableValue(PendingHandle* handle) OVERRIDE {
+    return reinterpret_cast<LRUHandle*>(handle)->val_ptr();
+  }
+
+  virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE {
+    metrics_.reset(new CacheMetrics(entity));
+    for (NvmLRUCache* cache : shards_) {
+      cache->SetMetrics(metrics_.get());
+    }
+  }
+  virtual PendingHandle* Allocate(Slice key, int val_len, int charge) OVERRIDE {
+    int key_len = key.size();
+    DCHECK_GE(key_len, 0);
+    DCHECK_GE(val_len, 0);
+    LRUHandle* handle = nullptr;
+
+    // Try allocating from each of the shards -- if vmem is tight,
+    // this can cause eviction, so we might have better luck in different
+    // shards.
+    for (NvmLRUCache* cache : shards_) {
+      uint8_t* buf = static_cast<uint8_t*>(cache->AllocateAndRetry(
+          sizeof(LRUHandle) + key_len + val_len));
+      if (buf) {
+        handle = reinterpret_cast<LRUHandle*>(buf);  // the handle header is the start of the buffer
+        handle->kv_data = &buf[sizeof(LRUHandle)];  // key/value bytes follow the header
+        handle->val_length = val_len;
+        handle->key_length = key_len;
+        handle->charge = (charge == kAutomaticCharge) ?
+            vmem_malloc_usable_size(vmp_, buf) : charge;
+        handle->hash = HashSlice(key);
+        memcpy(handle->kv_data, key.data(), key.size());  // only the key is copied; value is filled later
+        return reinterpret_cast<PendingHandle*>(handle);
+      }
+    }
+    // TODO: increment a metric here on allocation failure.
+    return nullptr;
+  }
+
+  virtual void Free(PendingHandle* ph) OVERRIDE {  // returns the buffer from Allocate() to the pool
+    vmem_free(vmp_, ph);
+  }
+};
+
+} // end anonymous namespace
+
+Cache* NewLRUNvmCache(size_t capacity, const std::string& id) {
+  // vmem_create() will fail if the capacity is too small, but with
+  // an inscrutable error. So, we check it ourselves and abort with a clear message.
+  CHECK_GE(capacity, VMEM_MIN_POOL)
+    << "configured capacity " << capacity << " bytes is less than "
+    << "the minimum capacity for an NVM cache: " << VMEM_MIN_POOL;
+
+  VMEM* vmp = vmem_create(FLAGS_nvm_cache_path.c_str(), capacity);
+  // If we cannot create the cache pool we should not retry: log fatally instead.
+  PLOG_IF(FATAL, vmp == NULL) << "Could not initialize NVM cache library in path "
+                              << FLAGS_nvm_cache_path.c_str();
+
+  return new ShardedLRUCache(capacity, id, vmp);  // the cache deletes 'vmp' in its destructor
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/nvm_cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/nvm_cache.h b/be/src/kudu/util/nvm_cache.h
new file mode 100644
index 0000000..9a65316
--- /dev/null
+++ b/be/src/kudu/util/nvm_cache.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_NVM_CACHE_H_
+#define KUDU_UTIL_NVM_CACHE_H_
+
+#include <cstddef>
+#include <string>
+
+namespace kudu {
+class Cache;
+
+// Create a cache in persistent memory with the given capacity (presumably in bytes -- TODO confirm).
+Cache* NewLRUNvmCache(size_t capacity, const std::string& id);
+
+}  // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/object_pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/object_pool-test.cc b/be/src/kudu/util/object_pool-test.cc
new file mode 100644
index 0000000..ecfd641
--- /dev/null
+++ b/be/src/kudu/util/object_pool-test.cc
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/object_pool.h"
+
+namespace kudu {
+
+// Simple class which maintains a count of how many objects
+// are currently alive.
+class MyClass {
+ public:
+  // Every construction bumps the shared live-object tally...
+  MyClass() { ++instance_count_; }
+
+  // ...and every destruction takes it back down.
+  ~MyClass() { --instance_count_; }
+
+  // Number of MyClass objects currently alive.
+  static int instance_count() { return instance_count_; }
+
+  // Forces the tally back to zero so that a test starts from a clean state.
+  static void ResetCount() { instance_count_ = 0; }
+
+ private:
+  // Tally shared by all instances; a plain int, so only meant for
+  // single-threaded use.
+  static int instance_count_;
+};
+
+// Out-of-class definition of the tally, starting at zero.
+int MyClass::instance_count_ = 0;
+
+TEST(TestObjectPool, TestPooling) {
+  MyClass::ResetCount();
+  {
+    ObjectPool<MyClass> pool;
+    ASSERT_EQ(0, MyClass::instance_count());  // an empty pool constructs nothing
+    MyClass *a = pool.Construct();
+    ASSERT_EQ(1, MyClass::instance_count());
+    MyClass *b = pool.Construct();
+    ASSERT_EQ(2, MyClass::instance_count());
+    ASSERT_TRUE(a != b);
+    pool.Destroy(b);  // runs b's destructor and puts its slot on the free list
+    ASSERT_EQ(1, MyClass::instance_count());
+    MyClass *c = pool.Construct();  // should come from the free list
+    ASSERT_EQ(2, MyClass::instance_count());
+    ASSERT_TRUE(c == b) << "should reuse instance";
+    pool.Destroy(c);
+
+    ASSERT_EQ(1, MyClass::instance_count());  // 'a' is deliberately never destroyed by hand
+  }
+
+  ASSERT_EQ(0, MyClass::instance_count())
+    << "destructing pool should have cleared instances";
+}
+
+TEST(TestObjectPool, TestScopedPtr) {
+  MyClass::ResetCount();
+  ASSERT_EQ(0, MyClass::instance_count());
+  ObjectPool<MyClass> pool;  // outlives the scoped_ptr created below
+  {
+    ObjectPool<MyClass>::scoped_ptr sptr(
+      pool.make_scoped_ptr(pool.Construct()));
+    ASSERT_EQ(1, MyClass::instance_count());
+  }
+  ASSERT_EQ(0, MyClass::instance_count());  // leaving the scope returned the object to the pool
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/object_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/object_pool.h b/be/src/kudu/util/object_pool.h
new file mode 100644
index 0000000..64d4b5c
--- /dev/null
+++ b/be/src/kudu/util/object_pool.h
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Simple pool/freelist for objects of the same type, typically used
+// in local context.
+#ifndef KUDU_UTIL_OBJECT_POOL_H
+#define KUDU_UTIL_OBJECT_POOL_H
+
+#include <glog/logging.h>
+#include <stdint.h>
+#include "kudu/gutil/manual_constructor.h"
+#include "kudu/gutil/gscoped_ptr.h"
+
+namespace kudu {
+
+template<class T>
+class ReturnToPool;
+
+// An object pool allocates and destroys a single class of objects
+// off of a free-list.
+//
+// Upon destruction of the pool, any objects allocated from this pool are
+// destroyed, regardless of whether they have been explicitly returned to the
+// pool.
+//
+// This class is similar to the boost::pool::object_pool, except that the boost
+// implementation seems to have O(n) deallocation performance and benchmarked
+// really poorly.
+//
+// This class is not thread-safe.
+template<typename T>
+class ObjectPool {
+ public:
+  typedef ReturnToPool<T> deleter_type;
+  typedef gscoped_ptr<T, deleter_type> scoped_ptr;
+
+  ObjectPool() :
+    free_list_head_(NULL),
+    alloc_list_head_(NULL),
+    deleter_(this) {
+  }
+
+  ~ObjectPool() {
+    // Delete all objects ever allocated from this pool
+    ListNode *node = alloc_list_head_;
+    while (node != NULL) {
+      ListNode *tmp = node;
+      node = node->next_on_alloc_list;
+      if (!tmp->is_on_freelist) {
+        // Have to run the actual destructor if the user forgot to free it.
+        tmp->Destroy();
+      }
+      delete tmp;
+    }
+  }
+
+  // Construct a new object instance from the pool.
+  T *Construct() {
+    base::ManualConstructor<T> *obj = GetObject();
+    obj->Init();
+    return obj->get();
+  }
+
+  template<class Arg1>
+  T *Construct(Arg1 arg1) {  // same as above, forwarding one constructor argument
+    base::ManualConstructor<T> *obj = GetObject();
+    obj->Init(arg1);
+    return obj->get();
+  }
+
+  // Destroy an object, running its destructor and returning it to the
+  // free-list. 't' must be non-NULL and must have come from this pool.
+  void Destroy(T *t) {
+    CHECK_NOTNULL(t);
+    ListNode *node = static_cast<ListNode *>(
+      reinterpret_cast<base::ManualConstructor<T> *>(t));
+
+    // Catch a double Destroy() before the destructor is run a second time.
+    DCHECK(!node->is_on_freelist);
+    node->Destroy();
+    node->is_on_freelist = true;
+    node->next_on_free_list = free_list_head_;
+    free_list_head_ = node;
+  }
+
+  // Create a scoped_ptr wrapper around the given pointer which came from this
+  // pool.
+  // When the scoped_ptr goes out of scope, the object will get released back
+  // to the pool.
+  scoped_ptr make_scoped_ptr(T *ptr) {
+    return scoped_ptr(ptr, deleter_);
+  }
+
+ private:
+  class ListNode : base::ManualConstructor<T> {
+    friend class ObjectPool<T>;
+
+    ListNode *next_on_free_list;   // valid while the node is on the free list
+    ListNode *next_on_alloc_list;  // links every node ever allocated
+
+    bool is_on_freelist;  // true between Destroy() and the next reuse
+  };
+
+  // Reuses the head of the free list if there is one, else allocates a new node.
+  base::ManualConstructor<T> *GetObject() {
+    if (free_list_head_ != NULL) {
+      ListNode *tmp = free_list_head_;
+      free_list_head_ = tmp->next_on_free_list;
+      tmp->next_on_free_list = NULL;
+      DCHECK(tmp->is_on_freelist);
+      tmp->is_on_freelist = false;
+
+      return static_cast<base::ManualConstructor<T> *>(tmp);
+    }
+    auto new_node = new ListNode();
+    new_node->next_on_free_list = NULL;
+    new_node->next_on_alloc_list = alloc_list_head_;
+    new_node->is_on_freelist = false;
+    alloc_list_head_ = new_node;
+    return new_node;
+  }
+
+  // Keeps track of free objects in this pool.
+  ListNode *free_list_head_;
+
+  // Keeps track of all objects ever allocated by this pool.
+  ListNode *alloc_list_head_;
+
+  deleter_type deleter_;  // bound to this pool; copied into each scoped_ptr
+};
+
+// Functor which returns the passed objects to a specific object pool.
+// This can be used in conjunction with scoped_ptr to automatically release
+// an object back to a pool when it goes out of scope.
+template<class T>
+class ReturnToPool {
+ public:
+  // 'pool' is stored as a plain pointer and dereferenced on every call.
+  explicit ReturnToPool(ObjectPool<T> *pool) : pool_(pool) {}
+
+  // Hands 'ptr' back to the pool through ObjectPool::Destroy(), which runs
+  // the object's destructor and puts its slot on the free list.
+  inline void operator()(T *ptr) const { pool_->Destroy(ptr); }
+
+ private:
+  // Pool that receives the returned objects.
+  ObjectPool<T> *pool_;
+};
+
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/oid_generator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/oid_generator-test.cc b/be/src/kudu/util/oid_generator-test.cc
new file mode 100644
index 0000000..be88061
--- /dev/null
+++ b/be/src/kudu/util/oid_generator-test.cc
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/oid_generator.h"
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(ObjectIdGeneratorTest, TestCanoicalizeUuid) {
+  ObjectIdGenerator gen;
+  string canonicalized;
+
+  // Input that is not a UUID must be rejected with a descriptive
+  // InvalidArgument status.
+  Status s = gen.Canonicalize("not_a_uuid", &canonicalized);
+  {
+    SCOPED_TRACE(s.ToString());
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(), "invalid uuid");
+  }
+
+  // Dashed, undashed and mixed-case spellings of the same UUID must all
+  // canonicalize to the same lowercase, dash-free string.
+  const string kExpectedCanonicalized = "0123456789abcdef0123456789abcdef";
+  const char* const kValidSpellings[] = {
+    "01234567-89ab-cdef-0123-456789abcdef",
+    "0123456789abcdef0123456789abcdef",
+    "0123456789AbCdEf0123456789aBcDeF",
+  };
+  for (const char* spelling : kValidSpellings) {
+    ASSERT_OK(gen.Canonicalize(spelling, &canonicalized));
+    ASSERT_EQ(kExpectedCanonicalized, canonicalized);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/oid_generator.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/oid_generator.cc b/be/src/kudu/util/oid_generator.cc
new file mode 100644
index 0000000..eee7316
--- /dev/null
+++ b/be/src/kudu/util/oid_generator.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/oid_generator.h"
+
+#include <cstdint>
+#include <exception>
+#include <mutex>
+#include <string>
+
+#include <boost/uuid/uuid.hpp>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+// Renders the 16 bytes of 'to_convert' as 32 lowercase hexadecimal characters
+// with no separators.
+string ConvertUuidToString(const boost::uuids::uuid& to_convert) {
+  const uint8_t* b = to_convert.data;
+  return StringPrintf(
+      "%02x%02x%02x%02x" "%02x%02x%02x%02x" "%02x%02x%02x%02x" "%02x%02x%02x%02x",
+      b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
+      b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]);
+}
+
+} // anonymous namespace
+
+// Generates a new UUID while holding 'oid_lock_' and returns it in the
+// dash-free hexadecimal form produced by ConvertUuidToString().
+string ObjectIdGenerator::Next() {
+  std::lock_guard<LockType> guard(oid_lock_);
+  return ConvertUuidToString(oid_generator_());
+}
+
+// Parses 'input' as a UUID and writes its dash-free hexadecimal form to
+// '*output'. Any exception raised while parsing or converting is reported as
+// InvalidArgument carrying the exception's message.
+Status ObjectIdGenerator::Canonicalize(const string& input,
+                                       string* output) const {
+  try {
+    const boost::uuids::uuid parsed = oid_validator_(input);
+    *output = ConvertUuidToString(parsed);
+  } catch (const std::exception& ex) {
+    return Status::InvalidArgument(Substitute("invalid uuid $0: $1",
+                                              input, ex.what()));
+  }
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/oid_generator.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/oid_generator.h b/be/src/kudu/util/oid_generator.h
new file mode 100644
index 0000000..c1cc88f
--- /dev/null
+++ b/be/src/kudu/util/oid_generator.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_OID_GENERATOR_H
+#define KUDU_UTIL_OID_GENERATOR_H
+
+#include <string>
+
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/string_generator.hpp>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Generates unique ids based on uuid v4. Each id is the UUID's 16 bytes
+// rendered as a 32-character hexadecimal string without dashes.
+// This class is thread safe.
+class ObjectIdGenerator {
+ public:
+  ObjectIdGenerator() {}
+  ~ObjectIdGenerator() {}
+
+  // Generates and returns a new UUID.
+  std::string Next();
+
+  // Validates an existing UUID and converts it into the format used by Kudu
+  // (that is, 16 bytes written as 32 hexadecimal characters without any
+  // dashes). Returns InvalidArgument if 'input' cannot be parsed as a UUID.
+  Status Canonicalize(const std::string& input, std::string* output) const;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ObjectIdGenerator);
+
+  typedef simple_spinlock LockType;
+
+  // Protects 'oid_generator_'.
+  LockType oid_lock_;
+
+  // Generates new UUIDs.
+  boost::uuids::random_generator oid_generator_;
+
+  // Validates provided UUIDs.
+  boost::uuids::string_generator oid_validator_;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/once-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/once-test.cc b/be/src/kudu/util/once-test.cc
new file mode 100644
index 0000000..c0a79b1
--- /dev/null
+++ b/be/src/kudu/util/once-test.cc
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/once.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/thread.h"
+
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+// Test fixture whose one-time initialization can be made to fail on demand.
+struct Thing {
+  explicit Thing(bool should_fail) : should_fail_(should_fail), value_(0) {}
+
+  // Routes InitOnce() through 'once_' and returns the resulting Status.
+  Status Init() { return once_.Init(&Thing::InitOnce, this); }
+
+  // Sets 'value_' to 1 and succeeds, unless the fixture was built with
+  // should_fail == true, in which case it fails with IllegalState and leaves
+  // 'value_' alone.
+  Status InitOnce() {
+    if (!should_fail_) {
+      value_ = 1;
+      return Status::OK();
+    }
+    return Status::IllegalState("Whoops!");
+  }
+
+  const bool should_fail_;
+  int value_;
+  KuduOnceDynamic once_;
+};
+
+} // anonymous namespace
+
+TEST(TestOnce, KuduOnceDynamicTest) {
+  // Successful initialization: nothing is set before the first Init(), and
+  // repeated Init() calls keep reporting success with the value set to 1.
+  {
+    Thing ok_thing(false);
+    ASSERT_EQ(0, ok_thing.value_);
+    ASSERT_FALSE(ok_thing.once_.init_succeeded());
+
+    for (int attempt = 0; attempt < 2; ++attempt) {
+      ASSERT_OK(ok_thing.Init());
+      ASSERT_EQ(1, ok_thing.value_);
+      ASSERT_TRUE(ok_thing.once_.init_succeeded());
+    }
+  }
+
+  // Failing initialization: every Init() call reports IllegalState, the value
+  // stays at 0 and init_succeeded() stays false.
+  {
+    Thing failing_thing(true);
+    for (int attempt = 0; attempt < 2; ++attempt) {
+      ASSERT_TRUE(failing_thing.Init().IsIllegalState());
+      ASSERT_EQ(0, failing_thing.value_);
+      ASSERT_FALSE(failing_thing.once_.init_succeeded());
+    }
+  }
+}
+
+// Thread body for the thread-safety test: even-numbered threads call Init(),
+// odd-numbered threads only read and log init_succeeded().
+static void InitOrGetInitted(Thing* t, int i) {
+  const bool is_reader = (i % 2 != 0);
+  if (is_reader) {
+    LOG(INFO) << "Thread " << i << " value: " << t->once_.init_succeeded();
+    return;
+  }
+  LOG(INFO) << "Thread " << i << " initting";
+  t->Init();
+}
+
+TEST(TestOnce, KuduOnceDynamicThreadSafeTest) {
+  Thing thing(false);
+
+  // Half of the threads call Init() and the other half read
+  // init_succeeded(), so the initialization state of thing.once_ is read and
+  // written concurrently. If that access is not synchronized, TSAN will flag
+  // it as a data race.
+  const int kNumThreads = 10;
+  vector<scoped_refptr<Thread> > threads;
+  for (int i = 0; i < kNumThreads; ++i) {
+    scoped_refptr<Thread> thread;
+    ASSERT_OK(Thread::Create("test", Substitute("thread $0", i),
+                             &InitOrGetInitted, &thing, i, &thread));
+    threads.push_back(thread);
+  }
+
+  for (const scoped_refptr<Thread>& thread : threads) {
+    thread->Join();
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/once.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/once.cc b/be/src/kudu/util/once.cc
new file mode 100644
index 0000000..fada777
--- /dev/null
+++ b/be/src/kudu/util/once.cc
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/once.h"
+
+#include "kudu/util/malloc.h"
+
+namespace kudu {
+
+// Only 'status_' contributes to the footprint outside the object itself.
+size_t KuduOnceDynamic::memory_footprint_excluding_this() const {
+  const auto external_bytes = status_.memory_footprint_excluding_this();
+  return external_bytes;
+}
+
+// Adds whatever kudu_malloc_usable_size() reports for 'this' to the
+// footprint excluding this object.
+size_t KuduOnceDynamic::memory_footprint_including_this() const {
+  const auto self_bytes = kudu_malloc_usable_size(this);
+  return self_bytes + memory_footprint_excluding_this();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/once.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/once.h b/be/src/kudu/util/once.h
new file mode 100644
index 0000000..0f43064
--- /dev/null
+++ b/be/src/kudu/util/once.h
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_ONCE_H
+#define KUDU_UTIL_ONCE_H
+
+#include <stddef.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class KuduOnceDynamic;
+
+namespace internal {
+
+// Cheap, single-arg "bound callback" (similar to kudu::Callback) for use
+// in KuduOnceDynamic.
+template<typename T>
+struct MemberFunc {
+  // The once object that receives the Status returned by 'member_func'.
+  KuduOnceDynamic* once;
+  // Object on which 'member_func' is invoked.
+  T* instance;
+  // Initialization routine to run on 'instance'.
+  Status (T::*member_func)();
+};
+
+// Callback run by KuduOnceDynamic::Init() through GoogleOnceDynamic. 'arg' is
+// the MemberFunc<T> built in Init(), passed as void*. Invokes the bound member
+// function, stores its Status in the KuduOnceDynamic and, if that Status is
+// OK, marks initialization as succeeded.
+template<typename T>
+void InitCb(void* arg) {
+  MemberFunc<T>* mf = reinterpret_cast<MemberFunc<T>*>(arg);
+  // NOTE: KuduOnceDynamic is only forward-declared at this point. Its members
+  // are reached through the T-dependent 'mf', so they are presumably resolved
+  // only when the template is instantiated; keep the accesses in this form.
+  mf->once->status_ = (mf->instance->*mf->member_func)();
+  if (PREDICT_TRUE(mf->once->status_.ok())) {
+    mf->once->set_init_succeeded();
+  }
+}
+
+} // namespace internal
+
+// More versatile version of GoogleOnceDynamic, including the following:
+// - Non-static member functions are registered and run via Init().
+// - The first time Init() is called, the registered function is run and the
+//   resulting status is stored.
+// - Regardless of whether Init() succeeded, the function will cease to run on
+//   subsequent calls to Init(), and the stored result will be returned instead.
+// - Access to initialization state is safe for concurrent use.
+class KuduOnceDynamic {
+ public:
+  KuduOnceDynamic() : init_succeeded_(false) {}
+
+  // Runs 'member_func' on 'instance' if the underlying GoogleOnceDynamic has
+  // not been invoked yet, and stores the Status it returns. In every case the
+  // stored Status is what gets returned.
+  //
+  // T: the type of the member passed in.
+  template<typename T>
+  Status Init(Status (T::*member_func)(), T* instance) {
+    internal::MemberFunc<T> bound = { this, instance, member_func };
+
+    // The bound call is handed over as a plain void* and cast back inside
+    // InitCb(). Letting GoogleOnceDynamic handle the cast of the argument
+    // itself upsets Clang UBSAN:
+    //
+    //   runtime error: call to function
+    //   kudu::cfile::BloomFileReader::InitOnceCb(kudu::cfile::BloomFileReader*)
+    //   through pointer to incorrect function type 'void (*)(void *)'
+    void* opaque = &bound;
+    once_.Init(&internal::InitCb<T>, opaque);
+    return status_;
+  }
+
+  // Reads the success flag with kMemOrderAcquire, so loads/stores that come
+  // after this call are not reordered to come before it. The matching store
+  // in set_init_succeeded() uses kMemOrderRelease, so loads/stores before it
+  // are not reordered to come after it.
+  //
+  // Taken together, threads can safely synchronize on init_succeeded_.
+  bool init_succeeded() const {
+    return init_succeeded_.Load(kMemOrderAcquire);
+  }
+
+  // Memory used by this object, not counting the object itself. Use this
+  // when the once is embedded inside another object.
+  size_t memory_footprint_excluding_this() const;
+
+  // Memory used by this object, counting the object itself. Use this when
+  // the once is allocated on the heap.
+  size_t memory_footprint_including_this() const;
+
+ private:
+  template<typename T>
+  friend void internal::InitCb(void* arg);
+
+  // Publishes a successful initialization; called from InitCb().
+  void set_init_succeeded() {
+    init_succeeded_.Store(true, kMemOrderRelease);
+  }
+
+  AtomicBool init_succeeded_;
+  GoogleOnceDynamic once_;
+  Status status_;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/os-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/os-util-test.cc b/be/src/kudu/util/os-util-test.cc
new file mode 100644
index 0000000..a96e69d
--- /dev/null
+++ b/be/src/kudu/util/os-util-test.cc
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/os-util.h"
+
+#include <unistd.h>
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/test_macros.h"
+
+using std::string;
+
+namespace kudu {
+
+// Builds a synthetic stat line for a thread called 'name' with the given tick
+// counts, parses it with ParseStat() and checks that the name and the three
+// times (converted to nanoseconds) come back unchanged.
+void RunTest(const string& name, int user_ticks, int kernel_ticks, int io_wait) {
+  // $0 is the thread name, $1/$2 the user/kernel ticks and $3 the I/O wait.
+  static const char* const kStatTemplate =
+      "0 ($0) S 0 0 0 0 0 0 0"
+      " 0 0 0 $1 $2 0 0 0 0 0"
+      " 0 0 0 0 0 0 0 0 0 0 "
+      " 0 0 0 0 0 0 0 0 0 0 "
+      " 0 $3 0 0 0 0 0 0 0 0 "
+      " 0 0";
+  const string buf = strings::Substitute(kStatTemplate,
+                                         name, user_ticks, kernel_ticks, io_wait);
+
+  ThreadStats stats;
+  string extracted_name;
+  ASSERT_OK(ParseStat(buf, &extracted_name, &stats));
+  ASSERT_EQ(name, extracted_name);
+
+  const double ns_per_tick = 1e9 / sysconf(_SC_CLK_TCK);
+  ASSERT_EQ(user_ticks * ns_per_tick, stats.user_ns);
+  ASSERT_EQ(kernel_ticks * ns_per_tick, stats.kernel_ns);
+  ASSERT_EQ(io_wait * ns_per_tick, stats.iowait_ns);
+}
+
+// Plain thread name without any special characters.
+TEST(OsUtilTest, TestSelf) {
+  RunTest("test", 111, 222, 333);
+}
+
+// The name contains a space, which is also the field separator of the line.
+TEST(OsUtilTest, TestSelfNameWithSpace) {
+  RunTest("a space", 111, 222, 333);
+}
+
+// The name itself contains (unbalanced) parentheses, the same characters
+// that delimit the name field.
+TEST(OsUtilTest, TestSelfNameWithParens) {
+  RunTest("a(b(c((d))e)", 111, 222, 333);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/os-util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/os-util.cc b/be/src/kudu/util/os-util.cc
new file mode 100644
index 0000000..df7761f
--- /dev/null
+++ b/be/src/kudu/util/os-util.cc
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Imported from Impala. Changes include:
+// - Namespace and imports.
+// - Replaced GetStrErrMsg with ErrnoToString.
+// - Replaced StringParser with strings/numbers.
+// - Fixes for cpplint.
+// - Fixed parsing when thread names have spaces.
+
+#include "kudu/util/os-util.h"
+
+#include <fcntl.h>
+#include <sys/resource.h>
+#include <unistd.h>
+
+#include <cstddef>
+#include <fstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/logging.h"
+
+using std::ifstream;
+using std::istreambuf_iterator;
+using std::ostringstream;
+using std::string;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+
+namespace kudu {
+
+// Ensure that Impala compiles on earlier kernels. If the target kernel does not support
+// _SC_CLK_TCK, sysconf(_SC_CLK_TCK) will return -1.
+#ifndef _SC_CLK_TCK
+#define _SC_CLK_TCK 2
+#endif
+
+// Clock ticks per second as reported by sysconf(). GetThreadStats() treats a
+// value <= 0 as "not supported".
+static const int64_t kTicksPerSec = sysconf(_SC_CLK_TCK);
+
+// Offsets into the ../stat file array of per-thread statistics.
+//
+// They are themselves offset by two because the pid and comm fields of the
+// file are parsed separately. The resulting values are used by ParseStat() as
+// zero-based indexes into the fields that follow the comm field.
+static const int64_t kUserTicks = 13 - 2;
+static const int64_t kKernelTicks = 14 - 2;
+static const int64_t kIoWait = 41 - 2;
+
+// Largest offset we are interested in, to check we get a well formed stat file.
+// Because it is an index, a well formed file has more than kMaxOffset fields.
+static const int64_t kMaxOffset = kIoWait;
+
+// Parses the contents of a per-thread stat file held in 'buffer'.
+//
+// On success, stores the user, kernel and I/O-wait times in 'stats',
+// converted from clock ticks to nanoseconds. If 'name' is non-null, the text
+// between the first '(' and the last ')' is stored in '*name'. A field that
+// cannot be parsed as an integer leaves the corresponding member of 'stats'
+// untouched.
+//
+// Returns IOError if the buffer has no parenthesized name, nothing usable
+// after it, or too few fields to contain the I/O-wait entry.
+Status ParseStat(const std::string& buffer, std::string* name, ThreadStats* stats) {
+  DCHECK(stats != nullptr);
+
+  // The thread name should be the only field with parentheses. But the name
+  // itself may contain parentheses.
+  const size_t open_paren = buffer.find('(');
+  const size_t close_paren = buffer.rfind(')');
+  // The last test uses '>=' rather than '==': if ')' is the final character,
+  // close_paren + 2 is past the end and substr() below would throw
+  // std::out_of_range.
+  if (open_paren == string::npos  ||      // '(' must exist
+      close_paren == string::npos ||      // ')' must exist
+      open_paren >= close_paren   ||      // '(' must come before ')'
+      close_paren + 2 >= buffer.size()) { // there must be at least two chars after ')'
+    return Status::IOError("Unrecognised /proc format");
+  }
+  string rest = buffer.substr(close_paren + 2);
+  vector<string> splits = Split(rest, " ", strings::SkipEmpty());
+  // kMaxOffset is used as an index below, so kMaxOffset + 1 fields are needed.
+  if (splits.size() <= static_cast<size_t>(kMaxOffset)) {
+    return Status::IOError("Unrecognised /proc format");
+  }
+
+  int64_t tmp;
+  if (safe_strto64(splits[kUserTicks], &tmp)) {
+    stats->user_ns = tmp * (1e9 / kTicksPerSec);
+  }
+  if (safe_strto64(splits[kKernelTicks], &tmp)) {
+    stats->kernel_ns = tmp * (1e9 / kTicksPerSec);
+  }
+  if (safe_strto64(splits[kIoWait], &tmp)) {
+    stats->iowait_ns = tmp * (1e9 / kTicksPerSec);
+  }
+  if (name != nullptr) {
+    // Everything strictly between the first '(' and the last ')'.
+    *name = buffer.substr(open_paren + 1, close_paren - (open_paren + 1));
+  }
+  return Status::OK();
+}
+
+// Reads /proc/self/task/<tid>/stat and fills 'stats' from it via ParseStat().
+// Returns NotSupported when the tick rate is not positive and IOError when
+// the file cannot be opened.
+Status GetThreadStats(int64_t tid, ThreadStats* stats) {
+  DCHECK(stats != nullptr);
+  if (kTicksPerSec <= 0) {
+    return Status::NotSupported("ThreadStats not supported");
+  }
+
+  ostringstream path_builder;
+  path_builder << "/proc/self/task/" << tid << "/stat";
+  const string stat_path = path_builder.str();
+
+  ifstream stat_file(stat_path.c_str());
+  if (!stat_file.is_open()) {
+    return Status::IOError("Could not open ifstream");
+  }
+
+  // Read the whole file into memory before parsing it.
+  istreambuf_iterator<char> file_begin(stat_file);
+  istreambuf_iterator<char> file_end;
+  const string contents(file_begin, file_end);
+
+  // The thread name is not needed here, hence the null 'name' argument.
+  return ParseStat(contents, nullptr, stats);
+}
+
+// Sets the soft RLIMIT_CORE limit to zero and, if /proc/self/coredump_filter
+// can be opened for writing, writes an all-zero filter to it.
+void DisableCoreDumps() {
+  struct rlimit core_limit;
+  PCHECK(getrlimit(RLIMIT_CORE, &core_limit) == 0);
+  core_limit.rlim_cur = 0;
+  PCHECK(setrlimit(RLIMIT_CORE, &core_limit) == 0);
+
+  // Set coredump_filter to not dump any parts of the address space.
+  // Although the above disables core dumps to files, if core_pattern
+  // is set to a pipe rather than a file, it's not sufficient. Setting
+  // this pattern results in piping a very minimal dump into the core
+  // processor (eg abrtd), thus speeding up the crash.
+  int fd;
+  RETRY_ON_EINTR(fd, open("/proc/self/coredump_filter", O_WRONLY));
+  if (fd < 0) {
+    // The filter file could not be opened; the rlimit change above stands.
+    return;
+  }
+
+  // Results of the write and the close are not acted upon.
+  ssize_t written;
+  RETRY_ON_EINTR(written, write(fd, "00000000", 8));
+  int close_rc;
+  RETRY_ON_EINTR(close_rc, close(fd));
+}
+
+// Returns true if /proc/self/status reports a non-zero TracerPid. Returns
+// false on non-Linux builds and whenever the file cannot be read or the
+// TracerPid line is missing or malformed (a warning is logged once per case).
+bool IsBeingDebugged() {
+#ifdef __linux__
+  // Look for the TracerPid line in /proc/self/status.
+  // If this is non-zero, we are being ptraced, which is indicative of gdb or strace
+  // being attached.
+  faststring status_buf;
+  Status read_status = ReadFileToString(Env::Default(), "/proc/self/status", &status_buf);
+  if (!read_status.ok()) {
+    KLOG_FIRST_N(WARNING, 1) << "could not read /proc/self/status: "
+                             << read_status.ToString();
+    return false;
+  }
+
+  StringPiece status_text(reinterpret_cast<const char*>(status_buf.data()),
+                          status_buf.size());
+  vector<StringPiece> lines = Split(status_text, "\n");
+  for (const auto& line : lines) {
+    if (HasPrefixString(line, "TracerPid:")) {
+      // The line is split at the tab into the key and the pid value.
+      std::pair<StringPiece, StringPiece> key_val = Split(line, "\t");
+      int64_t tracer_pid = -1;
+      if (!safe_strto64(key_val.second.data(), key_val.second.size(), &tracer_pid)) {
+        KLOG_FIRST_N(WARNING, 1) << "Invalid line in /proc/self/status: " << line;
+        return false;
+      }
+      return tracer_pid != 0;
+    }
+  }
+  KLOG_FIRST_N(WARNING, 1) << "Could not find TracerPid line in /proc/self/status";
+  return false;
+#else
+  return false;
+#endif // __linux__
+}
+
+} // namespace kudu