Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:33 UTC

[21/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/errno-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/errno-test.cc b/be/src/kudu/util/errno-test.cc
new file mode 100644
index 0000000..f628b55
--- /dev/null
+++ b/be/src/kudu/util/errno-test.cc
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cerrno>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/errno.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(OsUtilTest, TestErrnoToString) {
+  int err = ENOENT;
+
+  // Non-truncated result.
+  ASSERT_EQ("No such file or directory", ErrnoToString(err));
+
+  // Truncated because of a short buffer.
+  char buf[2];
+  ErrnoToCString(err, buf, arraysize(buf));
+  ASSERT_EQ("N", string(buf));
+
+  // Unknown error.
+  string expected = "Unknown error";
+  ASSERT_EQ(ErrnoToString(-1).compare(0, expected.length(), expected), 0);
+
+  // Unknown error (truncated).
+  ErrnoToCString(-1, buf, arraysize(buf));
+  ASSERT_EQ("U", string(buf));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/errno.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/errno.cc b/be/src/kudu/util/errno.cc
new file mode 100644
index 0000000..cc00d0f
--- /dev/null
+++ b/be/src/kudu/util/errno.cc
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/errno.h"
+
+#include <cstring>
+#include <glog/logging.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+// IWYU pragma: no_include <features.h>
+
+namespace kudu {
+
+void ErrnoToCString(int err, char *buf, size_t buf_len) {
+  CHECK_GT(buf_len, 0);
+#if !defined(__GLIBC__) || \
+  ((_POSIX_C_SOURCE >= 200112 || _XOPEN_SOURCE >= 600) && !defined(_GNU_SOURCE))
+  // Using POSIX version 'int strerror_r(...)'.
+  int ret = strerror_r(err, buf, buf_len);
+  if (ret && ret != ERANGE && ret != EINVAL) {
+    strncpy(buf, "unknown error", buf_len);
+    buf[buf_len - 1] = '\0';
+  }
+#else
+  // Using the GLIBC version 'char* strerror_r(...)'.
+
+  // KUDU-1515: TSAN in Clang 3.9 has an incorrect interceptor for strerror_r:
+  // https://github.com/google/sanitizers/issues/696
+  ANNOTATE_IGNORE_WRITES_BEGIN();
+  char* ret = strerror_r(err, buf, buf_len);
+  ANNOTATE_IGNORE_WRITES_END();
+  if (ret != buf) {
+    strncpy(buf, ret, buf_len);
+    buf[buf_len - 1] = '\0';
+  }
+#endif
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/errno.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/errno.h b/be/src/kudu/util/errno.h
new file mode 100644
index 0000000..89802de
--- /dev/null
+++ b/be/src/kudu/util/errno.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_ERRNO_H
+#define KUDU_ERRNO_H
+
+#include <cstddef>
+#include <string>
+
+namespace kudu {
+
+void ErrnoToCString(int err, char *buf, size_t buf_len);
+
+// Return a string representing an errno.
+inline static std::string ErrnoToString(int err) {
+  char buf[512];
+  ErrnoToCString(err, buf, sizeof(buf));
+  return std::string(buf);
+}
+
+} // namespace kudu
+
+#endif
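
For quick reference, a minimal usage sketch of the two entry points declared in errno.h above (illustrative only, not part of the commit; the exact message text depends on the platform's libc):

  #include <cerrno>
  #include <iostream>
  #include "kudu/util/errno.h"

  int main() {
    // Caller-supplied buffer; the message is truncated to fit if necessary.
    char buf[64];
    kudu::ErrnoToCString(ENOENT, buf, sizeof(buf));
    std::cout << buf << std::endl;                 // e.g. "No such file or directory"

    // Convenience wrapper that returns a std::string.
    std::cout << kudu::ErrnoToString(EACCES) << std::endl;  // e.g. "Permission denied"
    return 0;
  }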

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/faststring-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/faststring-test.cc b/be/src/kudu/util/faststring-test.cc
new file mode 100644
index 0000000..07c5697
--- /dev/null
+++ b/be/src/kudu/util/faststring-test.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstring>
+#include <memory>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/faststring.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+class FaststringTest : public KuduTest {};
+
+TEST_F(FaststringTest, TestShrinkToFit_Empty) {
+  faststring s;
+  s.shrink_to_fit();
+  ASSERT_EQ(faststring::kInitialCapacity, s.capacity());
+}
+
+// Test that, if the string contents is shorter than the initial capacity
+// of the faststring, shrink_to_fit() leaves the string in the built-in
+// array.
+TEST_F(FaststringTest, TestShrinkToFit_SmallerThanInitialCapacity) {
+  faststring s;
+  s.append("hello");
+  s.shrink_to_fit();
+  ASSERT_EQ(faststring::kInitialCapacity, s.capacity());
+}
+
+TEST_F(FaststringTest, TestShrinkToFit_Random) {
+  Random r(GetRandomSeed32());
+  int kMaxSize = faststring::kInitialCapacity * 2;
+  std::unique_ptr<char[]> random_bytes(new char[kMaxSize]);
+  RandomString(random_bytes.get(), kMaxSize, &r);
+
+  faststring s;
+  for (int i = 0; i < 100; i++) {
+    int new_size = r.Uniform(kMaxSize);
+    s.resize(new_size);
+    memcpy(s.data(), random_bytes.get(), new_size);
+    s.shrink_to_fit();
+    ASSERT_EQ(0, memcmp(s.data(), random_bytes.get(), new_size));
+    ASSERT_EQ(std::max<int>(faststring::kInitialCapacity, new_size), s.capacity());
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/faststring.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/faststring.cc b/be/src/kudu/util/faststring.cc
new file mode 100644
index 0000000..a1cd26b
--- /dev/null
+++ b/be/src/kudu/util/faststring.cc
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/faststring.h"
+
+#include <glog/logging.h>
+#include <memory>
+
+namespace kudu {
+
+void faststring::GrowByAtLeast(size_t count) {
+  // Not enough space, need to reserve more.
+  // Don't reserve exactly enough space for the new string -- that makes it
+  // too easy to write perf bugs where you get O(n^2) append.
+  // Instead, always expand by at least 50%.
+
+  size_t to_reserve = len_ + count;
+  if (len_ + count < len_ * 3 / 2) {
+    to_reserve = len_ * 3 / 2;
+  }
+  GrowArray(to_reserve);
+}
+
+void faststring::GrowArray(size_t newcapacity) {
+  DCHECK_GE(newcapacity, capacity_);
+  std::unique_ptr<uint8_t[]> newdata(new uint8_t[newcapacity]);
+  if (len_ > 0) {
+    memcpy(&newdata[0], &data_[0], len_);
+  }
+  capacity_ = newcapacity;
+  if (data_ != initial_data_) {
+    delete[] data_;
+  } else {
+    ASAN_POISON_MEMORY_REGION(initial_data_, arraysize(initial_data_));
+  }
+
+  data_ = newdata.release();
+  ASAN_POISON_MEMORY_REGION(data_ + len_, capacity_ - len_);
+}
+
+void faststring::ShrinkToFitInternal() {
+  DCHECK_NE(data_, initial_data_);
+  if (len_ <= kInitialCapacity) {
+    ASAN_UNPOISON_MEMORY_REGION(initial_data_, len_);
+    memcpy(initial_data_, &data_[0], len_);
+    delete[] data_;
+    data_ = initial_data_;
+    capacity_ = kInitialCapacity;
+  } else {
+    std::unique_ptr<uint8_t[]> newdata(new uint8_t[len_]);
+    memcpy(&newdata[0], &data_[0], len_);
+    delete[] data_;
+    data_ = newdata.release();
+    capacity_ = len_;
+  }
+}
+
+} // namespace kudu
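
A small sketch of the effect of the 1.5x growth policy implemented by GrowByAtLeast() above (illustrative only, not part of the commit): starting from the 32-byte inline buffer, repeated single-byte appends reallocate at capacities 32, 48, 72, 108, ..., so n appends cause only O(log n) reallocations rather than one per append.

  #include <cstdio>
  #include "kudu/util/faststring.h"

  int main() {
    kudu::faststring s;
    size_t last_capacity = s.capacity();   // kInitialCapacity == 32
    for (int i = 0; i < 200; i++) {
      s.push_back('x');
      if (s.capacity() != last_capacity) {
        // Each growth step is at least 50% larger than the previous capacity.
        std::printf("len=%zu: capacity %zu -> %zu\n",
                    s.length(), last_capacity, s.capacity());
        last_capacity = s.capacity();
      }
    }
    return 0;
  }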

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/faststring.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/faststring.h b/be/src/kudu/util/faststring.h
new file mode 100644
index 0000000..992060b
--- /dev/null
+++ b/be/src/kudu/util/faststring.h
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_FASTSTRING_H
+#define KUDU_UTIL_FASTSTRING_H
+
+#include <cstdint>
+#include <cstring>
+#include <string>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/fastmem.h"
+
+namespace kudu {
+
+// A faststring is similar to a std::string, except that it is faster for many
+// common use cases (in particular, resize() will fill with uninitialized data
+// instead of memsetting to \0)
+class faststring {
+ public:
+  enum {
+    kInitialCapacity = 32
+  };
+
+  faststring() :
+    data_(initial_data_),
+    len_(0),
+    capacity_(kInitialCapacity) {
+  }
+
+  // Construct a string with the given capacity, in bytes.
+  explicit faststring(size_t capacity)
+    : data_(initial_data_),
+      len_(0),
+      capacity_(kInitialCapacity) {
+    if (capacity > capacity_) {
+      data_ = new uint8_t[capacity];
+      capacity_ = capacity;
+    }
+    ASAN_POISON_MEMORY_REGION(data_, capacity_);
+  }
+
+  ~faststring() {
+    ASAN_UNPOISON_MEMORY_REGION(initial_data_, arraysize(initial_data_));
+    if (data_ != initial_data_) {
+      delete[] data_;
+    }
+  }
+
+  // Reset the valid length of the string to 0.
+  //
+  // This does not free up any memory. The capacity of the string remains unchanged.
+  void clear() {
+    resize(0);
+    ASAN_POISON_MEMORY_REGION(data_, capacity_);
+  }
+
+  // Resize the string to the given length.
+  // If the new length is larger than the old length, the capacity is expanded as necessary.
+  //
+  // NOTE: in contrast to std::string's implementation, any newly "exposed" bytes of data are
+  // not cleared.
+  void resize(size_t newsize) {
+    if (newsize > capacity_) {
+      reserve(newsize);
+    }
+    len_ = newsize;
+    ASAN_POISON_MEMORY_REGION(data_ + len_, capacity_ - len_);
+    ASAN_UNPOISON_MEMORY_REGION(data_, len_);
+  }
+
+  // Releases the underlying array; after this, the buffer is left empty.
+  //
+  // NOTE: the data pointer returned by release() is not necessarily the pointer
+  // previously returned by data(): if the contents still fit in the built-in
+  // array, a newly heap-allocated copy is returned instead. Either way, the
+  // caller takes ownership and must delete[] the returned buffer.
+  uint8_t *release() WARN_UNUSED_RESULT {
+    uint8_t *ret = data_;
+    if (ret == initial_data_) {
+      ret = new uint8_t[len_];
+      memcpy(ret, data_, len_);
+    }
+    len_ = 0;
+    capacity_ = kInitialCapacity;
+    data_ = initial_data_;
+    ASAN_POISON_MEMORY_REGION(data_, capacity_);
+    return ret;
+  }
+
+  // Reserve space for the given total amount of data. If the current capacity is already
+  // larger than the newly requested capacity, this is a no-op (i.e. it does not ever free memory).
+  //
+  // NOTE: even though the new capacity is reserved, it is illegal to begin writing into that memory
+  // directly using pointers. If ASAN is enabled, this is ensured using manual memory poisoning.
+  void reserve(size_t newcapacity) {
+    if (PREDICT_TRUE(newcapacity <= capacity_)) return;
+    GrowArray(newcapacity);
+  }
+
+  // Append the given data to the string, resizing capacity as necessary.
+  void append(const void *src_v, size_t count) {
+    const uint8_t *src = reinterpret_cast<const uint8_t *>(src_v);
+    EnsureRoomForAppend(count);
+    ASAN_UNPOISON_MEMORY_REGION(data_ + len_, count);
+
+    // appending short values is common enough that this
+    // actually helps, according to benchmarks. In theory
+    // memcpy_inlined should already be just as good, but this
+    // was ~20% faster for reading a large prefix-coded string file
+    // where each string was only a few chars different
+    if (count <= 4) {
+      uint8_t *p = &data_[len_];
+      for (int i = 0; i < count; i++) {
+        *p++ = *src++;
+      }
+    } else {
+      strings::memcpy_inlined(&data_[len_], src, count);
+    }
+    len_ += count;
+  }
+
+  // Append the given string to this string.
+  void append(const std::string &str) {
+    append(str.data(), str.size());
+  }
+
+  // Append the given character to this string.
+  void push_back(const char byte) {
+    EnsureRoomForAppend(1);
+    ASAN_UNPOISON_MEMORY_REGION(data_ + len_, 1);
+    data_[len_] = byte;
+    len_++;
+  }
+
+  // Return the valid length of this string.
+  size_t length() const {
+    return len_;
+  }
+
+  // Return the valid length of this string (identical to length())
+  size_t size() const {
+    return len_;
+  }
+
+  // Return the allocated capacity of this string.
+  size_t capacity() const {
+    return capacity_;
+  }
+
+  // Return a pointer to the data in this string. Note that this pointer
+  // may be invalidated by any later non-const operation.
+  const uint8_t *data() const {
+    return &data_[0];
+  }
+
+  // Return a pointer to the data in this string. Note that this pointer
+  // may be invalidated by any later non-const operation.
+  uint8_t *data() {
+    return &data_[0];
+  }
+
+  // Return the given element of this string. Note that this does not perform
+  // any bounds checking.
+  const uint8_t &at(size_t i) const {
+    return data_[i];
+  }
+
+  // Return the given element of this string. Note that this does not perform
+  // any bounds checking.
+  const uint8_t &operator[](size_t i) const {
+    return data_[i];
+  }
+
+  // Return the given element of this string. Note that this does not perform
+  // any bounds checking.
+  uint8_t &operator[](size_t i) {
+    return data_[i];
+  }
+
+  // Reset the contents of this string by copying 'len' bytes from 'src'.
+  void assign_copy(const uint8_t *src, size_t len) {
+    // Reset length so that the first resize doesn't need to copy the current
+    // contents of the array.
+    len_ = 0;
+    resize(len);
+    memcpy(data(), src, len);
+  }
+
+  // Reset the contents of this string by copying from the given std::string.
+  void assign_copy(const std::string &str) {
+    assign_copy(reinterpret_cast<const uint8_t *>(str.c_str()),
+                str.size());
+  }
+
+  // Reallocates the internal storage to fit only the current data.
+  //
+  // This may revert to using internal storage if the current length is shorter than
+  // kInitialCapacity. Note that, in that case, after this call, capacity() will return
+  // a capacity larger than the data length.
+  //
+  // Any pointers within this instance are invalidated.
+  void shrink_to_fit() {
+    if (data_ == initial_data_ || capacity_ == len_) return;
+    ShrinkToFitInternal();
+  }
+
+  // Return a copy of this string as a std::string.
+  std::string ToString() const {
+    return std::string(reinterpret_cast<const char *>(data()),
+                       len_);
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(faststring);
+
+  // If necessary, expand the buffer to fit at least 'count' more bytes.
+  // If the array has to be grown, it is grown by at least 50%.
+  void EnsureRoomForAppend(size_t count) {
+    if (PREDICT_TRUE(len_ + count <= capacity_)) {
+      return;
+    }
+
+    // Call the non-inline slow path - this reduces the number of instructions
+    // on the hot path.
+    GrowByAtLeast(count);
+  }
+
+  // The slow path of EnsureRoomForAppend(). Grows the buffer by either
+  // 'count' bytes or 50%, whichever is more.
+  void GrowByAtLeast(size_t count);
+
+  // Grow the array to the given capacity, which must be more than
+  // the current capacity.
+  void GrowArray(size_t newcapacity);
+
+  void ShrinkToFitInternal();
+
+  uint8_t* data_;
+  uint8_t initial_data_[kInitialCapacity];
+  size_t len_;
+  size_t capacity_;
+};
+
+} // namespace kudu
+
+#endif
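
A minimal end-to-end sketch of the faststring API declared above (illustrative only, not part of the commit):

  #include <cstdint>
  #include <cstdio>
  #include <string>
  #include "kudu/util/faststring.h"

  int main() {
    kudu::faststring s;
    s.append("hello ");                        // std::string overload
    s.append("world", 5);                      // raw-bytes overload
    std::printf("%s\n", s.ToString().c_str()); // "hello world"

    s.assign_copy(std::string("replaced"));    // overwrite previous contents
    s.shrink_to_fit();                         // no-op here: data still fits the inline buffer

    // release() transfers ownership of the buffer and leaves the string empty;
    // the caller must delete[] the returned pointer.
    uint8_t* raw = s.release();
    delete[] raw;
    return 0;
  }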

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/fault_injection.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/fault_injection.cc b/be/src/kudu/util/fault_injection.cc
new file mode 100644
index 0000000..6638bb6
--- /dev/null
+++ b/be/src/kudu/util/fault_injection.cc
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/fault_injection.h"
+
+#include <unistd.h>
+
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/once.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+
+namespace kudu {
+namespace fault_injection {
+
+namespace {
+GoogleOnceType g_random_once;
+Random* g_random;
+
+void InitRandom() {
+  LOG(WARNING) << "FAULT INJECTION ENABLED!";
+  LOG(WARNING) << "THIS SERVER MAY CRASH!";
+
+  debug::ScopedLeakCheckDisabler d;
+  g_random = new Random(GetRandomSeed32());
+  ANNOTATE_BENIGN_RACE_SIZED(g_random, sizeof(Random),
+                             "Racy random numbers are OK");
+}
+
+} // anonymous namespace
+
+void DoMaybeFault(const char* fault_str, double fraction) {
+  GoogleOnceInit(&g_random_once, InitRandom);
+  if (PREDICT_TRUE(g_random->NextDoubleFraction() >= fraction)) {
+    return;
+  }
+  LOG(ERROR) << "Injecting fault: " << fault_str << " (process will exit)";
+  // _exit will exit the program without running atexit handlers. This more
+  // accurately simulates a crash.
+  _exit(kExitStatus);
+}
+
+void DoInjectRandomLatency(double max_latency_ms) {
+  GoogleOnceInit(&g_random_once, InitRandom);
+  SleepFor(MonoDelta::FromMilliseconds(g_random->NextDoubleFraction() * max_latency_ms));
+}
+
+void DoInjectFixedLatency(int32_t latency_ms) {
+  SleepFor(MonoDelta::FromMilliseconds(latency_ms));
+}
+
+bool DoMaybeTrue(double fraction) {
+  GoogleOnceInit(&g_random_once, InitRandom);
+  return PREDICT_FALSE(g_random->NextDoubleFraction() <= fraction);
+}
+
+} // namespace fault_injection
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/fault_injection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/fault_injection.h b/be/src/kudu/util/fault_injection.h
new file mode 100644
index 0000000..7a71698
--- /dev/null
+++ b/be/src/kudu/util/fault_injection.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_FAULT_INJECTION_H
+#define KUDU_UTIL_FAULT_INJECTION_H
+
+#include <stdint.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+// Macros for injecting various kinds of faults with varying probability. If
+// configured with 0 probability, each of these macros is evaluated inline and
+// is fast enough to run even in hot code paths.
+
+// With some probability, crash at the current point in the code by logging
+// the injected fault and terminating the process via _exit().
+//
+// The probability is determined by the 'fraction_flag' argument.
+//
+// Typical usage:
+//
+//   DEFINE_double(fault_crash_before_foo, 0.0,
+//                 "Fraction of the time when we will crash before doing foo");
+//   TAG_FLAG(fault_crash_before_foo, unsafe);
+#define MAYBE_FAULT(fraction_flag) \
+  kudu::fault_injection::MaybeFault(AS_STRING(fraction_flag), fraction_flag)
+
+// Inject a uniformly random amount of latency between 0 and the configured
+// number of milliseconds.
+#define MAYBE_INJECT_RANDOM_LATENCY(max_ms_flag) \
+  kudu::fault_injection::MaybeInjectRandomLatency(max_ms_flag)
+
+// Inject a specific amount of latency.
+#define MAYBE_INJECT_FIXED_LATENCY(ms_flag) \
+  kudu::fault_injection::MaybeInjectFixedLatency(ms_flag)
+
+// With some probability, return the status described by 'status_expr'.
+// This will not evaluate 'status_expr' if 'fraction_flag' is zero.
+#define MAYBE_RETURN_FAILURE(fraction_flag, status_expr) \
+  if (kudu::fault_injection::MaybeTrue(fraction_flag)) { \
+    RETURN_NOT_OK((status_expr)); \
+  }
+
+// Implementation details below.
+// Use the MAYBE_FAULT macro instead.
+namespace kudu {
+namespace fault_injection {
+
+// The exit status returned from a process exiting due to a fault.
+// The choice of value here is arbitrary; it just needs to be something that
+// wouldn't normally be returned by a non-fault-injection code path.
+constexpr int kExitStatus = 85;
+
+// Out-of-line implementation.
+void DoMaybeFault(const char* fault_str, double fraction);
+void DoInjectRandomLatency(double max_latency_ms);
+void DoInjectFixedLatency(int32_t latency_ms);
+bool DoMaybeTrue(double fraction);
+
+inline bool MaybeTrue(double fraction) {
+  if (PREDICT_TRUE(fraction <= 0)) return false;
+  return DoMaybeTrue(fraction);
+}
+
+inline void MaybeFault(const char* fault_str, double fraction) {
+  if (PREDICT_TRUE(fraction <= 0)) return;
+  DoMaybeFault(fault_str, fraction);
+}
+
+inline void MaybeInjectRandomLatency(double max_latency) {
+  if (PREDICT_TRUE(max_latency <= 0)) return;
+  DoInjectRandomLatency(max_latency);
+}
+
+inline void MaybeInjectFixedLatency(int32_t latency) {
+  if (PREDICT_TRUE(latency <= 0)) return;
+  DoInjectFixedLatency(latency);
+}
+
+
+} // namespace fault_injection
+} // namespace kudu
+#endif /* KUDU_UTIL_FAULT_INJECTION_H */
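
A hedged usage sketch of these macros (the flag names and the Flush() function are hypothetical, not part of the commit), following the pattern documented in the header above:

  #include <gflags/gflags.h>

  #include "kudu/util/fault_injection.h"
  #include "kudu/util/flag_tags.h"
  #include "kudu/util/status.h"

  DEFINE_double(fault_crash_before_flush, 0.0,
                "Fraction of the time we crash before flushing");
  TAG_FLAG(fault_crash_before_flush, unsafe);

  DEFINE_int32(flush_inject_latency_ms, 0,
               "Milliseconds of latency to inject before flushing");

  namespace kudu {

  Status Flush() {
    // With probability FLAGS_fault_crash_before_flush, log the fault and
    // terminate the process via _exit().
    MAYBE_FAULT(FLAGS_fault_crash_before_flush);

    // Sleep for a fixed number of milliseconds; no-op when the flag is 0.
    MAYBE_INJECT_FIXED_LATENCY(FLAGS_flush_inject_latency_ms);

    // ... the real flush work would go here ...
    return Status::OK();
  }

  } // namespace kudu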

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache-stress-test.cc b/be/src/kudu/util/file_cache-stress-test.cc
new file mode 100644
index 0000000..9c51a52
--- /dev/null
+++ b/be/src/kudu/util/file_cache-stress-test.cc
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <iterator>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/file_cache-test-util.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/oid_generator.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(test_num_producer_threads, 1, "Number of producer threads");
+DEFINE_int32(test_num_consumer_threads, 4, "Number of consumer threads");
+DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test");
+
+DECLARE_bool(cache_force_single_shard);
+
+using std::deque;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// FD limit to enforce during the test.
+static const int kTestMaxOpenFiles = 100;
+
+template <class FileType>
+class FileCacheStressTest : public KuduTest {
+
+// Like CHECK_OK(), but dumps the contents of the cache before failing.
+//
+// The output of ToDebugString() tends to be long enough that LOG() truncates
+// it, so we must split it ourselves before logging.
+#define TEST_CHECK_OK(to_call) do {                                       \
+    const Status& _s = (to_call);                                         \
+    if (!_s.ok()) {                                                       \
+      LOG(INFO) << "Dumping cache contents";                              \
+      vector<string> lines = strings::Split(cache_->ToDebugString(), "\n",\
+                                            strings::SkipEmpty());        \
+      for (const auto& l : lines) {                                       \
+        LOG(INFO) << l;                                                   \
+      }                                                                   \
+    }                                                                     \
+    CHECK(_s.ok()) << "Bad status: " << _s.ToString();                    \
+  } while (0);
+
+ public:
+  typedef unordered_map<string, unordered_map<string, int>> MetricMap;
+
+  FileCacheStressTest()
+      : rand_(SeedRandom()),
+        running_(1) {
+    // Use a single shard. Otherwise, the cache can be a little bit "sloppy"
+    // depending on the number of CPUs on the system.
+    FLAGS_cache_force_single_shard = true;
+    cache_.reset(new FileCache<FileType>("test",
+                                         env_,
+                                         kTestMaxOpenFiles,
+                                         scoped_refptr<MetricEntity>()));
+  }
+
+  void SetUp() override {
+    ASSERT_OK(cache_->Init());
+  }
+
+  void ProducerThread() {
+    Random rand(rand_.Next32());
+    ObjectIdGenerator oid_generator;
+    MetricMap metrics;
+
+    do {
+      // Create a new file with some (0-32k) random data in it.
+      string next_file_name = GetTestPath(oid_generator.Next());
+      {
+        unique_ptr<WritableFile> next_file;
+        CHECK_OK(env_->NewWritableFile(next_file_name, &next_file));
+        uint8_t buf[rand.Uniform((32 * 1024) - 1) + 1];
+        CHECK_OK(next_file->Append(GenerateRandomChunk(buf, sizeof(buf), &rand)));
+        CHECK_OK(next_file->Close());
+      }
+      {
+        std::lock_guard<simple_spinlock> l(lock_);
+        InsertOrDie(&available_files_, next_file_name, 0);
+      }
+      metrics[BaseName(next_file_name)]["create"] = 1;
+    } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1)));
+
+    // Update the global metrics map.
+    MergeNewMetrics(std::move(metrics));
+  }
+
+  void ConsumerThread() {
+    // Each thread has its own PRNG to minimize contention on the main one.
+    Random rand(rand_.Next32());
+
+    // Active opened files in this thread.
+    deque<shared_ptr<FileType>> files;
+
+    // Metrics generated by this thread. They will be merged into the main
+    // metrics map when the thread is done.
+    MetricMap metrics;
+
+    do {
+      // Pick an action to perform. Distribution:
+      // 20% open
+      // 15% close
+      // 35% read
+      // 20% write
+      // 10% delete
+      int next_action = rand.Uniform(100);
+
+      if (next_action < 20) {
+        // Open an existing file.
+        string to_open;
+        if (!GetRandomFile(OPEN, &rand, &to_open)) {
+          continue;
+        }
+        shared_ptr<FileType> new_file;
+        TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &new_file));
+        FinishedOpen(to_open);
+        metrics[BaseName(to_open)]["open"]++;
+        files.emplace_back(new_file);
+      } else if (next_action < 35) {
+        // Close a file.
+        if (files.empty()) {
+          continue;
+        }
+        shared_ptr<FileType> file = files.front();
+        files.pop_front();
+        metrics[BaseName(file->filename())]["close"]++;
+      } else if (next_action < 70) {
+        // Read a random chunk from a file.
+        TEST_CHECK_OK(ReadRandomChunk(files, &metrics, &rand));
+      } else if (next_action < 90) {
+        // Write a random chunk to a file.
+        TEST_CHECK_OK(WriteRandomChunk(files, &metrics, &rand));
+      } else if (next_action < 100) {
+        // Delete a file.
+        string to_delete;
+        if (!GetRandomFile(DELETE, &rand, &to_delete)) {
+          continue;
+        }
+        TEST_CHECK_OK(cache_->DeleteFile(to_delete));
+        metrics[BaseName(to_delete)]["delete"]++;
+      }
+    } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1)));
+
+    // Update the global metrics map.
+    MergeNewMetrics(std::move(metrics));
+  }
+
+ protected:
+  void NotifyThreads() { running_.CountDown(); }
+
+  const MetricMap& metrics() const { return metrics_; }
+
+ private:
+  enum GetMode {
+    OPEN,
+    DELETE
+  };
+
+  // Retrieve a random file name to be either opened or deleted. If deleting,
+  // the file name is made inaccessible to future operations.
+  bool GetRandomFile(GetMode mode, Random* rand, string* out) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    if (available_files_.empty()) {
+      return false;
+    }
+
+    // This is linear time, but it's simpler than managing multiple data
+    // structures.
+    auto it = available_files_.begin();
+    std::advance(it, rand->Uniform(available_files_.size()));
+
+    // It's unsafe to delete a file that is still being opened.
+    if (mode == DELETE && it->second > 0) {
+      return false;
+    }
+
+    *out = it->first;
+    if (mode == OPEN) {
+      it->second++;
+    } else {
+      available_files_.erase(it);
+    }
+    return true;
+  }
+
+  // Signal that a previously in-progress open has finished, allowing the file
+  // in question to be deleted.
+  void FinishedOpen(const string& opened) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    int& openers = FindOrDie(available_files_, opened);
+    openers--;
+  }
+
+  // Reads a random chunk of data from a random file in 'files'. On success,
+  // writes to 'metrics'.
+  static Status ReadRandomChunk(const deque<shared_ptr<FileType>>& files,
+                                MetricMap* metrics,
+                                Random* rand) {
+    if (files.empty()) {
+      return Status::OK();
+    }
+    const shared_ptr<FileType>& file = files[rand->Uniform(files.size())];
+
+    uint64_t file_size;
+    RETURN_NOT_OK(file->Size(&file_size));
+    uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0;
+    size_t len = file_size > 0 ? rand->Uniform(file_size - off) : 0;
+    unique_ptr<uint8_t[]> scratch(new uint8_t[len]);
+    RETURN_NOT_OK(file->Read(off, Slice(scratch.get(), len)));
+
+    (*metrics)[BaseName(file->filename())]["read"]++;
+    return Status::OK();
+  }
+
+  // Writes a random chunk of data to a random file in 'files'. On success,
+  // updates 'metrics'.
+  //
+  // No-op for file implementations that don't support writing.
+  static Status WriteRandomChunk(const deque<shared_ptr<FileType>>& files,
+                                 MetricMap* metrics,
+                                 Random* rand);
+
+  static Slice GenerateRandomChunk(uint8_t* buffer, size_t max_length, Random* rand) {
+    size_t len = rand->Uniform(max_length);
+    len -= len % sizeof(uint32_t);
+    for (int i = 0; i < (len / sizeof(uint32_t)); i++) {
+      reinterpret_cast<uint32_t*>(buffer)[i] = rand->Next32();
+    }
+    return Slice(buffer, len);
+  }
+
+  // Merge the metrics in 'new_metrics' into the global metric map.
+  void MergeNewMetrics(MetricMap new_metrics) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (const auto& file_action_pair : new_metrics) {
+      for (const auto& action_count_pair : file_action_pair.second) {
+        metrics_[file_action_pair.first][action_count_pair.first] += action_count_pair.second;
+      }
+    }
+  }
+
+  unique_ptr<FileCache<FileType>> cache_;
+
+  // Used to seed per-thread PRNGs.
+  ThreadSafeRandom rand_;
+
+  // Drops to zero when the test ends.
+  CountDownLatch running_;
+
+  // Protects 'available_files_' and 'metrics_'.
+  simple_spinlock lock_;
+
+  // Contains files produced by producer threads and ready for consumption by
+  // consumer threads.
+  //
+  // Each entry is a file name and the number of in-progress openers. To delete
+  // a file, there must be no openers.
+  unordered_map<string, int> available_files_;
+
+  // For each file name, tracks the count of consumer actions performed.
+  //
+  // Only updated at test end.
+  MetricMap metrics_;
+};
+
+template <>
+Status FileCacheStressTest<RWFile>::WriteRandomChunk(
+    const deque<shared_ptr<RWFile>>& files,
+    MetricMap* metrics,
+    Random* rand) {
+  if (files.empty()) {
+    return Status::OK();
+  }
+  const shared_ptr<RWFile>& file = files[rand->Uniform(files.size())];
+
+  uint64_t file_size;
+  RETURN_NOT_OK(file->Size(&file_size));
+  uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0;
+  uint8_t buf[64];
+  RETURN_NOT_OK(file->Write(off, GenerateRandomChunk(buf, sizeof(buf), rand)));
+  (*metrics)[BaseName(file->filename())]["write"]++;
+  return Status::OK();
+}
+
+template <>
+Status FileCacheStressTest<RandomAccessFile>::WriteRandomChunk(
+    const deque<shared_ptr<RandomAccessFile>>& /* unused */,
+    MetricMap* /* unused */,
+    Random* /* unused */) {
+  return Status::OK();
+}
+
+typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;
+TYPED_TEST_CASE(FileCacheStressTest, FileTypes);
+
+TYPED_TEST(FileCacheStressTest, TestStress) {
+  OverrideFlagForSlowTests("test_num_producer_threads", "2");
+  OverrideFlagForSlowTests("test_num_consumer_threads", "8");
+  OverrideFlagForSlowTests("test_duration_secs", "30");
+
+  // Start the threads.
+  PeriodicOpenFdChecker checker(
+      this->env_,
+      this->GetTestPath("*"),           // only count within our test dir
+      kTestMaxOpenFiles +               // cache capacity
+      FLAGS_test_num_producer_threads + // files being written
+      FLAGS_test_num_consumer_threads); // files being opened
+  checker.Start();
+  vector<thread> producers;
+  for (int i = 0; i < FLAGS_test_num_producer_threads; i++) {
+    producers.emplace_back(&FileCacheStressTest<TypeParam>::ProducerThread, this);
+  }
+  vector<thread> consumers;
+  for (int i = 0; i < FLAGS_test_num_consumer_threads; i++) {
+    consumers.emplace_back(&FileCacheStressTest<TypeParam>::ConsumerThread, this);
+  }
+
+  // Let the test run.
+  SleepFor(MonoDelta::FromSeconds(FLAGS_test_duration_secs));
+
+  // Stop the threads.
+  this->NotifyThreads();
+  checker.Stop();
+  for (auto& p : producers) {
+    p.join();
+  }
+  for (auto& c : consumers) {
+    c.join();
+  }
+
+  // Log the metrics.
+  unordered_map<string, int> action_counts;
+  for (const auto& file_action_pair : this->metrics()) {
+    for (const auto& action_count_pair : file_action_pair.second) {
+      VLOG(2) << Substitute("$0: $1: $2",
+                            file_action_pair.first,
+                            action_count_pair.first,
+                            action_count_pair.second);
+      action_counts[action_count_pair.first] += action_count_pair.second;
+    }
+  }
+  for (const auto& action_count_pair : action_counts) {
+    LOG(INFO) << Substitute("$0: $1",
+                            action_count_pair.first,
+                            action_count_pair.second);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache-test-util.h b/be/src/kudu/util/file_cache-test-util.h
new file mode 100644
index 0000000..09acb68
--- /dev/null
+++ b/be/src/kudu/util/file_cache-test-util.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <thread>
+
+#include <glog/logging.h>
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// Periodically checks the number of open file descriptors belonging to this
+// process, crashing if it exceeds some upper bound.
+class PeriodicOpenFdChecker {
+ public:
+  // path_pattern: a glob-style pattern of which paths should be included while
+  //               counting file descriptors
+  // upper_bound:  the maximum number of file descriptors that should be open
+  //               at any point in time
+  PeriodicOpenFdChecker(Env* env, std::string path_pattern, int upper_bound)
+    : env_(env),
+      path_pattern_(std::move(path_pattern)),
+      initial_fd_count_(CountOpenFds(env, path_pattern_)),
+      max_fd_count_(upper_bound + initial_fd_count_),
+      running_(1),
+      started_(false) {}
+
+  ~PeriodicOpenFdChecker() { Stop(); }
+
+  void Start() {
+    DCHECK(!started_);
+    running_.Reset(1);
+    check_thread_ = std::thread(&PeriodicOpenFdChecker::CheckThread, this);
+    started_ = true;
+  }
+
+  void Stop() {
+    if (started_) {
+      running_.CountDown();
+      check_thread_.join();
+      started_ = false;
+    }
+  }
+
+ private:
+  void CheckThread() {
+    LOG(INFO) << strings::Substitute(
+        "Periodic open fd checker starting for path pattern $0"
+        "(initial: $1 max: $2)",
+        path_pattern_, initial_fd_count_, max_fd_count_);
+    do {
+      int open_fd_count = CountOpenFds(env_, path_pattern_);
+      KLOG_EVERY_N_SECS(INFO, 1) << strings::Substitute("Open fd count: $0/$1",
+                                                        open_fd_count,
+                                                        max_fd_count_);
+      CHECK_LE(open_fd_count, max_fd_count_);
+    } while (!running_.WaitFor(MonoDelta::FromMilliseconds(100)));
+  }
+
+  Env* env_;
+  const std::string path_pattern_;
+  const int initial_fd_count_;
+  const int max_fd_count_;
+
+  CountDownLatch running_;
+  std::thread check_thread_;
+  bool started_;
+};
+
+} // namespace kudu
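
A brief sketch of how PeriodicOpenFdChecker is meant to be used from a KuduTest-derived test (illustrative only; the test name and workload are hypothetical):

  #include <gtest/gtest.h>

  #include "kudu/util/file_cache-test-util.h"
  #include "kudu/util/monotime.h"
  #include "kudu/util/test_util.h"

  namespace kudu {

  class FdCheckerExampleTest : public KuduTest {};

  TEST_F(FdCheckerExampleTest, OpenFdsStayBounded) {
    // Crash the test if more than 10 descriptors matching the test-dir glob
    // are ever open at once while the checker is running.
    PeriodicOpenFdChecker checker(env_, GetTestPath("*"), /*upper_bound=*/10);
    checker.Start();
    // ... a workload that opens and closes files under GetTestPath() ...
    SleepFor(MonoDelta::FromMilliseconds(200));
    checker.Stop();  // also invoked by the destructor
  }

  } // namespace kudu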

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache-test.cc b/be/src/kudu/util/file_cache-test.cc
new file mode 100644
index 0000000..94c09eb
--- /dev/null
+++ b/be/src/kudu/util/file_cache-test.cc
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/metrics.h"  // IWYU pragma: keep
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(cache_force_single_shard);
+DECLARE_int32(file_cache_expiry_period_ms);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+template <class FileType>
+class FileCacheTest : public KuduTest {
+ public:
+  FileCacheTest()
+      : rand_(SeedRandom()) {
+    // Simplify testing of the actual cache capacity.
+    FLAGS_cache_force_single_shard = true;
+
+    // Speed up tests that check the number of descriptors.
+    FLAGS_file_cache_expiry_period_ms = 1;
+
+    // libunwind internally uses two file descriptors as a pipe.
+    // Make sure it gets initialized early so that our fd count
+    // doesn't get affected by it.
+    ignore_result(GetStackTraceHex());
+    initial_open_fds_ = CountOpenFds();
+  }
+
+  int CountOpenFds() const {
+    // Only count files in the test working directory so that we don't
+    // accidentally count other fds that might be opened or closed in
+    // the background by other threads.
+    return kudu::CountOpenFds(env_, GetTestPath("*"));
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(ReinitCache(1));
+  }
+
+ protected:
+  Status ReinitCache(int max_open_files) {
+    cache_.reset(new FileCache<FileType>("test",
+                                         env_,
+                                         max_open_files,
+                                         nullptr));
+    return cache_->Init();
+  }
+
+  Status WriteTestFile(const string& name, const string& data) {
+    unique_ptr<RWFile> f;
+    RETURN_NOT_OK(env_->NewRWFile(name, &f));
+    RETURN_NOT_OK(f->Write(0, data));
+    return Status::OK();
+  }
+
+  void AssertFdsAndDescriptors(int num_expected_fds,
+                               int num_expected_descriptors) {
+    ASSERT_EQ(initial_open_fds_ + num_expected_fds, CountOpenFds());
+
+    // The expiry thread may take some time to run.
+    ASSERT_EVENTUALLY([&]() {
+      ASSERT_EQ(num_expected_descriptors, cache_->NumDescriptorsForTests());
+    });
+  }
+
+  Random rand_;
+  int initial_open_fds_;
+  unique_ptr<FileCache<FileType>> cache_;
+};
+
+typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;
+TYPED_TEST_CASE(FileCacheTest, FileTypes);
+
+TYPED_TEST(FileCacheTest, TestBasicOperations) {
+  // Open a non-existent file.
+  {
+    shared_ptr<TypeParam> f;
+    ASSERT_TRUE(this->cache_->OpenExistingFile(
+        "/does/not/exist", &f).IsNotFound());
+    NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
+  }
+
+  const string kFile1 = this->GetTestPath("foo");
+  const string kFile2 = this->GetTestPath("bar");
+  const string kData1 = "test data 1";
+  const string kData2 = "test data 2";
+
+  // Create some test files.
+  ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+  ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+  NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
+
+  {
+    // Open a test file. It should open an fd and create a descriptor.
+    shared_ptr<TypeParam> f1;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f1));
+    NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+
+    // Spot check the test data by comparing sizes.
+    for (int i = 0; i < 3; i++) {
+      uint64_t size;
+      ASSERT_OK(f1->Size(&size));
+      ASSERT_EQ(kData1.size(), size);
+      NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+    }
+
+    // Open the same file a second time. It should reuse the existing
+    // descriptor and not open a second fd.
+    shared_ptr<TypeParam> f2;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+    NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+    {
+      Cache::UniqueHandle uh(
+          this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE),
+          Cache::HandleDeleter(this->cache_->cache_.get()));
+      ASSERT_TRUE(uh.get());
+    }
+
+    // Open a second file. This will create a new descriptor, but evict the fd
+    // opened for the first file, so the fd count should remain constant.
+    shared_ptr<TypeParam> f3;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f3));
+    NO_FATALS(this->AssertFdsAndDescriptors(1, 2));
+    {
+      Cache::UniqueHandle uh(
+          this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE),
+          Cache::HandleDeleter(this->cache_->cache_.get()));
+      ASSERT_FALSE(uh.get());
+    }
+    {
+      Cache::UniqueHandle uh(
+          this->cache_->cache_->Lookup(kFile2, Cache::EXPECT_IN_CACHE),
+          Cache::HandleDeleter(this->cache_->cache_.get()));
+      ASSERT_TRUE(uh.get());
+    }
+  }
+
+  // The descriptors are all out of scope, but the open fds remain in the cache.
+  NO_FATALS(this->AssertFdsAndDescriptors(1, 0));
+
+  // With the cache gone, so are the cached fds.
+  this->cache_.reset();
+  ASSERT_EQ(this->initial_open_fds_, this->CountOpenFds());
+}
+
+TYPED_TEST(FileCacheTest, TestDeletion) {
+  // Deleting a file that doesn't exist has no effect and reports NotFound.
+  ASSERT_TRUE(this->cache_->DeleteFile("/does/not/exist").IsNotFound());
+
+  // Create a test file, then delete it. It will be deleted immediately.
+  const string kFile1 = this->GetTestPath("foo");
+  const string kData1 = "test data 1";
+  ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+  ASSERT_TRUE(this->env_->FileExists(kFile1));
+  ASSERT_OK(this->cache_->DeleteFile(kFile1));
+  ASSERT_FALSE(this->env_->FileExists(kFile1));
+
+  // Trying to delete it again fails.
+  ASSERT_TRUE(this->cache_->DeleteFile(kFile1).IsNotFound());
+
+  // Create another test file, open it, then delete it. The delete is not
+  // effected until the last open descriptor is closed. In between, the
+  // cache won't allow the file to be opened again.
+  const string kFile2 = this->GetTestPath("bar");
+  const string kData2 = "test data 2";
+  ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+  ASSERT_TRUE(this->env_->FileExists(kFile2));
+  {
+    shared_ptr<TypeParam> f1;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f1));
+    ASSERT_EQ(this->initial_open_fds_ + 1, this->CountOpenFds());
+    ASSERT_OK(this->cache_->DeleteFile(kFile2));
+    {
+      shared_ptr<TypeParam> f2;
+      ASSERT_TRUE(this->cache_->OpenExistingFile(kFile2, &f2).IsNotFound());
+    }
+    ASSERT_TRUE(this->cache_->DeleteFile(kFile2).IsNotFound());
+    ASSERT_TRUE(this->env_->FileExists(kFile2));
+    ASSERT_EQ(this->initial_open_fds_ + 1, this->CountOpenFds());
+  }
+  ASSERT_FALSE(this->env_->FileExists(kFile2));
+  ASSERT_EQ(this->initial_open_fds_, this->CountOpenFds());
+
+  // Create a test file, open it, and let it go out of scope before
+  // deleting it. The deletion should evict the fd and close it, despite
+  // happening after the descriptor is gone.
+  const string kFile3 = this->GetTestPath("baz");
+  const string kData3 = "test data 3";
+  ASSERT_OK(this->WriteTestFile(kFile3, kData3));
+  {
+    shared_ptr<TypeParam> f3;
+    ASSERT_OK(this->cache_->OpenExistingFile(kFile3, &f3));
+  }
+  ASSERT_TRUE(this->env_->FileExists(kFile3));
+  ASSERT_EQ(this->initial_open_fds_ + 1, this->CountOpenFds());
+  ASSERT_OK(this->cache_->DeleteFile(kFile3));
+  ASSERT_FALSE(this->env_->FileExists(kFile3));
+  ASSERT_EQ(this->initial_open_fds_, this->CountOpenFds());
+}
+
+TYPED_TEST(FileCacheTest, TestInvalidation) {
+  const string kFile1 = this->GetTestPath("foo");
+  const string kData1 = "test data 1";
+  ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+
+  // Open the file.
+  shared_ptr<TypeParam> f;
+  ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f));
+
+  // Write a new file and rename it in place on top of file1.
+  const string kFile2 = this->GetTestPath("foo2");
+  const string kData2 = "test data 2 (longer than original)";
+  ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+  ASSERT_OK(this->env_->RenameFile(kFile2, kFile1));
+
+  // We should still be able to access the file, since it has a cached fd.
+  uint64_t size;
+  ASSERT_OK(f->Size(&size));
+  ASSERT_EQ(kData1.size(), size);
+
+  // If we invalidate it from the cache and try again, it should crash because
+  // the existing descriptor was invalidated.
+  this->cache_->Invalidate(kFile1);
+  ASSERT_DEATH({ f->Size(&size); }, "invalidated");
+
+  // But if we re-open the path again, the new descriptor should read the
+  // new data.
+  shared_ptr<TypeParam> f2;
+  ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+  ASSERT_OK(f2->Size(&size));
+  ASSERT_EQ(kData2.size(), size);
+}
+
+
+TYPED_TEST(FileCacheTest, TestHeavyReads) {
+  const int kNumFiles = 20;
+  const int kNumIterations = 100;
+  const int kCacheCapacity = 5;
+
+  ASSERT_OK(this->ReinitCache(kCacheCapacity));
+
+  // Randomly generate some data.
+  string data;
+  for (int i = 0; i < 1000; i++) {
+    data += Substitute("$0", this->rand_.Next());
+  }
+
+  // Write that data to a bunch of files and open them through the cache.
+  vector<shared_ptr<TypeParam>> opened_files;
+  for (int i = 0; i < kNumFiles; i++) {
+    string filename = this->GetTestPath(Substitute("$0", i));
+    ASSERT_OK(this->WriteTestFile(filename, data));
+    shared_ptr<TypeParam> f;
+    ASSERT_OK(this->cache_->OpenExistingFile(filename, &f));
+    opened_files.push_back(f);
+  }
+
+  // Read back the data at random through the cache.
+  unique_ptr<uint8_t[]> buf(new uint8_t[data.length()]);
+  for (int i = 0; i < kNumIterations; i++) {
+    int idx = this->rand_.Uniform(opened_files.size());
+    const auto& f = opened_files[idx];
+    uint64_t size;
+    ASSERT_OK(f->Size(&size));
+    Slice s(buf.get(), size);
+    ASSERT_OK(f->Read(0, s));
+    ASSERT_EQ(data, s);
+    ASSERT_LE(this->CountOpenFds(),
+              this->initial_open_fds_ + kCacheCapacity);
+  }
+}
+
+TYPED_TEST(FileCacheTest, TestNoRecursiveDeadlock) {
+  // This test triggered a deadlock in a previous implementation, when expired
+  // weak_ptrs were removed from the descriptor map in the descriptor's
+  // destructor.
+  alarm(60);
+  auto cleanup = MakeScopedCleanup([]() {
+    alarm(0);
+  });
+
+  const string kFile = this->GetTestPath("foo");
+  ASSERT_OK(this->WriteTestFile(kFile, "test data"));
+
+  vector<std::thread> threads;
+  for (int i = 0; i < 2; i++) {
+    threads.emplace_back([&]() {
+      for (int i = 0; i < 10000; i++) {
+        shared_ptr<TypeParam> f;
+        CHECK_OK(this->cache_->OpenExistingFile(kFile, &f));
+      }
+    });
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+class RandomAccessFileCacheTest : public FileCacheTest<RandomAccessFile> {
+};
+
+TEST_F(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) {
+  const string kFile = this->GetTestPath("foo");
+  ASSERT_OK(this->WriteTestFile(kFile, "test data"));
+
+  shared_ptr<RandomAccessFile> f;
+  ASSERT_OK(this->cache_->OpenExistingFile(kFile, &f));
+
+  // This used to crash due to a kudu_malloc_usable_size() call on a memory
+  // address that wasn't the start of an actual heap allocation.
+  LOG(INFO) << f->memory_footprint();
+}
+
+} // namespace kudu
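
The deadlock that TestNoRecursiveDeadlock guards against arose when expired weak_ptr entries were reaped from the descriptor map inside the descriptor destructor, recursively re-acquiring the map's lock. A minimal standalone sketch of the fixed scheme follows; it uses plain standard C++ with hypothetical names (descriptors, ReapExpired) and is illustrative only, not part of the patch: the map holds weak references, and expired entries are swept in a separate pass analogous to RunDescriptorExpiry() rather than in the destructor.

    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>

    // Hypothetical descriptor map: weak references, so map entries never
    // keep a descriptor alive.
    std::map<std::string, std::weak_ptr<std::string>> descriptors;

    // Sweep pass analogous to RunDescriptorExpiry(): expired entries are
    // reaped here (under the map's lock in the real code) rather than in
    // the descriptor's destructor, which could re-enter that lock.
    void ReapExpired() {
      for (auto it = descriptors.begin(); it != descriptors.end();) {
        if (it->second.expired()) {
          it = descriptors.erase(it);
        } else {
          ++it;
        }
      }
    }

    int main() {
      {
        auto desc = std::make_shared<std::string>("descriptor for /data/foo");
        descriptors.emplace("/data/foo", desc);
        std::cout << "live entries: " << descriptors.size() << std::endl;  // 1
      }  // 'desc' destroyed; the map still holds an expired weak_ptr.

      ReapExpired();
      std::cout << "after sweep: " << descriptors.size() << std::endl;  // 0
      return 0;
    }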

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache.cc b/be/src/kudu/util/file_cache.cc
new file mode 100644
index 0000000..a1ab814
--- /dev/null
+++ b/be/src/kudu/util/file_cache.cc
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/array_view.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"  // IWYU pragma: keep
+#include "kudu/util/monotime.h"
+#include "kudu/util/once.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+DEFINE_int32(file_cache_expiry_period_ms, 60 * 1000,
+             "Period of time (in ms) between removing expired file cache descriptors");
+TAG_FLAG(file_cache_expiry_period_ms, advanced);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+template <class FileType>
+FileType* CacheValueToFileType(Slice s) {
+  return reinterpret_cast<FileType*>(*reinterpret_cast<void**>(
+      s.mutable_data()));
+}
+
+template <class FileType>
+class EvictionCallback : public Cache::EvictionCallback {
+ public:
+  EvictionCallback() {}
+
+  void EvictedEntry(Slice key, Slice value) override {
+    VLOG(2) << "Evicted fd belonging to " << key.ToString();
+    delete CacheValueToFileType<FileType>(value);
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(EvictionCallback);
+};
+
+} // anonymous namespace
+
+namespace internal {
+
+template <class FileType>
+class ScopedOpenedDescriptor;
+
+// Encapsulates common descriptor fields and methods.
+template <class FileType>
+class BaseDescriptor {
+ public:
+  BaseDescriptor(FileCache<FileType>* file_cache,
+                 string filename)
+      : file_cache_(file_cache),
+        file_name_(std::move(filename)) {}
+
+  ~BaseDescriptor() {
+    VLOG(2) << "Out of scope descriptor with file name: " << filename();
+
+    // The (now expired) weak_ptr remains in 'descriptors_', to be removed by
+    // the next call to RunDescriptorExpiry(). Removing it here would risk a
+    // deadlock on recursive acquisition of 'lock_'.
+
+    if (deleted()) {
+      cache()->Erase(filename());
+
+      VLOG(1) << "Deleting file: " << filename();
+      WARN_NOT_OK(env()->DeleteFile(filename()), "");
+    }
+  }
+
+  // Insert a pointer to an open file object into the file cache with the
+  // filename as the cache key.
+  //
+  // Returns a handle to the inserted entry. The handle always contains an open
+  // file.
+  ScopedOpenedDescriptor<FileType> InsertIntoCache(void* file_ptr) const {
+    // The allocated charge is always one byte. This is incorrect with respect
+    // to memory tracking, but it's necessary if the cache capacity is to be
+    // equivalent to the max number of fds.
+    Cache::PendingHandle* pending = CHECK_NOTNULL(cache()->Allocate(
+        filename(), sizeof(file_ptr), 1));
+    memcpy(cache()->MutableValue(pending),
+           &file_ptr,
+           sizeof(file_ptr));
+    return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
+        cache()->Insert(pending, file_cache_->eviction_cb_.get()),
+        Cache::HandleDeleter(cache())));
+  }
+
+  // Retrieves a pointer to an open file object from the file cache with the
+  // filename as the cache key.
+  //
+  // Returns a handle to the looked up entry. The handle may or may not contain
+  // an open file, depending on whether the cache hit or missed.
+  ScopedOpenedDescriptor<FileType> LookupFromCache() const {
+    return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
+        cache()->Lookup(filename(), Cache::EXPECT_IN_CACHE),
+        Cache::HandleDeleter(cache())));
+  }
+
+  // Mark this descriptor as to-be-deleted later.
+  void MarkDeleted() {
+    DCHECK(!deleted());
+    while (true) {
+      auto v = flags_.load();
+      if (flags_.compare_exchange_weak(v, v | FILE_DELETED)) return;
+    }
+  }
+
+  // Mark this descriptor as invalidated. No further access is allowed
+  // to this file.
+  void MarkInvalidated() {
+    DCHECK(!invalidated());
+    while (true) {
+      auto v = flags_.load();
+      if (flags_.compare_exchange_weak(v, v | INVALIDATED)) return;
+    }
+  }
+
+  Cache* cache() const { return file_cache_->cache_.get(); }
+
+  Env* env() const { return file_cache_->env_; }
+
+  const string& filename() const { return file_name_; }
+
+  bool deleted() const { return flags_.load() & FILE_DELETED; }
+  bool invalidated() const { return flags_.load() & INVALIDATED; }
+
+ private:
+  FileCache<FileType>* file_cache_;
+  const string file_name_;
+  enum Flags {
+    FILE_DELETED = 1 << 0,
+    INVALIDATED = 1 << 1
+  };
+  std::atomic<uint8_t> flags_ {0};
+
+  DISALLOW_COPY_AND_ASSIGN(BaseDescriptor);
+};
+
+// A "smart" retrieved LRU cache handle.
+//
+// The cache handle is released when this object goes out of scope, possibly
+// closing the opened file if it is no longer in the cache.
+template <class FileType>
+class ScopedOpenedDescriptor {
+ public:
+  // A not-yet-but-soon-to-be opened descriptor.
+  explicit ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc)
+      : desc_(desc),
+        handle_(nullptr, Cache::HandleDeleter(desc_->cache())) {
+  }
+
+  // An opened descriptor. Its handle may or may not contain an open file.
+  ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc,
+                         Cache::UniqueHandle handle)
+      : desc_(desc),
+        handle_(std::move(handle)) {
+  }
+
+  bool opened() const { return handle_.get(); }
+
+  FileType* file() const {
+    DCHECK(opened());
+    return CacheValueToFileType<FileType>(desc_->cache()->Value(handle_.get()));
+  }
+
+ private:
+  const BaseDescriptor<FileType>* desc_;
+  Cache::UniqueHandle handle_;
+};
+
+// Reference to an on-disk file that may or may not be opened (and thus
+// cached) in the file cache.
+//
+// This empty template is just a specification; actual descriptor classes must
+// be fully specialized.
+template <class FileType>
+class Descriptor : public FileType {
+};
+
+// A descriptor adhering to the RWFile interface (i.e. when opened, provides
+// a read-write interface to the underlying file).
+template <>
+class Descriptor<RWFile> : public RWFile {
+ public:
+  Descriptor(FileCache<RWFile>* file_cache, const string& filename)
+      : base_(file_cache, filename) {}
+
+  ~Descriptor() = default;
+
+  Status Read(uint64_t offset, Slice result) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Read(offset, result);
+  }
+
+  Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->ReadV(offset, results);
+  }
+
+  Status Write(uint64_t offset, const Slice& data) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Write(offset, data);
+  }
+
+  Status WriteV(uint64_t offset, ArrayView<const Slice> data) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->WriteV(offset, data);
+  }
+
+  Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->PreAllocate(offset, length, mode);
+  }
+
+  Status Truncate(uint64_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Truncate(length);
+  }
+
+  Status PunchHole(uint64_t offset, size_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->PunchHole(offset, length);
+  }
+
+  Status Flush(FlushMode mode, uint64_t offset, size_t length) override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Flush(mode, offset, length);
+  }
+
+  Status Sync() override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Sync();
+  }
+
+  Status Close() override {
+    // Intentional no-op; actual closing is deferred to LRU cache eviction.
+    return Status::OK();
+  }
+
+  Status Size(uint64_t* size) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Size(size);
+  }
+
+  Status GetExtentMap(ExtentMap* out) const override {
+    ScopedOpenedDescriptor<RWFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->GetExtentMap(out);
+  }
+
+  const string& filename() const override {
+    return base_.filename();
+  }
+
+ private:
+  friend class FileCache<RWFile>;
+
+  Status Init() {
+    return once_.Init(&Descriptor<RWFile>::InitOnce, this);
+  }
+
+  Status InitOnce() {
+    return ReopenFileIfNecessary(nullptr);
+  }
+
+  Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
+    ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache());
+    CHECK(!base_.invalidated());
+    if (found.opened()) {
+      // The file is already open in the cache, return it.
+      if (out) {
+        *out = std::move(found);
+      }
+      return Status::OK();
+    }
+
+    // The file was evicted, reopen it.
+    //
+    // Because the file may be evicted at any time, we must use 'sync_on_close'
+    // (note: sync is a no-op if the file isn't dirty).
+    RWFileOptions opts;
+    opts.sync_on_close = true;
+    opts.mode = Env::OPEN_EXISTING;
+    unique_ptr<RWFile> f;
+    RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f));
+
+    // The cache will take ownership of the newly opened file.
+    ScopedOpenedDescriptor<RWFile> opened(base_.InsertIntoCache(f.release()));
+    if (out) {
+      *out = std::move(opened);
+    }
+    return Status::OK();
+  }
+
+  BaseDescriptor<RWFile> base_;
+  KuduOnceDynamic once_;
+
+  DISALLOW_COPY_AND_ASSIGN(Descriptor);
+};
+
+// A descriptor adhering to the RandomAccessFile interface (i.e. when opened,
+// provides a read-only interface to the underlying file).
+template <>
+class Descriptor<RandomAccessFile> : public RandomAccessFile {
+ public:
+  Descriptor(FileCache<RandomAccessFile>* file_cache, const string& filename)
+      : base_(file_cache, filename) {}
+
+  ~Descriptor() = default;
+
+  Status Read(uint64_t offset, Slice result) const override {
+    ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Read(offset, result);
+  }
+
+  Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
+    ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->ReadV(offset, results);
+  }
+
+  Status Size(uint64_t *size) const override {
+    ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+    return opened.file()->Size(size);
+  }
+
+  const string& filename() const override {
+    return base_.filename();
+  }
+
+  size_t memory_footprint() const override {
+    // Normally we would use kudu_malloc_usable_size(this). However, that's
+    // not safe because 'this' was allocated via std::make_shared(), which
+    // means it isn't necessarily the base of the memory allocation; it may be
+    // preceded by the shared_ptr control block.
+    //
+    // It doesn't appear possible to get the base of the allocation via any
+    // shared_ptr APIs, so we'll use sizeof(*this) + 16 instead. The 16 bytes
+    // represent the shared_ptr control block. Overall the object size is still
+    // undercounted as it doesn't account for any internal heap fragmentation,
+    // but at least it's safe.
+    //
+    // Some anecdotal memory measurements taken inside gdb:
+    // - glibc 2.23 malloc_usable_size() on make_shared<FileType>: 88 bytes.
+    // - tcmalloc malloc_usable_size() on make_shared<FileType>: 96 bytes.
+    // - sizeof(std::_Sp_counted_base<>) with libstdc++ 5.4: 16 bytes.
+    // - sizeof(std::__1::__shared_ptr_emplace<>) with libc++ 3.9: 16 bytes.
+    // - sizeof(*this): 72 bytes.
+    return sizeof(*this) +
+        16 + // shared_ptr control block
+        once_.memory_footprint_excluding_this() +
+        base_.filename().capacity();
+  }
+
+ private:
+  friend class FileCache<RandomAccessFile>;
+
+  Status Init() {
+    return once_.Init(&Descriptor<RandomAccessFile>::InitOnce, this);
+  }
+
+  Status InitOnce() {
+    return ReopenFileIfNecessary(nullptr);
+  }
+
+  Status ReopenFileIfNecessary(
+      ScopedOpenedDescriptor<RandomAccessFile>* out) const {
+    ScopedOpenedDescriptor<RandomAccessFile> found(base_.LookupFromCache());
+    CHECK(!base_.invalidated());
+    if (found.opened()) {
+      // The file is already open in the cache, return it.
+      if (out) {
+        *out = std::move(found);
+      }
+      return Status::OK();
+    }
+
+    // The file was evicted, reopen it.
+    unique_ptr<RandomAccessFile> f;
+    RETURN_NOT_OK(base_.env()->NewRandomAccessFile(base_.filename(), &f));
+
+    // The cache will take ownership of the newly opened file.
+    ScopedOpenedDescriptor<RandomAccessFile> opened(
+        base_.InsertIntoCache(f.release()));
+    if (out) {
+      *out = std::move(opened);
+    }
+    return Status::OK();
+  }
+
+  BaseDescriptor<RandomAccessFile> base_;
+  KuduOnceDynamic once_;
+
+  DISALLOW_COPY_AND_ASSIGN(Descriptor);
+};
+
+} // namespace internal
+
+template <class FileType>
+FileCache<FileType>::FileCache(const string& cache_name,
+                               Env* env,
+                               int max_open_files,
+                               const scoped_refptr<MetricEntity>& entity)
+    : env_(env),
+      cache_name_(cache_name),
+      eviction_cb_(new EvictionCallback<FileType>()),
+      cache_(NewLRUCache(DRAM_CACHE, max_open_files, cache_name)),
+      running_(1) {
+  if (entity) {
+    cache_->SetMetrics(entity);
+  }
+  LOG(INFO) << Substitute("Constructed file cache $0 with capacity $1",
+                          cache_name, max_open_files);
+}
+
+template <class FileType>
+FileCache<FileType>::~FileCache() {
+  running_.CountDown();
+  if (descriptor_expiry_thread_) {
+    descriptor_expiry_thread_->Join();
+  }
+}
+
+template <class FileType>
+Status FileCache<FileType>::Init() {
+  return Thread::Create("cache", Substitute("$0-evict", cache_name_),
+                        &FileCache::RunDescriptorExpiry, this,
+                        &descriptor_expiry_thread_);
+}
+
+template <class FileType>
+Status FileCache<FileType>::OpenExistingFile(const string& file_name,
+                                             shared_ptr<FileType>* file) {
+  shared_ptr<internal::Descriptor<FileType>> desc;
+  {
+    // Find an existing descriptor, or create one if none exists.
+    std::lock_guard<simple_spinlock> l(lock_);
+    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));
+    if (desc) {
+      VLOG(2) << "Found existing descriptor: " << desc->filename();
+    } else {
+      desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
+      InsertOrDie(&descriptors_, file_name, desc);
+      VLOG(2) << "Created new descriptor: " << desc->filename();
+    }
+  }
+
+  // Check that the underlying file can be opened (no-op for found
+  // descriptors). Done outside the lock.
+  RETURN_NOT_OK(desc->Init());
+  *file = std::move(desc);
+  return Status::OK();
+}
+
+template <class FileType>
+Status FileCache<FileType>::DeleteFile(const string& file_name) {
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    shared_ptr<internal::Descriptor<FileType>> desc;
+    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));
+
+    if (desc) {
+      VLOG(2) << "Marking file for deletion: " << file_name;
+      desc->base_.MarkDeleted();
+      return Status::OK();
+    }
+  }
+
+  // There is no outstanding descriptor. Delete the file now.
+  //
+  // Make sure it's been fully evicted from the cache (perhaps it was opened
+  // previously?) so that the filesystem can reclaim the file data instantly.
+  cache_->Erase(file_name);
+  return env_->DeleteFile(file_name);
+}
+
+template <class FileType>
+void FileCache<FileType>::Invalidate(const string& file_name) {
+  // Ensure that there is an invalidated descriptor in the map for this filename.
+  //
+  // This ensures that any concurrent OpenExistingFile() during this method will
+  // see the invalidation and issue a CHECK failure.
+  shared_ptr<internal::Descriptor<FileType>> desc;
+  {
+    // Find an existing descriptor, or create one if none exists.
+    std::lock_guard<simple_spinlock> l(lock_);
+    auto it = descriptors_.find(file_name);
+    if (it != descriptors_.end()) {
+      desc = it->second.lock();
+    }
+    if (!desc) {
+      desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
+      descriptors_.emplace(file_name, desc);
+    }
+
+    desc->base_.MarkInvalidated();
+  }
+  // Remove it from the cache so that if the same path is opened again, we
+  // will re-open a new FD rather than retrieving one that might have been
+  // cached prior to invalidation.
+  cache_->Erase(file_name);
+
+  // Remove the invalidated descriptor from the map. We are guaranteed it
+  // is still there because we've held a strong reference to it for the
+  // duration of this method, and other methods only erase map entries
+  // whose weak references have expired.
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    CHECK_EQ(1, descriptors_.erase(file_name));
+  }
+}
+
+template <class FileType>
+int FileCache<FileType>::NumDescriptorsForTests() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return descriptors_.size();
+}
+
+template <class FileType>
+string FileCache<FileType>::ToDebugString() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  string ret;
+  for (const auto& e : descriptors_) {
+    bool strong = false;
+    bool deleted = false;
+    bool opened = false;
+    shared_ptr<internal::Descriptor<FileType>> desc = e.second.lock();
+    if (desc) {
+      strong = true;
+      if (desc->base_.deleted()) {
+        deleted = true;
+      }
+      internal::ScopedOpenedDescriptor<FileType> o(
+          desc->base_.LookupFromCache());
+      if (o.opened()) {
+        opened = true;
+      }
+    }
+    if (strong) {
+      ret += Substitute("$0 (S$1$2)\n", e.first,
+                        deleted ? "D" : "", opened ? "O" : "");
+    } else {
+      ret += Substitute("$0\n", e.first);
+    }
+  }
+  return ret;
+}
+
+template <class FileType>
+Status FileCache<FileType>::FindDescriptorUnlocked(
+    const string& file_name,
+    shared_ptr<internal::Descriptor<FileType>>* file) {
+  DCHECK(lock_.is_locked());
+
+  auto it = descriptors_.find(file_name);
+  if (it != descriptors_.end()) {
+    // Found the descriptor. Has it expired?
+    shared_ptr<internal::Descriptor<FileType>> desc = it->second.lock();
+    if (desc) {
+      CHECK(!desc->base_.invalidated());
+      if (desc->base_.deleted()) {
+        return Status::NotFound("File already marked for deletion", file_name);
+      }
+
+      // Descriptor is still valid, return it.
+      if (file) {
+        *file = desc;
+      }
+      return Status::OK();
+    }
+    // Descriptor has expired; erase it and pretend we found nothing.
+    descriptors_.erase(it);
+  }
+  return Status::OK();
+}
+
+template <class FileType>
+void FileCache<FileType>::RunDescriptorExpiry() {
+  while (!running_.WaitFor(MonoDelta::FromMilliseconds(
+      FLAGS_file_cache_expiry_period_ms))) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    for (auto it = descriptors_.begin(); it != descriptors_.end();) {
+      if (it->second.expired()) {
+        it = descriptors_.erase(it);
+      } else {
+        it++;
+      }
+    }
+  }
+}
+
+// Explicit instantiation for callers outside this compilation unit.
+template class FileCache<RWFile>;
+template class FileCache<RandomAccessFile>;
+
+} // namespace kudu
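
A subtle point in the implementation above is how InsertIntoCache() and CacheValueToFileType() pass a heap pointer through the cache's fixed-size value slot: the pointer's bytes are memcpy'd into the slot on insert, cast back out on lookup, and the pointee is deleted by the eviction callback. The standalone sketch below illustrates that round trip with a plain buffer standing in for the cache value; the names are hypothetical and it is illustrative only, not part of the patch.

    #include <cstring>
    #include <iostream>
    #include <string>

    // Stand-in for the cache's value slot, sized to hold exactly one pointer.
    struct FakeCacheValue {
      char data[sizeof(void*)];
    };

    int main() {
      // Heap-allocate an "open file" object (a plain string here).
      auto* file = new std::string("pretend this is an open RWFile");

      // Store the pointer's bytes into the slot, as InsertIntoCache() does.
      FakeCacheValue slot;
      void* raw = file;
      std::memcpy(slot.data, &raw, sizeof(raw));

      // Recover the pointer later, mirroring CacheValueToFileType().
      auto* recovered =
          reinterpret_cast<std::string*>(*reinterpret_cast<void**>(slot.data));
      std::cout << *recovered << std::endl;

      // The eviction callback would delete the object at this point.
      delete recovered;
      return 0;
    }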

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache.h b/be/src/kudu/util/file_cache.h
new file mode 100644
index 0000000..021c758
--- /dev/null
+++ b/be/src/kudu/util/file_cache.h
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Env;
+
+namespace internal {
+
+template <class FileType>
+class BaseDescriptor;
+
+template <class FileType>
+class Descriptor;
+
+} // namespace internal
+
+class MetricEntity;
+class Thread;
+
+// Cache of open files.
+//
+// The purpose of this cache is to enforce an upper bound on the maximum number
+// of files open at a time. Files opened through the cache may be closed at any
+// time, only to be reopened upon next use.
+//
+// The file cache can be viewed as having two logical parts: the client-facing
+// API and the LRU cache.
+//
+// Client-facing API
+// -----------------
+// The core of the client-facing API is the cache descriptor. A descriptor
+// uniquely identifies an opened file. To a client, a descriptor is just an
+// open file interface of the variety defined in util/env.h. Clients open
+// descriptors via the OpenExistingFile() cache method.
+//
+// Descriptors are shared objects; an existing descriptor is handed back to a
+// client if a file with the same name is already opened. To facilitate
+// descriptor sharing, the file cache maintains a by-file-name descriptor map.
+// The values are weak references to the descriptors so that map entries don't
+// affect the descriptor lifecycle.
+//
+// LRU cache
+// ---------
+// The lower half of the file cache is a standard LRU cache whose keys are file
+// names and whose values are pointers to opened file objects allocated on the
+// heap. Unlike the descriptor map, this cache has an upper bound on capacity,
+// and handles are evicted (and closed) according to an LRU algorithm.
+//
+// Whenever a descriptor is used by a client in file I/O, its file name is used
+// in an LRU cache lookup. If found, the underlying file is still open and the
+// file access is performed. Otherwise, the file must have been evicted and
+// closed, so it is reopened and reinserted (possibly evicting a different open
+// file) before the file access is performed.
+//
+// Other notes
+// -----------
+// In a world where files are opened and closed transparently, file deletion
+// demands special care if UNIX semantics are to be preserved. When a call to
+// DeleteFile() is made to a file with an opened descriptor, the descriptor is
+// simply "marked" as to-be-deleted-later. Only when all references to the
+// descriptor are dropped is the file actually deleted. If there is no open
+// descriptor, the file is deleted immediately.
+//
+// Every public method in the file cache is thread safe.
+template <class FileType>
+class FileCache {
+ public:
+  // Creates a new file cache.
+  //
+  // The 'cache_name' is used to disambiguate amongst other file cache
+  // instances. The cache will use 'max_open_files' as a soft upper bound on
+  // the number of files open at any given time.
+  FileCache(const std::string& cache_name,
+            Env* env,
+            int max_open_files,
+            const scoped_refptr<MetricEntity>& entity);
+
+  // Destroys the file cache.
+  ~FileCache();
+
+  // Initializes the file cache. Initialization done here may fail.
+  Status Init();
+
+  // Opens an existing file by name through the cache.
+  //
+  // The returned 'file' is actually an object called a descriptor. It adheres
+  // to a file-like interface but interfaces with the cache under the hood to
+  // reopen a file as needed during file operations.
+  //
+  // The descriptor is opened immediately to verify that the on-disk file can
+  // be opened, but may be closed later if the cache reaches its upper bound on
+  // the number of open files.
+  Status OpenExistingFile(const std::string& file_name,
+                          std::shared_ptr<FileType>* file);
+
+  // Deletes a file by name through the cache.
+  //
+  // If there is an outstanding descriptor for the file, the deletion will be
+  // deferred until the last referent is dropped. Otherwise, the file is
+  // deleted immediately.
+  Status DeleteFile(const std::string& file_name);
+
+  // Invalidate the given path in the cache if present. This removes the
+  // path from the cache, and invalidates any previously-opened descriptors
+  // associated with this file.
+  //
+  // If a file with the same path is opened again, it will be reopened from
+  // disk rather than served from a previously cached descriptor.
+  //
+  // This operation should be used during 'rename-to-replace' patterns, e.g.:
+  //
+  //    WriteNewDataTo(tmp_path);
+  //    env->RenameFile(tmp_path, p);
+  //    file_cache->Invalidate(p);
+  //
+  // NOTE: if any reader of 'p' holds an open descriptor from the cache
+  // prior to this operation, that descriptor is invalidated and any
+  // further operations on that descriptor will result in a CHECK failure.
+  // Hence this is not safe to use without some external synchronization
+  // which prevents concurrent access to the same file.
+  //
+  // NOTE: this function must not be called concurrently on the same file name
+  // from multiple threads.
+  void Invalidate(const std::string& file_name);
+
+  // Returns the number of entries in the descriptor map.
+  //
+  // Only intended for unit tests.
+  int NumDescriptorsForTests() const;
+
+  // Dumps the contents of the file cache. Intended for debugging.
+  std::string ToDebugString() const;
+
+ private:
+  friend class internal::BaseDescriptor<FileType>;
+
+  template<class FileType2>
+  FRIEND_TEST(FileCacheTest, TestBasicOperations);
+
+  // Looks up a descriptor by file name.
+  //
+  // Must be called with 'lock_' held.
+  Status FindDescriptorUnlocked(
+      const std::string& file_name,
+      std::shared_ptr<internal::Descriptor<FileType>>* file);
+
+  // Periodically removes expired descriptors from 'descriptors_'.
+  void RunDescriptorExpiry();
+
+  // Interface to the underlying filesystem.
+  Env* env_;
+
+  // Name of the cache.
+  const std::string cache_name_;
+
+  // Invoked whenever a cached file reaches zero references (i.e. it was
+  // removed from the cache and is no longer in use by any file operations).
+  std::unique_ptr<Cache::EvictionCallback> eviction_cb_;
+
+  // Underlying cache instance. Caches opened files.
+  std::unique_ptr<Cache> cache_;
+
+  // Protects the descriptor map.
+  mutable simple_spinlock lock_;
+
+  // Maps filenames to descriptors.
+  std::unordered_map<std::string,
+                     std::weak_ptr<internal::Descriptor<FileType>>> descriptors_;
+
+  // Runs RunDescriptorExpiry(), which loops until 'running_' reaches zero.
+  scoped_refptr<Thread> descriptor_expiry_thread_;
+
+  // Tracks whether or not 'descriptor_expiry_thread_' should be running.
+  CountDownLatch running_;
+
+  DISALLOW_COPY_AND_ASSIGN(FileCache);
+};
+
+} // namespace kudu
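
To tie the interface together, here is a minimal usage sketch. It assumes Env::Default() is available, that 'path' already exists on disk, and that running without a metrics entity is acceptable; the function and variable names are illustrative only, not part of the patch.

    #include <memory>
    #include <string>

    #include "kudu/gutil/ref_counted.h"
    #include "kudu/util/env.h"
    #include "kudu/util/file_cache.h"
    #include "kudu/util/metrics.h"
    #include "kudu/util/slice.h"
    #include "kudu/util/status.h"

    namespace kudu {

    // Opens an existing file through the cache, writes to it, and deletes it.
    Status DemoFileCacheUsage(const std::string& path) {
      // Soft cap of 100 open fds; no metrics entity attached.
      FileCache<RWFile> cache("demo", Env::Default(), 100,
                              scoped_refptr<MetricEntity>());
      RETURN_NOT_OK(cache.Init());

      // The descriptor behaves like an RWFile; the cache may transparently
      // close and reopen the underlying fd between calls.
      std::shared_ptr<RWFile> file;
      RETURN_NOT_OK(cache.OpenExistingFile(path, &file));
      RETURN_NOT_OK(file->Write(0, Slice("hello")));

      // Deletion is deferred until the last descriptor reference is dropped.
      RETURN_NOT_OK(cache.DeleteFile(path));
      file.reset();
      return Status::OK();
    }

    } // namespace kudu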