You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:33 UTC
[21/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/errno-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/errno-test.cc b/be/src/kudu/util/errno-test.cc
new file mode 100644
index 0000000..f628b55
--- /dev/null
+++ b/be/src/kudu/util/errno-test.cc
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cerrno>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/errno.h"
+
+using std::string;
+
+namespace kudu {
+
+TEST(OsUtilTest, TestErrnoToString) {
+ int err = ENOENT;
+
+ // Non-truncated result.
+ ASSERT_EQ("No such file or directory", ErrnoToString(err));
+
+ // Truncated because of a short buffer.
+ char buf[2];
+ ErrnoToCString(err, buf, arraysize(buf));
+ ASSERT_EQ("N", string(buf));
+
+ // Unknown error.
+ string expected = "Unknown error";
+ ASSERT_EQ(ErrnoToString(-1).compare(0, expected.length(), expected), 0);
+
+ // Unknown error (truncated).
+ ErrnoToCString(-1, buf, arraysize(buf));
+ ASSERT_EQ("U", string(buf));
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/errno.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/errno.cc b/be/src/kudu/util/errno.cc
new file mode 100644
index 0000000..cc00d0f
--- /dev/null
+++ b/be/src/kudu/util/errno.cc
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/errno.h"
+
+#include <cstring>
+#include <glog/logging.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+// IWYU pragma: no_include <features.h>
+
+namespace kudu {
+
+void ErrnoToCString(int err, char *buf, size_t buf_len) {
+ CHECK_GT(buf_len, 0);
+#if !defined(__GLIBC__) || \
+ ((_POSIX_C_SOURCE >= 200112 || _XOPEN_SOURCE >= 600) && !defined(_GNU_SOURCE))
+ // Using POSIX version 'int strerror_r(...)'.
+ int ret = strerror_r(err, buf, buf_len);
+ if (ret && ret != ERANGE && ret != EINVAL) {
+ strncpy(buf, "unknown error", buf_len);
+ buf[buf_len - 1] = '\0';
+ }
+#else
+ // Using GLIBC version
+
+ // KUDU-1515: TSAN in Clang 3.9 has an incorrect interceptor for strerror_r:
+ // https://github.com/google/sanitizers/issues/696
+ ANNOTATE_IGNORE_WRITES_BEGIN();
+ char* ret = strerror_r(err, buf, buf_len);
+ ANNOTATE_IGNORE_WRITES_END();
+ if (ret != buf) {
+ strncpy(buf, ret, buf_len);
+ buf[buf_len - 1] = '\0';
+ }
+#endif
+}
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/errno.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/errno.h b/be/src/kudu/util/errno.h
new file mode 100644
index 0000000..89802de
--- /dev/null
+++ b/be/src/kudu/util/errno.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_ERRNO_H
+#define KUDU_ERRNO_H
+
+#include <cstddef>
+#include <string>
+
+namespace kudu {
+
// Writes a description of errno value 'err' into 'buf', truncated to fit in
// 'buf_len' bytes (including the terminating NUL). 'buf_len' must be > 0.
void ErrnoToCString(int err, char *buf, size_t buf_len);

// Returns the description of errno value 'err' as a std::string. Messages
// longer than 511 characters are truncated (fixed 512-byte scratch buffer).
inline static std::string ErrnoToString(int err) {
  char message[512];
  ErrnoToCString(err, message, sizeof message);
  return message;
}
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/faststring-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/faststring-test.cc b/be/src/kudu/util/faststring-test.cc
new file mode 100644
index 0000000..07c5697
--- /dev/null
+++ b/be/src/kudu/util/faststring-test.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstring>
+#include <memory>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/faststring.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+class FaststringTest : public KuduTest {};
+
+TEST_F(FaststringTest, TestShrinkToFit_Empty) {
+ faststring s;
+ s.shrink_to_fit();
+ ASSERT_EQ(faststring::kInitialCapacity, s.capacity());
+}
+
+// Test that, if the string contents is shorter than the initial capacity
+// of the faststring, shrink_to_fit() leaves the string in the built-in
+// array.
+TEST_F(FaststringTest, TestShrinkToFit_SmallerThanInitialCapacity) {
+ faststring s;
+ s.append("hello");
+ s.shrink_to_fit();
+ ASSERT_EQ(faststring::kInitialCapacity, s.capacity());
+}
+
+TEST_F(FaststringTest, TestShrinkToFit_Random) {
+ Random r(GetRandomSeed32());
+ int kMaxSize = faststring::kInitialCapacity * 2;
+ std::unique_ptr<char[]> random_bytes(new char[kMaxSize]);
+ RandomString(random_bytes.get(), kMaxSize, &r);
+
+ faststring s;
+ for (int i = 0; i < 100; i++) {
+ int new_size = r.Uniform(kMaxSize);
+ s.resize(new_size);
+ memcpy(s.data(), random_bytes.get(), new_size);
+ s.shrink_to_fit();
+ ASSERT_EQ(0, memcmp(s.data(), random_bytes.get(), new_size));
+ ASSERT_EQ(std::max<int>(faststring::kInitialCapacity, new_size), s.capacity());
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/faststring.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/faststring.cc b/be/src/kudu/util/faststring.cc
new file mode 100644
index 0000000..a1cd26b
--- /dev/null
+++ b/be/src/kudu/util/faststring.cc
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/faststring.h"
+
+#include <glog/logging.h>
+#include <memory>
+
+namespace kudu {
+
+void faststring::GrowByAtLeast(size_t count) {
+ // Not enough space, need to reserve more.
+ // Don't reserve exactly enough space for the new string -- that makes it
+ // too easy to write perf bugs where you get O(n^2) append.
+ // Instead, alwayhs expand by at least 50%.
+
+ size_t to_reserve = len_ + count;
+ if (len_ + count < len_ * 3 / 2) {
+ to_reserve = len_ * 3 / 2;
+ }
+ GrowArray(to_reserve);
+}
+
+void faststring::GrowArray(size_t newcapacity) {
+ DCHECK_GE(newcapacity, capacity_);
+ std::unique_ptr<uint8_t[]> newdata(new uint8_t[newcapacity]);
+ if (len_ > 0) {
+ memcpy(&newdata[0], &data_[0], len_);
+ }
+ capacity_ = newcapacity;
+ if (data_ != initial_data_) {
+ delete[] data_;
+ } else {
+ ASAN_POISON_MEMORY_REGION(initial_data_, arraysize(initial_data_));
+ }
+
+ data_ = newdata.release();
+ ASAN_POISON_MEMORY_REGION(data_ + len_, capacity_ - len_);
+}
+
+void faststring::ShrinkToFitInternal() {
+ DCHECK_NE(data_, initial_data_);
+ if (len_ <= kInitialCapacity) {
+ ASAN_UNPOISON_MEMORY_REGION(initial_data_, len_);
+ memcpy(initial_data_, &data_[0], len_);
+ delete[] data_;
+ data_ = initial_data_;
+ capacity_ = kInitialCapacity;
+ } else {
+ std::unique_ptr<uint8_t[]> newdata(new uint8_t[len_]);
+ memcpy(&newdata[0], &data_[0], len_);
+ delete[] data_;
+ data_ = newdata.release();
+ capacity_ = len_;
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/faststring.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/faststring.h b/be/src/kudu/util/faststring.h
new file mode 100644
index 0000000..992060b
--- /dev/null
+++ b/be/src/kudu/util/faststring.h
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_FASTSTRING_H
+#define KUDU_UTIL_FASTSTRING_H
+
+#include <cstdint>
+#include <cstring>
+#include <string>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/fastmem.h"
+
+namespace kudu {
+
+// A faststring is similar to a std::string, except that it is faster for many
+// common use cases (in particular, resize() will fill with uninitialized data
+// instead of memsetting to \0)
+class faststring {
+ public:
+ enum {
+ kInitialCapacity = 32
+ };
+
+ faststring() :
+ data_(initial_data_),
+ len_(0),
+ capacity_(kInitialCapacity) {
+ }
+
+ // Construct a string with the given capacity, in bytes.
+ explicit faststring(size_t capacity)
+ : data_(initial_data_),
+ len_(0),
+ capacity_(kInitialCapacity) {
+ if (capacity > capacity_) {
+ data_ = new uint8_t[capacity];
+ capacity_ = capacity;
+ }
+ ASAN_POISON_MEMORY_REGION(data_, capacity_);
+ }
+
+ ~faststring() {
+ ASAN_UNPOISON_MEMORY_REGION(initial_data_, arraysize(initial_data_));
+ if (data_ != initial_data_) {
+ delete[] data_;
+ }
+ }
+
+ // Reset the valid length of the string to 0.
+ //
+ // This does not free up any memory. The capacity of the string remains unchanged.
+ void clear() {
+ resize(0);
+ ASAN_POISON_MEMORY_REGION(data_, capacity_);
+ }
+
+ // Resize the string to the given length.
+ // If the new length is larger than the old length, the capacity is expanded as necessary.
+ //
+ // NOTE: in contrast to std::string's implementation, Any newly "exposed" bytes of data are
+ // not cleared.
+ void resize(size_t newsize) {
+ if (newsize > capacity_) {
+ reserve(newsize);
+ }
+ len_ = newsize;
+ ASAN_POISON_MEMORY_REGION(data_ + len_, capacity_ - len_);
+ ASAN_UNPOISON_MEMORY_REGION(data_, len_);
+ }
+
+ // Releases the underlying array; after this, the buffer is left empty.
+ //
+ // NOTE: the data pointer returned by release() is not necessarily the pointer
+ uint8_t *release() WARN_UNUSED_RESULT {
+ uint8_t *ret = data_;
+ if (ret == initial_data_) {
+ ret = new uint8_t[len_];
+ memcpy(ret, data_, len_);
+ }
+ len_ = 0;
+ capacity_ = kInitialCapacity;
+ data_ = initial_data_;
+ ASAN_POISON_MEMORY_REGION(data_, capacity_);
+ return ret;
+ }
+
+ // Reserve space for the given total amount of data. If the current capacity is already
+ // larger than the newly requested capacity, this is a no-op (i.e. it does not ever free memory).
+ //
+ // NOTE: even though the new capacity is reserved, it is illegal to begin writing into that memory
+ // directly using pointers. If ASAN is enabled, this is ensured using manual memory poisoning.
+ void reserve(size_t newcapacity) {
+ if (PREDICT_TRUE(newcapacity <= capacity_)) return;
+ GrowArray(newcapacity);
+ }
+
+ // Append the given data to the string, resizing capacity as necessary.
+ void append(const void *src_v, size_t count) {
+ const uint8_t *src = reinterpret_cast<const uint8_t *>(src_v);
+ EnsureRoomForAppend(count);
+ ASAN_UNPOISON_MEMORY_REGION(data_ + len_, count);
+
+ // appending short values is common enough that this
+ // actually helps, according to benchmarks. In theory
+ // memcpy_inlined should already be just as good, but this
+ // was ~20% faster for reading a large prefix-coded string file
+ // where each string was only a few chars different
+ if (count <= 4) {
+ uint8_t *p = &data_[len_];
+ for (int i = 0; i < count; i++) {
+ *p++ = *src++;
+ }
+ } else {
+ strings::memcpy_inlined(&data_[len_], src, count);
+ }
+ len_ += count;
+ }
+
+ // Append the given string to this string.
+ void append(const std::string &str) {
+ append(str.data(), str.size());
+ }
+
+ // Append the given character to this string.
+ void push_back(const char byte) {
+ EnsureRoomForAppend(1);
+ ASAN_UNPOISON_MEMORY_REGION(data_ + len_, 1);
+ data_[len_] = byte;
+ len_++;
+ }
+
+ // Return the valid length of this string.
+ size_t length() const {
+ return len_;
+ }
+
+ // Return the valid length of this string (identical to length())
+ size_t size() const {
+ return len_;
+ }
+
+ // Return the allocated capacity of this string.
+ size_t capacity() const {
+ return capacity_;
+ }
+
+ // Return a pointer to the data in this string. Note that this pointer
+ // may be invalidated by any later non-const operation.
+ const uint8_t *data() const {
+ return &data_[0];
+ }
+
+ // Return a pointer to the data in this string. Note that this pointer
+ // may be invalidated by any later non-const operation.
+ uint8_t *data() {
+ return &data_[0];
+ }
+
+ // Return the given element of this string. Note that this does not perform
+ // any bounds checking.
+ const uint8_t &at(size_t i) const {
+ return data_[i];
+ }
+
+ // Return the given element of this string. Note that this does not perform
+ // any bounds checking.
+ const uint8_t &operator[](size_t i) const {
+ return data_[i];
+ }
+
+ // Return the given element of this string. Note that this does not perform
+ // any bounds checking.
+ uint8_t &operator[](size_t i) {
+ return data_[i];
+ }
+
+ // Reset the contents of this string by copying 'len' bytes from 'src'.
+ void assign_copy(const uint8_t *src, size_t len) {
+ // Reset length so that the first resize doesn't need to copy the current
+ // contents of the array.
+ len_ = 0;
+ resize(len);
+ memcpy(data(), src, len);
+ }
+
+ // Reset the contents of this string by copying from the given std::string.
+ void assign_copy(const std::string &str) {
+ assign_copy(reinterpret_cast<const uint8_t *>(str.c_str()),
+ str.size());
+ }
+
+ // Reallocates the internal storage to fit only the current data.
+ //
+ // This may revert to using internal storage if the current length is shorter than
+ // kInitialCapacity. Note that, in that case, after this call, capacity() will return
+ // a capacity larger than the data length.
+ //
+ // Any pointers within this instance are invalidated.
+ void shrink_to_fit() {
+ if (data_ == initial_data_ || capacity_ == len_) return;
+ ShrinkToFitInternal();
+ }
+
+ // Return a copy of this string as a std::string.
+ std::string ToString() const {
+ return std::string(reinterpret_cast<const char *>(data()),
+ len_);
+ }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(faststring);
+
+ // If necessary, expand the buffer to fit at least 'count' more bytes.
+ // If the array has to be grown, it is grown by at least 50%.
+ void EnsureRoomForAppend(size_t count) {
+ if (PREDICT_TRUE(len_ + count <= capacity_)) {
+ return;
+ }
+
+ // Call the non-inline slow path - this reduces the number of instructions
+ // on the hot path.
+ GrowByAtLeast(count);
+ }
+
+ // The slow path of MakeRoomFor. Grows the buffer by either
+ // 'count' bytes, or 50%, whichever is more.
+ void GrowByAtLeast(size_t count);
+
+ // Grow the array to the given capacity, which must be more than
+ // the current capacity.
+ void GrowArray(size_t newcapacity);
+
+ void ShrinkToFitInternal();
+
+ uint8_t* data_;
+ uint8_t initial_data_[kInitialCapacity];
+ size_t len_;
+ size_t capacity_;
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/fault_injection.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/fault_injection.cc b/be/src/kudu/util/fault_injection.cc
new file mode 100644
index 0000000..6638bb6
--- /dev/null
+++ b/be/src/kudu/util/fault_injection.cc
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/fault_injection.h"
+
+#include <unistd.h>
+
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/once.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+
+namespace kudu {
+namespace fault_injection {
+
+namespace {
+GoogleOnceType g_random_once;
+Random* g_random;
+
+void InitRandom() {
+ LOG(WARNING) << "FAULT INJECTION ENABLED!";
+ LOG(WARNING) << "THIS SERVER MAY CRASH!";
+
+ debug::ScopedLeakCheckDisabler d;
+ g_random = new Random(GetRandomSeed32());
+ ANNOTATE_BENIGN_RACE_SIZED(g_random, sizeof(Random),
+ "Racy random numbers are OK");
+}
+
+} // anonymous namespace
+
+void DoMaybeFault(const char* fault_str, double fraction) {
+ GoogleOnceInit(&g_random_once, InitRandom);
+ if (PREDICT_TRUE(g_random->NextDoubleFraction() >= fraction)) {
+ return;
+ }
+ LOG(ERROR) << "Injecting fault: " << fault_str << " (process will exit)";
+ // _exit will exit the program without running atexit handlers. This more
+ // accurately simulates a crash.
+ _exit(kExitStatus);
+}
+
+void DoInjectRandomLatency(double max_latency_ms) {
+ GoogleOnceInit(&g_random_once, InitRandom);
+ SleepFor(MonoDelta::FromMilliseconds(g_random->NextDoubleFraction() * max_latency_ms));
+}
+
+void DoInjectFixedLatency(int32_t latency_ms) {
+ SleepFor(MonoDelta::FromMilliseconds(latency_ms));
+}
+
+bool DoMaybeTrue(double fraction) {
+ GoogleOnceInit(&g_random_once, InitRandom);
+ return PREDICT_FALSE(g_random->NextDoubleFraction() <= fraction);
+}
+
+} // namespace fault_injection
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/fault_injection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/fault_injection.h b/be/src/kudu/util/fault_injection.h
new file mode 100644
index 0000000..7a71698
--- /dev/null
+++ b/be/src/kudu/util/fault_injection.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_FAULT_INJECTION_H
+#define KUDU_UTIL_FAULT_INJECTION_H
+
+#include <stdint.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
// Macros for injecting various kinds of faults with varying probability. If
// configured with 0 probability, each of these macros is evaluated inline and
// is fast enough to run even in hot code paths.

// With some probability, crash at the current point in the code.
//
// The "crash" is simulated: the fault is logged at ERROR level and the process
// is terminated with _exit(kudu::fault_injection::kExitStatus), so atexit
// handlers do not run. The probability is determined by the 'fraction_flag'
// argument. AS_STRING(fraction_flag), presumably the stringified flag
// expression, is passed along as the fault description that gets logged.
//
// Typical usage:
//
//   DEFINE_double(fault_crash_before_foo, 0.0,
//                 "Fraction of the time when we will crash before doing foo");
//   TAG_FLAG(fault_crash_before_foo, unsafe);
#define MAYBE_FAULT(fraction_flag) \
  kudu::fault_injection::MaybeFault(AS_STRING(fraction_flag), fraction_flag)

// Inject a uniformly random amount of latency between 0 and the configured
// number of milliseconds.
#define MAYBE_INJECT_RANDOM_LATENCY(max_ms_flag) \
  kudu::fault_injection::MaybeInjectRandomLatency(max_ms_flag)

// Inject a specific amount of latency.
#define MAYBE_INJECT_FIXED_LATENCY(ms_flag) \
  kudu::fault_injection::MaybeInjectFixedLatency(ms_flag)

// With some probability, return the status described by 'status_expr' from
// the enclosing function (via RETURN_NOT_OK).
// This will not evaluate 'status_expr' if 'fraction_flag' is zero (or negative).
//
// The "if (!cond) {} else {...}" shape expands to a single statement that
// works with or without a trailing semicolon. Unlike a bare "if", it cannot
// capture an 'else' that the caller meant for an enclosing 'if'.
#define MAYBE_RETURN_FAILURE(fraction_flag, status_expr) \
  if (!kudu::fault_injection::MaybeTrue(fraction_flag)) { \
  } else { \
    RETURN_NOT_OK((status_expr)); \
  }
+
+// Implementation details below.
+// Use the MAYBE_FAULT macro instead.
+namespace kudu {
+namespace fault_injection {
+
+// The exit status returned from a process exiting due to a fault.
+// The choice of value here is arbitrary: just needs to be something
+// wouldn't normally be returned by a non-fault-injection code path.
+constexpr int kExitStatus = 85;
+
+// Out-of-line implementation.
+void DoMaybeFault(const char* fault_str, double fraction);
+void DoInjectRandomLatency(double max_latency_ms);
+void DoInjectFixedLatency(int32_t latency_ms);
+bool DoMaybeTrue(double fraction);
+
+inline bool MaybeTrue(double fraction) {
+ if (PREDICT_TRUE(fraction <= 0)) return false;
+ return DoMaybeTrue(fraction);
+}
+
+inline void MaybeFault(const char* fault_str, double fraction) {
+ if (PREDICT_TRUE(fraction <= 0)) return;
+ DoMaybeFault(fault_str, fraction);
+}
+
+inline void MaybeInjectRandomLatency(double max_latency) {
+ if (PREDICT_TRUE(max_latency <= 0)) return;
+ DoInjectRandomLatency(max_latency);
+}
+
+inline void MaybeInjectFixedLatency(int32_t latency) {
+ if (PREDICT_TRUE(latency <= 0)) return;
+ DoInjectFixedLatency(latency);
+}
+
+
+} // namespace fault_injection
+} // namespace kudu
+#endif /* KUDU_UTIL_FAULT_INJECTION_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache-stress-test.cc b/be/src/kudu/util/file_cache-stress-test.cc
new file mode 100644
index 0000000..9c51a52
--- /dev/null
+++ b/be/src/kudu/util/file_cache-stress-test.cc
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <iterator>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/file_cache-test-util.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/oid_generator.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
// Number of threads that keep creating new files for the consumers to use.
// TestStress overrides this to 2 via OverrideFlagForSlowTests().
DEFINE_int32(test_num_producer_threads, 1, "Number of producer threads");
// Number of threads that randomly open, close, read, write and delete files
// through the cache. TestStress overrides this to 8 via
// OverrideFlagForSlowTests().
DEFINE_int32(test_num_consumer_threads, 4, "Number of consumer threads");
// How long TestStress lets the producer and consumer threads run before
// stopping them. Overridden to 30 via OverrideFlagForSlowTests().
DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test");
+
+DECLARE_bool(cache_force_single_shard);
+
+using std::deque;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
// Capacity (maximum open fds) of the FileCache exercised by the stress test.
// It is also the base of the fd bound that TestStress enforces.
constexpr int kTestMaxOpenFiles = 100;
+
+template <class FileType>
+class FileCacheStressTest : public KuduTest {
+
+// Like CHECK_OK(), but dumps the contents of the cache before failing.
+//
+// The output of ToDebugString() tends to be long enough that LOG() truncates
+// it, so we must split it ourselves before logging.
+#define TEST_CHECK_OK(to_call) do { \
+ const Status& _s = (to_call); \
+ if (!_s.ok()) { \
+ LOG(INFO) << "Dumping cache contents"; \
+ vector<string> lines = strings::Split(cache_->ToDebugString(), "\n",\
+ strings::SkipEmpty()); \
+ for (const auto& l : lines) { \
+ LOG(INFO) << l; \
+ } \
+ } \
+ CHECK(_s.ok()) << "Bad status: " << _s.ToString(); \
+ } while (0);
+
+ public:
+ typedef unordered_map<string, unordered_map<string, int>> MetricMap;
+
+ FileCacheStressTest()
+ : rand_(SeedRandom()),
+ running_(1) {
+ // Use a single shard. Otherwise, the cache can be a little bit "sloppy"
+ // depending on the number of CPUs on the system.
+ FLAGS_cache_force_single_shard = true;
+ cache_.reset(new FileCache<FileType>("test",
+ env_,
+ kTestMaxOpenFiles,
+ scoped_refptr<MetricEntity>()));
+ }
+
+ void SetUp() override {
+ ASSERT_OK(cache_->Init());
+ }
+
+ void ProducerThread() {
+ Random rand(rand_.Next32());
+ ObjectIdGenerator oid_generator;
+ MetricMap metrics;
+
+ do {
+ // Create a new file with some (0-32k) random data in it.
+ string next_file_name = GetTestPath(oid_generator.Next());
+ {
+ unique_ptr<WritableFile> next_file;
+ CHECK_OK(env_->NewWritableFile(next_file_name, &next_file));
+ uint8_t buf[rand.Uniform((32 * 1024) - 1) + 1];
+ CHECK_OK(next_file->Append(GenerateRandomChunk(buf, sizeof(buf), &rand)));
+ CHECK_OK(next_file->Close());
+ }
+ {
+ std::lock_guard<simple_spinlock> l(lock_);
+ InsertOrDie(&available_files_, next_file_name, 0);
+ }
+ metrics[BaseName(next_file_name)]["create"] = 1;
+ } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1)));
+
+ // Update the global metrics map.
+ MergeNewMetrics(std::move(metrics));
+ }
+
+ void ConsumerThread() {
+ // Each thread has its own PRNG to minimize contention on the main one.
+ Random rand(rand_.Next32());
+
+ // Active opened files in this thread.
+ deque<shared_ptr<FileType>> files;
+
+ // Metrics generated by this thread. They will be merged into the main
+ // metrics map when the thread is done.
+ MetricMap metrics;
+
+ do {
+ // Pick an action to perform. Distribution:
+ // 20% open
+ // 15% close
+ // 35% read
+ // 20% write
+ // 10% delete
+ int next_action = rand.Uniform(100);
+
+ if (next_action < 20) {
+ // Open an existing file.
+ string to_open;
+ if (!GetRandomFile(OPEN, &rand, &to_open)) {
+ continue;
+ }
+ shared_ptr<FileType> new_file;
+ TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &new_file));
+ FinishedOpen(to_open);
+ metrics[BaseName(to_open)]["open"]++;
+ files.emplace_back(new_file);
+ } else if (next_action < 35) {
+ // Close a file.
+ if (files.empty()) {
+ continue;
+ }
+ shared_ptr<FileType> file = files.front();
+ files.pop_front();
+ metrics[BaseName(file->filename())]["close"]++;
+ } else if (next_action < 70) {
+ // Read a random chunk from a file.
+ TEST_CHECK_OK(ReadRandomChunk(files, &metrics, &rand));
+ } else if (next_action < 90) {
+ // Write a random chunk to a file.
+ TEST_CHECK_OK(WriteRandomChunk(files, &metrics, &rand));
+ } else if (next_action < 100) {
+ // Delete a file.
+ string to_delete;
+ if (!GetRandomFile(DELETE, &rand, &to_delete)) {
+ continue;
+ }
+ TEST_CHECK_OK(cache_->DeleteFile(to_delete));
+ metrics[BaseName(to_delete)]["delete"]++;
+ }
+ } while (!running_.WaitFor(MonoDelta::FromMilliseconds(1)));
+
+ // Update the global metrics map.
+ MergeNewMetrics(std::move(metrics));
+ }
+
+ protected:
+ void NotifyThreads() { running_.CountDown(); }
+
+ const MetricMap& metrics() const { return metrics_; }
+
+ private:
+ enum GetMode {
+ OPEN,
+ DELETE
+ };
+
+ // Retrieve a random file name to be either opened or deleted. If deleting,
+ // the file name is made inaccessible to future operations.
+ bool GetRandomFile(GetMode mode, Random* rand, string* out) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ if (available_files_.empty()) {
+ return false;
+ }
+
+ // This is linear time, but it's simpler than managing multiple data
+ // structures.
+ auto it = available_files_.begin();
+ std::advance(it, rand->Uniform(available_files_.size()));
+
+ // It's unsafe to delete a file that is still being opened.
+ if (mode == DELETE && it->second > 0) {
+ return false;
+ }
+
+ *out = it->first;
+ if (mode == OPEN) {
+ it->second++;
+ } else {
+ available_files_.erase(it);
+ }
+ return true;
+ }
+
+ // Signal that a previously in-progress open has finished, allowing the file
+ // in question to be deleted.
+ void FinishedOpen(const string& opened) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ int& openers = FindOrDie(available_files_, opened);
+ openers--;
+ }
+
+ // Reads a random chunk of data from a random file in 'files'. On success,
+ // writes to 'metrics'.
+ static Status ReadRandomChunk(const deque<shared_ptr<FileType>>& files,
+ MetricMap* metrics,
+ Random* rand) {
+ if (files.empty()) {
+ return Status::OK();
+ }
+ const shared_ptr<FileType>& file = files[rand->Uniform(files.size())];
+
+ uint64_t file_size;
+ RETURN_NOT_OK(file->Size(&file_size));
+ uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0;
+ size_t len = file_size > 0 ? rand->Uniform(file_size - off) : 0;
+ unique_ptr<uint8_t[]> scratch(new uint8_t[len]);
+ RETURN_NOT_OK(file->Read(off, Slice(scratch.get(), len)));
+
+ (*metrics)[BaseName(file->filename())]["read"]++;
+ return Status::OK();
+ }
+
+ // Writes a random chunk of data to a random file in 'files'. On success,
+ // updates 'metrics'.
+ //
+ // No-op for file implementations that don't support writing.
+ static Status WriteRandomChunk(const deque<shared_ptr<FileType>>& files,
+ MetricMap* metrics,
+ Random* rand);
+
+ static Slice GenerateRandomChunk(uint8_t* buffer, size_t max_length, Random* rand) {
+ size_t len = rand->Uniform(max_length);
+ len -= len % sizeof(uint32_t);
+ for (int i = 0; i < (len / sizeof(uint32_t)); i += sizeof(uint32_t)) {
+ reinterpret_cast<uint32_t*>(buffer)[i] = rand->Next32();
+ }
+ return Slice(buffer, len);
+ }
+
+ // Merge the metrics in 'new_metrics' into the global metric map.
+ void MergeNewMetrics(MetricMap new_metrics) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ for (const auto& file_action_pair : new_metrics) {
+ for (const auto& action_count_pair : file_action_pair.second) {
+ metrics_[file_action_pair.first][action_count_pair.first] += action_count_pair.second;
+ }
+ }
+ }
+
+ unique_ptr<FileCache<FileType>> cache_;
+
+ // Used to seed per-thread PRNGs.
+ ThreadSafeRandom rand_;
+
+ // Drops to zero when the test ends.
+ CountDownLatch running_;
+
+ // Protects 'available_files_' and 'metrics_'.
+ simple_spinlock lock_;
+
+ // Contains files produced by producer threads and ready for consumption by
+ // consumer threads.
+ //
+ // Each entry is a file name and the number of in-progress openers. To delete
+ // a file, there must be no openers.
+ unordered_map<string, int> available_files_;
+
+ // For each file name, tracks the count of consumer actions performed.
+ //
+ // Only updated at test end.
+ MetricMap metrics_;
+};
+
+template <>
+Status FileCacheStressTest<RWFile>::WriteRandomChunk(
+ const deque<shared_ptr<RWFile>>& files,
+ MetricMap* metrics,
+ Random* rand) {
+ if (files.empty()) {
+ return Status::OK();
+ }
+ const shared_ptr<RWFile>& file = files[rand->Uniform(files.size())];
+
+ uint64_t file_size;
+ RETURN_NOT_OK(file->Size(&file_size));
+ uint64_t off = file_size > 0 ? rand->Uniform(file_size) : 0;
+ uint8_t buf[64];
+ RETURN_NOT_OK(file->Write(off, GenerateRandomChunk(buf, sizeof(buf), rand)));
+ (*metrics)[BaseName(file->filename())]["write"]++;
+ return Status::OK();
+}
+
+template <>
+Status FileCacheStressTest<RandomAccessFile>::WriteRandomChunk(
+ const deque<shared_ptr<RandomAccessFile>>& /* unused */,
+ MetricMap* /* unused */,
+ Random* /* unused */) {
+ return Status::OK();
+}
+
+typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;
+TYPED_TEST_CASE(FileCacheStressTest, FileTypes);
+
+TYPED_TEST(FileCacheStressTest, TestStress) {
+ OverrideFlagForSlowTests("test_num_producer_threads", "2");
+ OverrideFlagForSlowTests("test_num_consumer_threads", "8");
+ OverrideFlagForSlowTests("test_duration_secs", "30");
+
+ // Start the threads.
+ PeriodicOpenFdChecker checker(
+ this->env_,
+ this->GetTestPath("*"), // only count within our test dir
+ kTestMaxOpenFiles + // cache capacity
+ FLAGS_test_num_producer_threads + // files being written
+ FLAGS_test_num_consumer_threads); // files being opened
+ checker.Start();
+ vector<thread> producers;
+ for (int i = 0; i < FLAGS_test_num_producer_threads; i++) {
+ producers.emplace_back(&FileCacheStressTest<TypeParam>::ProducerThread, this);
+ }
+ vector<thread> consumers;
+ for (int i = 0; i < FLAGS_test_num_consumer_threads; i++) {
+ consumers.emplace_back(&FileCacheStressTest<TypeParam>::ConsumerThread, this);
+ }
+
+ // Let the test run.
+ SleepFor(MonoDelta::FromSeconds(FLAGS_test_duration_secs));
+
+ // Stop the threads.
+ this->NotifyThreads();
+ checker.Stop();
+ for (auto& p : producers) {
+ p.join();
+ }
+ for (auto& c : consumers) {
+ c.join();
+ }
+
+ // Log the metrics.
+ unordered_map<string, int> action_counts;
+ for (const auto& file_action_pair : this->metrics()) {
+ for (const auto& action_count_pair : file_action_pair.second) {
+ VLOG(2) << Substitute("$0: $1: $2",
+ file_action_pair.first,
+ action_count_pair.first,
+ action_count_pair.second);
+ action_counts[action_count_pair.first] += action_count_pair.second;
+ }
+ }
+ for (const auto& action_count_pair : action_counts) {
+ LOG(INFO) << Substitute("$0: $1",
+ action_count_pair.first,
+ action_count_pair.second);
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache-test-util.h b/be/src/kudu/util/file_cache-test-util.h
new file mode 100644
index 0000000..09acb68
--- /dev/null
+++ b/be/src/kudu/util/file_cache-test-util.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <thread>
+
+#include <glog/logging.h>
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// Periodically checks the number of open file descriptors belonging to this
+// process, crashing if it exceeds some upper bound.
+class PeriodicOpenFdChecker {
+ public:
+ // path_pattern: a glob-style pattern of which paths should be included while
+ // counting file descriptors
+ // upper_bound: the maximum number of file descriptors that should be open
+ // at any point in time
+ PeriodicOpenFdChecker(Env* env, std::string path_pattern, int upper_bound)
+ : env_(env),
+ path_pattern_(std::move(path_pattern)),
+ initial_fd_count_(CountOpenFds(env, path_pattern_)),
+ max_fd_count_(upper_bound + initial_fd_count_),
+ running_(1),
+ started_(false) {}
+
+ ~PeriodicOpenFdChecker() { Stop(); }
+
+ void Start() {
+ DCHECK(!started_);
+ running_.Reset(1);
+ check_thread_ = std::thread(&PeriodicOpenFdChecker::CheckThread, this);
+ started_ = true;
+ }
+
+ void Stop() {
+ if (started_) {
+ running_.CountDown();
+ check_thread_.join();
+ started_ = false;
+ }
+ }
+
+ private:
+ void CheckThread() {
+ LOG(INFO) << strings::Substitute(
+ "Periodic open fd checker starting for path pattern $0"
+ "(initial: $1 max: $2)",
+ path_pattern_, initial_fd_count_, max_fd_count_);
+ do {
+ int open_fd_count = CountOpenFds(env_, path_pattern_);
+ KLOG_EVERY_N_SECS(INFO, 1) << strings::Substitute("Open fd count: $0/$1",
+ open_fd_count,
+ max_fd_count_);
+ CHECK_LE(open_fd_count, max_fd_count_);
+ } while (!running_.WaitFor(MonoDelta::FromMilliseconds(100)));
+ }
+
+ Env* env_;
+ const std::string path_pattern_;
+ const int initial_fd_count_;
+ const int max_fd_count_;
+
+ CountDownLatch running_;
+ std::thread check_thread_;
+ bool started_;
+};
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache-test.cc b/be/src/kudu/util/file_cache-test.cc
new file mode 100644
index 0000000..94c09eb
--- /dev/null
+++ b/be/src/kudu/util/file_cache-test.cc
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/metrics.h" // IWYU pragma: keep
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(cache_force_single_shard);
+DECLARE_int32(file_cache_expiry_period_ms);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+template <class FileType>
+class FileCacheTest : public KuduTest {
+ public:
+ FileCacheTest()
+ : rand_(SeedRandom()) {
+ // Simplify testing of the actual cache capacity.
+ FLAGS_cache_force_single_shard = true;
+
+ // Speed up tests that check the number of descriptors.
+ FLAGS_file_cache_expiry_period_ms = 1;
+
+ // libunwind internally uses two file descriptors as a pipe.
+ // Make sure it gets initialized early so that our fd count
+ // doesn't get affected by it.
+ ignore_result(GetStackTraceHex());
+ initial_open_fds_ = CountOpenFds();
+ }
+
+ int CountOpenFds() const {
+ // Only count files in the test working directory so that we don't
+ // accidentally count other fds that might be opened or closed in
+ // the background by other threads.
+ return kudu::CountOpenFds(env_, GetTestPath("*"));
+ }
+
+ void SetUp() override {
+ KuduTest::SetUp();
+ ASSERT_OK(ReinitCache(1));
+ }
+
+ protected:
+ Status ReinitCache(int max_open_files) {
+ cache_.reset(new FileCache<FileType>("test",
+ env_,
+ max_open_files,
+ nullptr));
+ return cache_->Init();
+ }
+
+ Status WriteTestFile(const string& name, const string& data) {
+ unique_ptr<RWFile> f;
+ RETURN_NOT_OK(env_->NewRWFile(name, &f));
+ RETURN_NOT_OK(f->Write(0, data));
+ return Status::OK();
+ }
+
+ void AssertFdsAndDescriptors(int num_expected_fds,
+ int num_expected_descriptors) {
+ ASSERT_EQ(initial_open_fds_ + num_expected_fds, CountOpenFds());
+
+ // The expiry thread may take some time to run.
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_EQ(num_expected_descriptors, cache_->NumDescriptorsForTests());
+ });
+ }
+
+ Random rand_;
+ int initial_open_fds_;
+ unique_ptr<FileCache<FileType>> cache_;
+};
+
+typedef ::testing::Types<RWFile, RandomAccessFile> FileTypes;
+TYPED_TEST_CASE(FileCacheTest, FileTypes);
+
+TYPED_TEST(FileCacheTest, TestBasicOperations) {
+ // Open a non-existent file.
+ {
+ shared_ptr<TypeParam> f;
+ ASSERT_TRUE(this->cache_->OpenExistingFile(
+ "/does/not/exist", &f).IsNotFound());
+ NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
+ }
+
+ const string kFile1 = this->GetTestPath("foo");
+ const string kFile2 = this->GetTestPath("bar");
+ const string kData1 = "test data 1";
+ const string kData2 = "test data 2";
+
+ // Create some test files.
+ ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+ ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+ NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
+
+ {
+ // Open a test file. It should open an fd and create a descriptor.
+ shared_ptr<TypeParam> f1;
+ ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f1));
+ NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+
+ // Spot check the test data by comparing sizes.
+ for (int i = 0; i < 3; i++) {
+ uint64_t size;
+ ASSERT_OK(f1->Size(&size));
+ ASSERT_EQ(kData1.size(), size);
+ NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+ }
+
+ // Open the same file a second time. It should reuse the existing
+ // descriptor and not open a second fd.
+ shared_ptr<TypeParam> f2;
+ ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+ NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
+ {
+ Cache::UniqueHandle uh(
+ this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE),
+ Cache::HandleDeleter(this->cache_->cache_.get()));
+ ASSERT_TRUE(uh.get());
+ }
+
+ // Open a second file. This will create a new descriptor, but evict the fd
+ // opened for the first file, so the fd count should remain constant.
+ shared_ptr<TypeParam> f3;
+ ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f3));
+ NO_FATALS(this->AssertFdsAndDescriptors(1, 2));
+ {
+ Cache::UniqueHandle uh(
+ this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE),
+ Cache::HandleDeleter(this->cache_->cache_.get()));
+ ASSERT_FALSE(uh.get());
+ }
+ {
+ Cache::UniqueHandle uh(
+ this->cache_->cache_->Lookup(kFile2, Cache::EXPECT_IN_CACHE),
+ Cache::HandleDeleter(this->cache_->cache_.get()));
+ ASSERT_TRUE(uh.get());
+ }
+ }
+
+ // The descriptors are all out of scope, but the open fds remain in the cache.
+ NO_FATALS(this->AssertFdsAndDescriptors(1, 0));
+
+ // With the cache gone, so are the cached fds.
+ this->cache_.reset();
+ ASSERT_EQ(this->initial_open_fds_, this->CountOpenFds());
+}
+
+TYPED_TEST(FileCacheTest, TestDeletion) {
+ // Deleting a file that doesn't exist does nothing/
+ ASSERT_TRUE(this->cache_->DeleteFile("/does/not/exist").IsNotFound());
+
+ // Create a test file, then delete it. It will be deleted immediately.
+ const string kFile1 = this->GetTestPath("foo");
+ const string kData1 = "test data 1";
+ ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+ ASSERT_TRUE(this->env_->FileExists(kFile1));
+ ASSERT_OK(this->cache_->DeleteFile(kFile1));
+ ASSERT_FALSE(this->env_->FileExists(kFile1));
+
+ // Trying to delete it again fails.
+ ASSERT_TRUE(this->cache_->DeleteFile(kFile1).IsNotFound());
+
+ // Create another test file, open it, then delete it. The delete is not
+ // effected until the last open descriptor is closed. In between, the
+ // cache won't allow the file to be opened again.
+ const string kFile2 = this->GetTestPath("bar");
+ const string kData2 = "test data 2";
+ ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+ ASSERT_TRUE(this->env_->FileExists(kFile2));
+ {
+ shared_ptr<TypeParam> f1;
+ ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f1));
+ ASSERT_EQ(this->initial_open_fds_ + 1, this->CountOpenFds());
+ ASSERT_OK(this->cache_->DeleteFile(kFile2));
+ {
+ shared_ptr<TypeParam> f2;
+ ASSERT_TRUE(this->cache_->OpenExistingFile(kFile2, &f2).IsNotFound());
+ }
+ ASSERT_TRUE(this->cache_->DeleteFile(kFile2).IsNotFound());
+ ASSERT_TRUE(this->env_->FileExists(kFile2));
+ ASSERT_EQ(this->initial_open_fds_ + 1, this->CountOpenFds());
+ }
+ ASSERT_FALSE(this->env_->FileExists(kFile2));
+ ASSERT_EQ(this->initial_open_fds_, this->CountOpenFds());
+
+ // Create a test file, open it, and let it go out of scope before
+ // deleting it. The deletion should evict the fd and close it, despite
+ // happening after the descriptor is gone.
+ const string kFile3 = this->GetTestPath("baz");
+ const string kData3 = "test data 3";
+ ASSERT_OK(this->WriteTestFile(kFile3, kData3));
+ {
+ shared_ptr<TypeParam> f3;
+ ASSERT_OK(this->cache_->OpenExistingFile(kFile3, &f3));
+ }
+ ASSERT_TRUE(this->env_->FileExists(kFile3));
+ ASSERT_EQ(this->initial_open_fds_ + 1, this->CountOpenFds());
+ ASSERT_OK(this->cache_->DeleteFile(kFile3));
+ ASSERT_FALSE(this->env_->FileExists(kFile3));
+ ASSERT_EQ(this->initial_open_fds_, this->CountOpenFds());
+}
+
+TYPED_TEST(FileCacheTest, TestInvalidation) {
+ const string kFile1 = this->GetTestPath("foo");
+ const string kData1 = "test data 1";
+ ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+
+ // Open the file.
+ shared_ptr<TypeParam> f;
+ ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f));
+
+ // Write a new file and rename it in place on top of file1.
+ const string kFile2 = this->GetTestPath("foo2");
+ const string kData2 = "test data 2 (longer than original)";
+ ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+ ASSERT_OK(this->env_->RenameFile(kFile2, kFile1));
+
+ // We should still be able to access the file, since it has a cached fd.
+ uint64_t size;
+ ASSERT_OK(f->Size(&size));
+ ASSERT_EQ(kData1.size(), size);
+
+ // If we invalidate it from the cache and try again, it should crash because
+ // the existing descriptor was invalidated.
+ this->cache_->Invalidate(kFile1);
+ ASSERT_DEATH({ f->Size(&size); }, "invalidated");
+
+ // But if we re-open the path again, the new descriptor should read the
+ // new data.
+ shared_ptr<TypeParam> f2;
+ ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+ ASSERT_OK(f2->Size(&size));
+ ASSERT_EQ(kData2.size(), size);
+}
+
+
+TYPED_TEST(FileCacheTest, TestHeavyReads) {
+ const int kNumFiles = 20;
+ const int kNumIterations = 100;
+ const int kCacheCapacity = 5;
+
+ ASSERT_OK(this->ReinitCache(kCacheCapacity));
+
+ // Randomly generate some data.
+ string data;
+ for (int i = 0; i < 1000; i++) {
+ data += Substitute("$0", this->rand_.Next());
+ }
+
+ // Write that data to a bunch of files and open them through the cache.
+ vector<shared_ptr<TypeParam>> opened_files;
+ for (int i = 0; i < kNumFiles; i++) {
+ string filename = this->GetTestPath(Substitute("$0", i));
+ ASSERT_OK(this->WriteTestFile(filename, data));
+ shared_ptr<TypeParam> f;
+ ASSERT_OK(this->cache_->OpenExistingFile(filename, &f));
+ opened_files.push_back(f);
+ }
+
+ // Read back the data at random through the cache.
+ unique_ptr<uint8_t[]> buf(new uint8_t[data.length()]);
+ for (int i = 0; i < kNumIterations; i++) {
+ int idx = this->rand_.Uniform(opened_files.size());
+ const auto& f = opened_files[idx];
+ uint64_t size;
+ ASSERT_OK(f->Size(&size));
+ Slice s(buf.get(), size);
+ ASSERT_OK(f->Read(0, s));
+ ASSERT_EQ(data, s);
+ ASSERT_LE(this->CountOpenFds(),
+ this->initial_open_fds_ + kCacheCapacity);
+ }
+}
+
+TYPED_TEST(FileCacheTest, TestNoRecursiveDeadlock) {
+ // This test triggered a deadlock in a previous implementation, when expired
+ // weak_ptrs were removed from the descriptor map in the descriptor's
+ // destructor.
+ alarm(60);
+ auto cleanup = MakeScopedCleanup([]() {
+ alarm(0);
+ });
+
+ const string kFile = this->GetTestPath("foo");
+ ASSERT_OK(this->WriteTestFile(kFile, "test data"));
+
+ vector<std::thread> threads;
+ for (int i = 0; i < 2; i++) {
+ threads.emplace_back([&]() {
+ for (int i = 0; i < 10000; i++) {
+ shared_ptr<TypeParam> f;
+ CHECK_OK(this->cache_->OpenExistingFile(kFile, &f));
+ }
+ });
+ }
+
+ for (auto& t : threads) {
+ t.join();
+ }
+}
+
+class RandomAccessFileCacheTest : public FileCacheTest<RandomAccessFile> {
+};
+
+TEST_F(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) {
+ const string kFile = this->GetTestPath("foo");
+ ASSERT_OK(this->WriteTestFile(kFile, "test data"));
+
+ shared_ptr<RandomAccessFile> f;
+ ASSERT_OK(this->cache_->OpenExistingFile(kFile, &f));
+
+ // This used to crash due to a kudu_malloc_usable_size() call on a memory
+ // address that wasn't the start of an actual heap allocation.
+ LOG(INFO) << f->memory_footprint();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache.cc b/be/src/kudu/util/file_cache.cc
new file mode 100644
index 0000000..a1ab814
--- /dev/null
+++ b/be/src/kudu/util/file_cache.cc
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/file_cache.h"
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/array_view.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h" // IWYU pragma: keep
+#include "kudu/util/monotime.h"
+#include "kudu/util/once.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+// How often (in milliseconds) the background expiry thread of each file cache
+// sweeps expired weak references out of its descriptor map.
+DEFINE_int32(file_cache_expiry_period_ms, 60 * 1000,
+             "Period of time (in ms) between removing expired file cache descriptors");
+TAG_FLAG(file_cache_expiry_period_ms, advanced);
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+namespace {
+
+// Decodes a cache value slice back into the file object pointer that
+// BaseDescriptor::InsertIntoCache() stored in it.
+//
+// The pointer is written into the value bytes with memcpy(), so it is read
+// back the same way. Dereferencing the byte buffer through a 'void**' would
+// rely on the value storage being suitably aligned for a pointer and on
+// type-punning the raw bytes in place; memcpy() needs neither assumption.
+template <class FileType>
+FileType* CacheValueToFileType(Slice s) {
+  void* file_ptr = nullptr;
+  DCHECK_GE(s.size(), sizeof(file_ptr));
+  memcpy(&file_ptr, s.data(), sizeof(file_ptr));
+  return static_cast<FileType*>(file_ptr);
+}
+
+// Cache eviction callback: destroys the file object owned by an evicted
+// cache entry, which is where the underlying file actually gets closed.
+template <class FileType>
+class EvictionCallback : public Cache::EvictionCallback {
+ public:
+  EvictionCallback() = default;
+
+  void EvictedEntry(Slice key, Slice value) override {
+    VLOG(2) << "Evicted fd belonging to " << key.ToString();
+    delete CacheValueToFileType<FileType>(value);
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(EvictionCallback);
+};
+
+} // anonymous namespace
+
+namespace internal {
+
+template <class FileType>
+class ScopedOpenedDescriptor;
+
+// Encapsulates common descriptor fields and methods.
+//
+// Each concrete Descriptor<FileType> embeds one of these. It remembers the
+// file name and the deleted/invalidated state, and mediates access to the
+// owning FileCache's Env and LRU cache.
+template <class FileType>
+class BaseDescriptor {
+ public:
+  BaseDescriptor(FileCache<FileType>* file_cache,
+                 string filename)
+      : file_cache_(file_cache),
+        file_name_(std::move(filename)) {}
+
+  // If the descriptor was marked deleted, erases the file from the cache and
+  // deletes it from the filesystem. A failed deletion is only reported via
+  // WARN_NOT_OK, since a destructor has no way to return it.
+  ~BaseDescriptor() {
+    VLOG(2) << "Out of scope descriptor with file name: " << filename();
+
+    // The (now expired) weak_ptr remains in 'descriptors_', to be removed by
+    // the next call to RunDescriptorExpiry() (or by a lookup of the same
+    // name). Removing it here would risk a deadlock on recursive acquisition
+    // of 'lock_'.
+
+    if (deleted()) {
+      cache()->Erase(filename());
+
+      VLOG(1) << "Deleting file: " << filename();
+      WARN_NOT_OK(env()->DeleteFile(filename()), "");
+    }
+  }
+
+  // Insert a pointer to an open file object into the file cache with the
+  // filename as the cache key. The cache takes ownership of 'file_ptr'; the
+  // eviction callback deletes it.
+  //
+  // Returns a handle to the inserted entry. The handle always contains an open
+  // file.
+  ScopedOpenedDescriptor<FileType> InsertIntoCache(void* file_ptr) const {
+    // The allocated charge is always one byte. This is incorrect with respect
+    // to memory tracking, but it's necessary if the cache capacity is to be
+    // equivalent to the max number of fds.
+    Cache::PendingHandle* pending = CHECK_NOTNULL(cache()->Allocate(
+        filename(), sizeof(file_ptr), 1));
+    memcpy(cache()->MutableValue(pending),
+           &file_ptr,
+           sizeof(file_ptr));
+    return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
+        cache()->Insert(pending, file_cache_->eviction_cb_.get()),
+        Cache::HandleDeleter(cache())));
+  }
+
+  // Retrieves a pointer to an open file object from the file cache with the
+  // filename as the cache key.
+  //
+  // Returns a handle to the looked up entry. The handle may or may not contain
+  // an open file, depending on whether the cache hit or missed.
+  ScopedOpenedDescriptor<FileType> LookupFromCache() const {
+    return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
+        cache()->Lookup(filename(), Cache::EXPECT_IN_CACHE),
+        Cache::HandleDeleter(cache())));
+  }
+
+  // Mark this descriptor as to-be-deleted later (see the destructor).
+  //
+  // Atomically sets the FILE_DELETED bit, preserving any other flags.
+  void MarkDeleted() {
+    DCHECK(!deleted());
+    flags_.fetch_or(FILE_DELETED);
+  }
+
+  // Mark this descriptor as invalidated. No further access is allowed
+  // to this file.
+  //
+  // Atomically sets the INVALIDATED bit, preserving any other flags.
+  void MarkInvalidated() {
+    DCHECK(!invalidated());
+    flags_.fetch_or(INVALIDATED);
+  }
+
+  Cache* cache() const { return file_cache_->cache_.get(); }
+
+  Env* env() const { return file_cache_->env_; }
+
+  const string& filename() const { return file_name_; }
+
+  bool deleted() const { return flags_.load() & FILE_DELETED; }
+  bool invalidated() const { return flags_.load() & INVALIDATED; }
+
+ private:
+  FileCache<FileType>* file_cache_;
+  const string file_name_;
+
+  // Bits stored in 'flags_'. The underlying type matches 'flags_' so the
+  // enumerators convert to it without narrowing.
+  enum Flags : uint8_t {
+    FILE_DELETED = 1 << 0,
+    INVALIDATED = 1 << 1
+  };
+  std::atomic<uint8_t> flags_ {0};
+
+  DISALLOW_COPY_AND_ASSIGN(BaseDescriptor);
+};
+
+// A "smart" retrieved LRU cache handle.
+//
+// Releases its cache handle when destroyed. If the entry has meanwhile been
+// evicted from the cache, that release may be what closes the opened file.
+template <class FileType>
+class ScopedOpenedDescriptor {
+ public:
+  // An empty ("not yet, but soon to be, opened") descriptor bound to 'desc';
+  // a real handle is expected to be moved into it later.
+  explicit ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc)
+      : ScopedOpenedDescriptor(
+            desc, Cache::UniqueHandle(nullptr, Cache::HandleDeleter(desc->cache()))) {
+  }
+
+  // Wraps 'handle', which may or may not refer to an open, cached file.
+  ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc,
+                         Cache::UniqueHandle handle)
+      : desc_(desc),
+        handle_(std::move(handle)) {
+  }
+
+  // Whether a cache handle (and therefore an open file) is held.
+  bool opened() const { return handle_.get() != nullptr; }
+
+  // The cached file object. Only valid while opened().
+  FileType* file() const {
+    DCHECK(opened());
+    const Slice value = desc_->cache()->Value(handle_.get());
+    return CacheValueToFileType<FileType>(value);
+  }
+
+ private:
+  const BaseDescriptor<FileType>* desc_;
+  Cache::UniqueHandle handle_;
+};
+
+// Reference to an on-disk file that may or may not currently be open (and
+// thus cached) in the file cache.
+//
+// This primary template only specifies the shape: every usable descriptor
+// type is a full specialization (see the RWFile and RandomAccessFile ones
+// below).
+template <class FileType>
+class Descriptor : public FileType {};
+
+// A descriptor adhering to the RWFile interface (i.e. when opened, provides
+// a read-write interface to the underlying file).
+//
+// Every file operation pins the file in the cache (reopening it first if it
+// was evicted) and forwards the call to the cached RWFile.
+template <>
+class Descriptor<RWFile> : public RWFile {
+ public:
+  Descriptor(FileCache<RWFile>* file_cache, const string& filename)
+      : base_(file_cache, filename) {}
+
+  ~Descriptor() = default;
+
+  Status Read(uint64_t offset, Slice result) const override {
+    return WithOpenedFile([&](RWFile* f) { return f->Read(offset, result); });
+  }
+
+  Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
+    return WithOpenedFile([&](RWFile* f) { return f->ReadV(offset, results); });
+  }
+
+  Status Write(uint64_t offset, const Slice& data) override {
+    return WithOpenedFile([&](RWFile* f) { return f->Write(offset, data); });
+  }
+
+  Status WriteV(uint64_t offset, ArrayView<const Slice> data) override {
+    return WithOpenedFile([&](RWFile* f) { return f->WriteV(offset, data); });
+  }
+
+  Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override {
+    return WithOpenedFile([&](RWFile* f) {
+      return f->PreAllocate(offset, length, mode);
+    });
+  }
+
+  Status Truncate(uint64_t length) override {
+    return WithOpenedFile([&](RWFile* f) { return f->Truncate(length); });
+  }
+
+  Status PunchHole(uint64_t offset, size_t length) override {
+    return WithOpenedFile([&](RWFile* f) { return f->PunchHole(offset, length); });
+  }
+
+  Status Flush(FlushMode mode, uint64_t offset, size_t length) override {
+    return WithOpenedFile([&](RWFile* f) {
+      return f->Flush(mode, offset, length);
+    });
+  }
+
+  Status Sync() override {
+    return WithOpenedFile([](RWFile* f) { return f->Sync(); });
+  }
+
+  Status Close() override {
+    // Intentional no-op; actual closing is deferred to LRU cache eviction.
+    return Status::OK();
+  }
+
+  Status Size(uint64_t* size) const override {
+    return WithOpenedFile([&](RWFile* f) { return f->Size(size); });
+  }
+
+  Status GetExtentMap(ExtentMap* out) const override {
+    return WithOpenedFile([&](RWFile* f) { return f->GetExtentMap(out); });
+  }
+
+  const string& filename() const override {
+    return base_.filename();
+  }
+
+ private:
+  friend class FileCache<RWFile>;
+
+  // Keeps the file pinned in the cache (reopening it if necessary) while
+  // 'op' runs on it; returns the reopen error, if any, else op's Status.
+  template <class Op>
+  Status WithOpenedFile(const Op& op) const {
+    ScopedOpenedDescriptor<RWFile> pinned(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&pinned));
+    return op(pinned.file());
+  }
+
+  // Opens the file the first time this descriptor is handed out, so that
+  // OpenExistingFile() surfaces open failures. Later calls do not reopen.
+  Status Init() {
+    return once_.Init(&Descriptor<RWFile>::InitOnce, this);
+  }
+
+  Status InitOnce() {
+    return ReopenFileIfNecessary(nullptr);
+  }
+
+  // Makes sure the file is open in the cache, reopening it if it was evicted.
+  // When 'out' is non-null, it receives the handle to the cached file.
+  Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
+    ScopedOpenedDescriptor<RWFile> handle(base_.LookupFromCache());
+    CHECK(!base_.invalidated());
+    if (!handle.opened()) {
+      // Cache miss: the file was evicted, so reopen it.
+      //
+      // Because the file may be evicted at any time we must use
+      // 'sync_on_close' (note: sync is a no-op if the file isn't dirty).
+      RWFileOptions opts;
+      opts.sync_on_close = true;
+      opts.mode = Env::OPEN_EXISTING;
+      unique_ptr<RWFile> reopened;
+      RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &reopened));
+
+      // The cache takes ownership of the newly opened file.
+      handle = base_.InsertIntoCache(reopened.release());
+    }
+    if (out) {
+      *out = std::move(handle);
+    }
+    return Status::OK();
+  }
+
+  BaseDescriptor<RWFile> base_;
+  KuduOnceDynamic once_;
+
+  DISALLOW_COPY_AND_ASSIGN(Descriptor);
+};
+
+// A descriptor adhering to the RandomAccessFile interface (i.e. when opened,
+// provides a read-only interface to the underlying file).
+//
+// Every read pins the file in the cache (reopening it first if it was
+// evicted) and forwards the call to the cached RandomAccessFile.
+template <>
+class Descriptor<RandomAccessFile> : public RandomAccessFile {
+ public:
+  Descriptor(FileCache<RandomAccessFile>* file_cache, const string& filename)
+      : base_(file_cache, filename) {}
+
+  ~Descriptor() = default;
+
+  Status Read(uint64_t offset, Slice result) const override {
+    return WithOpenedFile([&](RandomAccessFile* f) {
+      return f->Read(offset, result);
+    });
+  }
+
+  Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
+    return WithOpenedFile([&](RandomAccessFile* f) {
+      return f->ReadV(offset, results);
+    });
+  }
+
+  Status Size(uint64_t *size) const override {
+    return WithOpenedFile([&](RandomAccessFile* f) { return f->Size(size); });
+  }
+
+  const string& filename() const override {
+    return base_.filename();
+  }
+
+  size_t memory_footprint() const override {
+    // Normally we would use kudu_malloc_usable_size(this). However, that's
+    // not safe because 'this' was allocated via std::make_shared(), which
+    // means it isn't necessarily the base of the memory allocation; it may be
+    // preceded by the shared_ptr control block.
+    //
+    // It doesn't appear possible to get the base of the allocation via any
+    // shared_ptr APIs, so we'll use sizeof(*this) + 16 instead. The 16 bytes
+    // represent the shared_ptr control block. Overall the object size is still
+    // undercounted as it doesn't account for any internal heap fragmentation,
+    // but at least it's safe.
+    //
+    // Some anecdotal memory measurements taken inside gdb:
+    // - glibc 2.23 malloc_usable_size() on make_shared<FileType>: 88 bytes.
+    // - tcmalloc malloc_usable_size() on make_shared<FileType>: 96 bytes.
+    // - sizeof(std::_Sp_counted_base<>) with libstdc++ 5.4: 16 bytes.
+    // - sizeof(std::__1::__shared_ptr_emplace<>) with libc++ 3.9: 16 bytes.
+    // - sizeof(*this): 72 bytes.
+    constexpr size_t kSharedPtrControlBlockBytes = 16;
+    return sizeof(*this) +
+           kSharedPtrControlBlockBytes +
+           once_.memory_footprint_excluding_this() +
+           base_.filename().capacity();
+  }
+
+ private:
+  friend class FileCache<RandomAccessFile>;
+
+  // Keeps the file pinned in the cache (reopening it if necessary) while
+  // 'op' runs on it; returns the reopen error, if any, else op's Status.
+  template <class Op>
+  Status WithOpenedFile(const Op& op) const {
+    ScopedOpenedDescriptor<RandomAccessFile> pinned(&base_);
+    RETURN_NOT_OK(ReopenFileIfNecessary(&pinned));
+    return op(pinned.file());
+  }
+
+  // Opens the file the first time this descriptor is handed out, so that
+  // OpenExistingFile() surfaces open failures. Later calls do not reopen.
+  Status Init() {
+    return once_.Init(&Descriptor<RandomAccessFile>::InitOnce, this);
+  }
+
+  Status InitOnce() {
+    return ReopenFileIfNecessary(nullptr);
+  }
+
+  // Makes sure the file is open in the cache, reopening it if it was evicted.
+  // When 'out' is non-null, it receives the handle to the cached file.
+  Status ReopenFileIfNecessary(
+      ScopedOpenedDescriptor<RandomAccessFile>* out) const {
+    ScopedOpenedDescriptor<RandomAccessFile> handle(base_.LookupFromCache());
+    CHECK(!base_.invalidated());
+    if (!handle.opened()) {
+      // Cache miss: the file was evicted, so reopen it.
+      unique_ptr<RandomAccessFile> reopened;
+      RETURN_NOT_OK(base_.env()->NewRandomAccessFile(base_.filename(), &reopened));
+
+      // The cache takes ownership of the newly opened file.
+      handle = base_.InsertIntoCache(reopened.release());
+    }
+    if (out) {
+      *out = std::move(handle);
+    }
+    return Status::OK();
+  }
+
+  BaseDescriptor<RandomAccessFile> base_;
+  KuduOnceDynamic once_;
+
+  DISALLOW_COPY_AND_ASSIGN(Descriptor);
+};
+
+} // namespace internal
+
+// Builds the LRU cache (capacity 'max_open_files', one unit per open file)
+// and its eviction callback. The expiry thread is not started until Init().
+template <class FileType>
+FileCache<FileType>::FileCache(const string& cache_name,
+                               Env* env,
+                               int max_open_files,
+                               const scoped_refptr<MetricEntity>& entity)
+    : env_(env),
+      cache_name_(cache_name),
+      eviction_cb_(new EvictionCallback<FileType>()),
+      cache_(NewLRUCache(DRAM_CACHE, max_open_files, cache_name)),
+      running_(1) {
+  // Metrics are optional: only attach them when an entity was supplied.
+  if (entity) {
+    cache_->SetMetrics(entity);
+  }
+  LOG(INFO) << Substitute("Constructed file cache $0 with capacity $1",
+                          cache_name_, max_open_files);
+}
+
+// Tells the expiry thread to stop and, if Init() started one, waits for it.
+template <class FileType>
+FileCache<FileType>::~FileCache() {
+  running_.CountDown();
+  if (descriptor_expiry_thread_.get() != nullptr) {
+    descriptor_expiry_thread_->Join();
+  }
+}
+
+// Starts the background thread that periodically prunes expired descriptors.
+template <class FileType>
+Status FileCache<FileType>::Init() {
+  const string thread_name = Substitute("$0-evict", cache_name_);
+  return Thread::Create("cache", thread_name, &FileCache::RunDescriptorExpiry,
+                        this, &descriptor_expiry_thread_);
+}
+
+template <class FileType>
+Status FileCache<FileType>::OpenExistingFile(const string& file_name,
+                                             shared_ptr<FileType>* file) {
+  shared_ptr<internal::Descriptor<FileType>> desc;
+  {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    // Reuse the live descriptor for this name if there is one (this fails
+    // with NotFound if it was marked for deletion); otherwise create one.
+    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));
+    if (!desc) {
+      desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
+      InsertOrDie(&descriptors_, file_name, desc);
+      VLOG(2) << "Created new descriptor: " << desc->filename();
+    } else {
+      VLOG(2) << "Found existing descriptor: " << desc->filename();
+    }
+  }
+
+  // Outside the lock, verify that the underlying file can be opened. For a
+  // reused descriptor this was already done, so it is a no-op.
+  RETURN_NOT_OK(desc->Init());
+  *file = std::move(desc);
+  return Status::OK();
+}
+
+template <class FileType>
+Status FileCache<FileType>::DeleteFile(const string& file_name) {
+  bool deferred = false;
+  {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    shared_ptr<internal::Descriptor<FileType>> desc;
+    RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));
+    if (desc) {
+      // A descriptor is still outstanding: only mark it, and let the last
+      // reference's destruction perform the actual deletion.
+      VLOG(2) << "Marking file for deletion: " << file_name;
+      desc->base_.MarkDeleted();
+      deferred = true;
+    }
+  }
+  if (deferred) {
+    return Status::OK();
+  }
+
+  // No outstanding descriptor, so delete the file right away. First make sure
+  // any previously opened copy is fully evicted from the cache so that the
+  // filesystem can reclaim the file data instantly.
+  cache_->Erase(file_name);
+  return env_->DeleteFile(file_name);
+}
+
+// Invalidates 'file_name': marks its descriptor (creating one if needed) as
+// invalidated, evicts any cached open file for it, then drops the descriptor
+// from the map so the next open starts from scratch.
+template <class FileType>
+void FileCache<FileType>::Invalidate(const string& file_name) {
+  // Ensure that there is an invalidated descriptor in the map for this filename.
+  //
+  // This ensures that any concurrent OpenExistingFile() during this method will
+  // see the invalidation and issue a CHECK failure.
+  shared_ptr<internal::Descriptor<FileType>> desc;
+  {
+    // Find an existing live descriptor, or create one if none exists.
+    std::lock_guard<simple_spinlock> l(lock_);
+    auto it = descriptors_.find(file_name);
+    if (it != descriptors_.end()) {
+      desc = it->second.lock();
+    }
+    if (!desc) {
+      desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
+      // Assign rather than emplace(): the map may still hold an expired
+      // weak_ptr for this name (expired entries linger until pruned), and
+      // emplace() would keep that stale entry, leaving the invalidated
+      // descriptor invisible to concurrent OpenExistingFile() calls.
+      descriptors_[file_name] = desc;
+    }
+
+    desc->base_.MarkInvalidated();
+  }
+  // Remove it from the cache so that if the same path is opened again, we
+  // will re-open a new FD rather than retrieving one that might have been
+  // cached prior to invalidation.
+  cache_->Erase(file_name);
+
+  // Remove the invalidated descriptor from the map. We are guaranteed it
+  // is still there because we've held a strong reference to it for
+  // the duration of this method, and no other methods erase strong
+  // references from the map.
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    CHECK_EQ(1, descriptors_.erase(file_name));
+  }
+}
+
+// Counts map entries, including expired ones that have not been pruned yet.
+template <class FileType>
+int FileCache<FileType>::NumDescriptorsForTests() const {
+  size_t count;
+  {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    count = descriptors_.size();
+  }
+  return static_cast<int>(count);
+}
+
+// One line per descriptor map entry. Live descriptors are tagged "S", plus
+// "D" if marked for deletion and "O" if their file is currently open in the
+// LRU cache; expired entries show only the file name.
+template <class FileType>
+string FileCache<FileType>::ToDebugString() const {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  string out;
+  for (const auto& entry : descriptors_) {
+    const shared_ptr<internal::Descriptor<FileType>> desc = entry.second.lock();
+    if (!desc) {
+      out += Substitute("$0\n", entry.first);
+      continue;
+    }
+    const bool deleted = desc->base_.deleted();
+    bool opened;
+    {
+      // Hold the probe handle only long enough to check for a cached file.
+      internal::ScopedOpenedDescriptor<FileType> probe(
+          desc->base_.LookupFromCache());
+      opened = probe.opened();
+    }
+    out += Substitute("$0 (S$1$2)\n", entry.first,
+                      deleted ? "D" : "", opened ? "O" : "");
+  }
+  return out;
+}
+
+// Looks up the live descriptor for 'file_name' (caller holds 'lock_').
+// An expired entry is erased and treated as absent; '*file' is left untouched
+// unless a live descriptor is found.
+template <class FileType>
+Status FileCache<FileType>::FindDescriptorUnlocked(
+    const string& file_name,
+    shared_ptr<internal::Descriptor<FileType>>* file) {
+  DCHECK(lock_.is_locked());
+
+  auto it = descriptors_.find(file_name);
+  if (it == descriptors_.end()) {
+    return Status::OK();
+  }
+
+  shared_ptr<internal::Descriptor<FileType>> desc = it->second.lock();
+  if (!desc) {
+    // The descriptor has expired; prune the stale entry and report nothing.
+    descriptors_.erase(it);
+    return Status::OK();
+  }
+
+  // Looking a name up while it is being invalidated is a programming error.
+  CHECK(!desc->base_.invalidated());
+  if (desc->base_.deleted()) {
+    return Status::NotFound("File already marked for deletion", file_name);
+  }
+  if (file != nullptr) {
+    *file = std::move(desc);
+  }
+  return Status::OK();
+}
+
+// Body of the expiry thread. Every FLAGS_file_cache_expiry_period_ms it
+// prunes expired weak references from 'descriptors_'. It exits once
+// WaitFor() reports that 'running_' reached zero (done by the destructor).
+template <class FileType>
+void FileCache<FileType>::RunDescriptorExpiry() {
+  while (true) {
+    const MonoDelta period =
+        MonoDelta::FromMilliseconds(FLAGS_file_cache_expiry_period_ms);
+    if (running_.WaitFor(period)) {
+      return;
+    }
+    std::lock_guard<simple_spinlock> guard(lock_);
+    auto it = descriptors_.begin();
+    while (it != descriptors_.end()) {
+      if (!it->second.expired()) {
+        ++it;
+        continue;
+      }
+      it = descriptors_.erase(it);
+    }
+  }
+}
+
+// Explicit instantiations: the member definitions of FileCache live only in
+// this file, so instantiate the two supported variants here for callers in
+// other translation units.
+template class FileCache<RWFile>;
+template class FileCache<RandomAccessFile>;
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/file_cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/file_cache.h b/be/src/kudu/util/file_cache.h
new file mode 100644
index 0000000..021c758
--- /dev/null
+++ b/be/src/kudu/util/file_cache.h
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/cache.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Env;
+class MetricEntity;
+class Thread;
+
+namespace internal {
+
+// Descriptor implementation types, defined in file_cache.cc.
+template <class FileType>
+class BaseDescriptor;
+
+template <class FileType>
+class Descriptor;
+
+} // namespace internal
+
+// Cache of open files.
+//
+// The purpose of this cache is to enforce an upper bound on the maximum number
+// of files open at a time. Files opened through the cache may be closed at any
+// time, only to be reopened upon next use.
+//
+// The file cache can be viewed as having two logical parts: the client-facing
+// API and the LRU cache.
+//
+// Client-facing API
+// -----------------
+// The core of the client-facing API is the cache descriptor. A descriptor
+// uniquely identifies an opened file. To a client, a descriptor is just an
+// open file interface of the variety defined in util/env.h. Clients open
+// descriptors via the OpenExistingFile() cache method.
+//
+// Descriptors are shared objects; an existing descriptor is handed back to a
+// client if a file with the same name is already opened. To facilitate
+// descriptor sharing, the file cache maintains a by-file-name descriptor map.
+// The values are weak references to the descriptors so that map entries don't
+// affect the descriptor lifecycle. An entry whose descriptor has been
+// destroyed is not removed right away; it is pruned lazily, either by the next
+// lookup of that name or by a periodic background sweep.
+//
+// LRU cache
+// ---------
+// The lower half of the file cache is a standard LRU cache whose keys are file
+// names and whose values are pointers to opened file objects allocated on the
+// heap. Unlike the descriptor map, this cache has an upper bound on capacity,
+// and handles are evicted (and closed) according to an LRU algorithm.
+//
+// Whenever a descriptor is used by a client in file I/O, its file name is used
+// in an LRU cache lookup. If found, the underlying file is still open and the
+// file access is performed. Otherwise, the file must have been evicted and
+// closed, so it is reopened and reinserted (possibly evicting a different open
+// file) before the file access is performed.
+//
+// Other notes
+// -----------
+// In a world where files are opened and closed transparently, file deletion
+// demands special care if UNIX semantics are to be preserved. When a call to
+// DeleteFile() is made to a file with an opened descriptor, the descriptor is
+// simply "marked" as to-be-deleted-later. Only when all references to the
+// descriptor are dropped is the file actually deleted. If there is no open
+// descriptor, the file is deleted immediately.
+//
+// Every public method in the file cache is thread safe, subject to the
+// restrictions documented on Invalidate().
+template <class FileType>
+class FileCache {
+ public:
+ // Creates a new file cache.
+ //
+ // The 'cache_name' is used to disambiguate amongst other file cache
+ // instances. The cache will use 'max_open_files' as a soft upper bound on
+ // the number of files open at any given time. 'entity', if non-null,
+ // receives the underlying cache's metrics.
+ FileCache(const std::string& cache_name,
+ Env* env,
+ int max_open_files,
+ const scoped_refptr<MetricEntity>& entity);
+
+ // Destroys the file cache. Stops and joins the descriptor expiry thread,
+ // if Init() started one.
+ ~FileCache();
+
+ // Initializes the file cache by starting the background thread that
+ // periodically removes expired descriptors. Initialization done here may
+ // fail.
+ Status Init();
+
+ // Opens an existing file by name through the cache.
+ //
+ // The returned 'file' is actually an object called a descriptor. It adheres
+ // to a file-like interface but interfaces with the cache under the hood to
+ // reopen a file as needed during file operations.
+ //
+ // The descriptor is opened immediately to verify that the on-disk file can
+ // be opened, but may be closed later if the cache reaches its upper bound on
+ // the number of open files.
+ //
+ // Returns NotFound if the file has already been marked for deletion by
+ // DeleteFile() while a descriptor for it is still outstanding.
+ Status OpenExistingFile(const std::string& file_name,
+ std::shared_ptr<FileType>* file);
+
+ // Deletes a file by name through the cache.
+ //
+ // If there is an outstanding descriptor for the file, the deletion will be
+ // deferred until the last referent is dropped. Otherwise, the file is
+ // deleted immediately and the filesystem's result is returned.
+ //
+ // Returns NotFound if the file is already marked for deletion.
+ Status DeleteFile(const std::string& file_name);
+
+ // Invalidate the given path in the cache if present. This removes the
+ // path from the cache, and invalidates any previously-opened descriptors
+ // associated with this file.
+ //
+ // If a file with the same path is opened again, the actual path will be
+ // opened from disk.
+ //
+ // This operation should be used during 'rename-to-replace' patterns, e.g.:
+ //
+ // WriteNewDataTo(tmp_path);
+ // env->RenameFile(tmp_path, p);
+ // file_cache->Invalidate(p);
+ //
+ // NOTE: if any reader of 'p' holds an open descriptor from the cache
+ // prior to this operation, that descriptor is invalidated and any
+ // further operations on that descriptor will result in a CHECK failure.
+ // Hence this is not safe to use without some external synchronization
+ // which prevents concurrent access to the same file.
+ //
+ // NOTE: this function must not be called concurrently on the same file name
+ // from multiple threads.
+ void Invalidate(const std::string& file_name);
+
+ // Returns the number of entries in the descriptor map, including expired
+ // entries that have not been pruned yet.
+ //
+ // Only intended for unit tests.
+ int NumDescriptorsForTests() const;
+
+ // Dumps the contents of the file cache. Intended for debugging.
+ std::string ToDebugString() const;
+
+ private:
+ // BaseDescriptor reaches into 'env_', 'cache_' and 'eviction_cb_'.
+ friend class internal::BaseDescriptor<FileType>;
+
+ // Lets the typed test FileCacheTest.TestBasicOperations inspect internals.
+ template<class FileType2>
+ FRIEND_TEST(FileCacheTest, TestBasicOperations);
+
+ // Looks up a live descriptor by file name, storing it in '*file' (when
+ // 'file' is non-null). An expired entry is erased and treated as absent.
+ // Returns NotFound if the descriptor is marked for deletion, and
+ // CHECK-fails if it has been invalidated.
+ //
+ // Must be called with 'lock_' held.
+ Status FindDescriptorUnlocked(
+ const std::string& file_name,
+ std::shared_ptr<internal::Descriptor<FileType>>* file);
+
+ // Periodically removes expired descriptors from 'descriptors_'.
+ void RunDescriptorExpiry();
+
+ // NOTE(review): member declaration order determines construction and
+ // destruction order. 'eviction_cb_' is declared before 'cache_' so that it
+ // is destroyed after it; presumably the cache may still invoke the callback
+ // for entries it frees on destruction. Confirm before reordering.
+
+ // Interface to the underlying filesystem.
+ Env* env_;
+
+ // Name of the cache.
+ const std::string cache_name_;
+
+ // Invoked whenever a cached file reaches zero references (i.e. it was
+ // removed from the cache and is no longer in use by any file operations).
+ std::unique_ptr<Cache::EvictionCallback> eviction_cb_;
+
+ // Underlying cache instance. Caches opened files.
+ std::unique_ptr<Cache> cache_;
+
+ // Protects the descriptor map.
+ mutable simple_spinlock lock_;
+
+ // Maps filenames to descriptors. Values are weak references, so an entry
+ // can outlive its descriptor until it is pruned.
+ std::unordered_map<std::string,
+ std::weak_ptr<internal::Descriptor<FileType>>> descriptors_;
+
+ // Runs RunDescriptorExpiry(), which loops until 'running_' is counted down.
+ // Null until Init() succeeds.
+ scoped_refptr<Thread> descriptor_expiry_thread_;
+
+ // Counted down by the destructor to tell 'descriptor_expiry_thread_' to exit.
+ CountDownLatch running_;
+
+ DISALLOW_COPY_AND_ASSIGN(FileCache);
+};
+
+} // namespace kudu