Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/09 00:12:29 UTC

kudu git commit: log_block_manager: fix corruption after re-opening compacted metadata

Repository: kudu
Updated Branches:
  refs/heads/branch-1.4.x 2496aac3b -> 1ed03d710


log_block_manager: fix corruption after re-opening compacted metadata

This fixes an issue discovered on a cluster due to the following
sequence of events:

- a block manager compacts a metadata file while starting up
- when it reopens the metadata file after replacing it with the
  compacted one, it gets a file_cache hit. Thus, the WritablePBContainer
  continues to write to the _deleted_ file instead of the compacted one
  (a standalone sketch of this hazard follows this list). Metadata
  entries written at this point are lost, which could cause block loss
  in the case of lost CREATE records, or dangling blocks in the case of
  lost DELETEs
- if the server continues to run for a while, the FD will be evicted
  from the cache and eventually re-opened. At that point, a further
  DELETE record could end up writing to an offset past the end of the
  file, since the write offset was incremented by the "lost" records
  above.
- on the next restart, the metadata file would have a "gap" of zero
  bytes, which would surface as a checksum failure and failure to start
  up.
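
To make the failure mode concrete, here is a minimal standalone sketch
(plain POSIX file APIs, not Kudu code; the file names are made up) of how
an append through a descriptor opened before a rename-to-replace lands on
the old, unlinked inode rather than on the replacement file:

    // stale_fd_demo.cc: a descriptor opened before a rename-to-replace
    // keeps pointing at the old inode, so appends through it are "lost".
    #include <fcntl.h>
    #include <unistd.h>
    #include <cstdio>

    int main() {
      // Simulate the cached descriptor: open the original metadata file.
      int cached_fd = open("metadata", O_WRONLY | O_CREAT | O_APPEND, 0644);

      // Simulate compaction: write a replacement and rename it into place.
      int tmp_fd = open("metadata.tmp", O_WRONLY | O_CREAT | O_TRUNC, 0644);
      write(tmp_fd, "compacted\n", 10);
      close(tmp_fd);
      rename("metadata.tmp", "metadata");

      // This append goes to the now-unlinked old inode, not the compacted
      // file: the moral equivalent of the lost CREATE/DELETE records above.
      write(cached_fd, "lost record\n", 12);
      close(cached_fd);
      return 0;
    }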

The fix is relatively simple: when we replace the metadata file, we need
to invalidate and evict the cache entry so that when we "reopen", it
actually starts appending to the _new_ file and not the old deleted one.
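
The resulting call pattern, as it appears in the RewriteMetadataFile()
change below, is simply:

    // Rename the compacted file into place, then evict the old path from
    // the file cache so that the next "reopen" appends to the new file.
    RETURN_NOT_OK_PREPEND(env_->RenameFile(tmp_file_name, metadata_file_name),
                          "could not rename temporary metadata file");
    file_cache_.Invalidate(metadata_file_name);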

The bulk of the changes here are to tests:
- the stress test now enforces a minimum number of live blocks before it
  starts deleting them. It also compacts more aggressively and uses a
  smaller file cache. With these changes, I was sometimes able to
  reproduce the issue.
- a more targeted test issues a canned sequence of block creations and
  deletions that can reliably reproduce the above issue.

Change-Id: I491eacbad4750efedea854a2cc35b8ec994f9077
Reviewed-on: http://gerrit.cloudera.org:8080/7113
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/7125


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1ed03d71
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1ed03d71
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1ed03d71

Branch: refs/heads/branch-1.4.x
Commit: 1ed03d710ebf06afa35ee0f7813649d661f90829
Parents: 2496aac
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Jun 7 20:01:17 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Jun 9 00:11:37 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-stress-test.cc |  41 ++++++---
 src/kudu/fs/log_block_manager-test.cc    |  56 +++++++++++++
 src/kudu/fs/log_block_manager.cc         |  12 ++-
 src/kudu/util/file_cache-test.cc         |  34 ++++++++
 src/kudu/util/file_cache.cc              | 114 +++++++++++++++-----------
 src/kudu/util/file_cache.h               |  23 ++++++
 6 files changed, 216 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1ed03d71/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index ed5b969..899b717 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -42,10 +42,13 @@ DECLARE_int64(block_manager_max_open_files);
 DECLARE_uint64(log_container_max_size);
 DECLARE_uint64(log_container_preallocate_bytes);
 
-DEFINE_int32(test_duration_secs, 2, "Number of seconds to run the test");
+DEFINE_double(test_duration_secs, 2, "Number of seconds to run the test");
 DEFINE_int32(num_writer_threads, 4, "Number of writer threads to run");
 DEFINE_int32(num_reader_threads, 8, "Number of reader threads to run");
 DEFINE_int32(num_deleter_threads, 1, "Number of deleter threads to run");
+DEFINE_int32(minimum_live_blocks_for_delete, 1000,
+             "If there are fewer than this number of live blocks, the deleter "
+             "threads will not delete any");
 DEFINE_int32(block_group_size, 8, "Number of blocks to write per block "
              "group. Must be power of 2");
 DEFINE_int32(block_group_bytes, 32 * 1024,
@@ -102,13 +105,13 @@ class BlockManagerStressTest : public KuduTest {
     FLAGS_log_container_preallocate_bytes = 1 * 1024 * 1024;
 
     // Ensure the file cache is under stress too.
-    FLAGS_block_manager_max_open_files = 512;
+    FLAGS_block_manager_max_open_files = 32;
 
     // Maximize the amount of cleanup triggered by the extra space heuristic.
     FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
 
     // Compact block manager metadata aggressively.
-    FLAGS_log_container_live_metadata_before_compact_ratio = 0.80;
+    FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
 
     if (FLAGS_block_manager_paths.empty()) {
       data_dirs_.push_back(test_dir_);
@@ -143,7 +146,7 @@ class BlockManagerStressTest : public KuduTest {
     return new T(env_, opts);
   }
 
-  void RunTest(int secs) {
+  void RunTest(double secs) {
     LOG(INFO) << "Starting all threads";
     this->StartThreads();
     SleepFor(MonoDelta::FromSeconds(secs));
@@ -379,8 +382,15 @@ void BlockManagerStressTest<T>::DeleterThread() {
     // Grab a block at random.
     BlockId to_delete;
     {
-      std::lock_guard<simple_spinlock> l(lock_);
-      if (written_blocks_.empty()) {
+      std::unique_lock<simple_spinlock> l(lock_);
+      // If we only have a small number of live blocks, don't delete any.
+      // This ensures that, when we restart, we always have a reasonable
+      // amount of data -- otherwise the deletion threads are likely to
+      // "keep up" with the writer threads and every restart will consist
+      // of a very small number of non-dead containers.
+      if (written_blocks_.size() < FLAGS_minimum_live_blocks_for_delete) {
+        l.unlock();
+        SleepFor(MonoDelta::FromMilliseconds(10));
         continue;
       }
 
@@ -450,6 +460,8 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
   OverrideFlagForSlowTests("block_group_size", "16");
   OverrideFlagForSlowTests("num_inconsistencies", "128");
 
+  const int kNumStarts = 3;
+
   if ((FLAGS_block_group_size & (FLAGS_block_group_size - 1)) != 0) {
     LOG(FATAL) << "block_group_size " << FLAGS_block_group_size
                << " is not a power of 2";
@@ -459,14 +471,17 @@ TYPED_TEST(BlockManagerStressTest, StressTest) {
 
   LOG(INFO) << "Running on fresh block manager";
   checker.Start();
-  this->RunTest(FLAGS_test_duration_secs / 2);
+  this->RunTest(FLAGS_test_duration_secs / kNumStarts);
   NO_FATALS(this->InjectNonFatalInconsistencies());
-  LOG(INFO) << "Running on populated block manager";
-  this->bm_.reset(this->CreateBlockManager());
-  FsReport report;
-  ASSERT_OK(this->bm_->Open(&report));
-  ASSERT_OK(report.LogAndCheckForFatalErrors());
-  this->RunTest(FLAGS_test_duration_secs / 2);
+
+  for (int i = 1; i < kNumStarts; i++) {
+    LOG(INFO) << "Running on populated block manager (restart #" << i << ")";
+    this->bm_.reset(this->CreateBlockManager());
+    FsReport report;
+    ASSERT_OK(this->bm_->Open(&report));
+    ASSERT_OK(report.LogAndCheckForFatalErrors());
+    this->RunTest(FLAGS_test_duration_secs / kNumStarts);
+  }
   checker.Stop();
 
   LOG(INFO) << "Printing test totals";

http://git-wip-us.apache.org/repos/asf/kudu/blob/1ed03d71/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 5a57552..47b4138 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -44,6 +44,8 @@ using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+DECLARE_int64(block_manager_max_open_files);
+DECLARE_bool(cache_force_single_shard);
 DECLARE_double(log_container_excess_space_before_cleanup_fraction);
 DECLARE_double(log_container_live_metadata_before_compact_ratio);
 DECLARE_int64(log_container_max_blocks);
@@ -1188,5 +1190,59 @@ TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
   ASSERT_EQ(last_live_aligned_bytes, report.stats.live_block_bytes_aligned);
 }
 
+// Regression test for a bug in which, after a metadata file was compacted,
+// we would not properly handle appending to the new (post-compaction) metadata.
+//
+// The bug was related to a stale file descriptor left in the file_cache, so
+// this test explicitly targets that scenario.
+TEST_F(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
+  // Compact aggressively.
+  FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
+  // Use a small file cache (smaller than the number of containers).
+  FLAGS_block_manager_max_open_files = 50;
+  // Use a single shard so that we have an accurate max cache capacity
+  // regardless of the number of cores on the machine.
+  FLAGS_cache_force_single_shard = true;
+  // Use very small containers, so that we generate a lot of them (and thus
+  // consume a lot of file descriptors).
+  FLAGS_log_container_max_blocks = 4;
+  // Reopen so the flags take effect.
+  ASSERT_OK(ReopenBlockManager(nullptr));
+
+  // Create many containers with a bunch of blocks, half of which are deleted.
+  vector<BlockId> block_ids;
+  for (int i = 0; i < 1000; i++) {
+    unique_ptr<WritableBlock> block;
+    ASSERT_OK(bm_->CreateBlock(&block));
+    ASSERT_OK(block->Close());
+    if (i % 2 == 1) {
+      ASSERT_OK(bm_->DeleteBlock(block->id()));
+    } else {
+      block_ids.emplace_back(block->id());
+    }
+  }
+
+  // Reopen the block manager. This will cause it to compact all of the metadata
+  // files, since we've deleted half the blocks in every container and the
+  // threshold is set high above.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+
+  // Delete the remaining blocks in a random order. This will append to metadata
+  // files which have just been compacted. Since we have more metadata files than
+  // we have file_cache capacity, this will also generate a mix of cache hits,
+  // misses, and re-insertions.
+  std::random_shuffle(block_ids.begin(), block_ids.end());
+  for (const BlockId& b : block_ids) {
+    ASSERT_OK(bm_->DeleteBlock(b));
+  }
+
+  // Reopen to make sure that the metadata can be properly loaded and
+  // that the resulting block manager is empty.
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_EQ(0, report.stats.live_block_count);
+  ASSERT_EQ(0, report.stats.live_block_bytes_aligned);
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1ed03d71/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 48fb910..2ac1f9b 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -2258,9 +2258,8 @@ Status LogBlockManager::Repair(
 
     // Rewrite this metadata file. Failures are non-fatal.
     int64_t file_bytes_delta;
-    Status s = RewriteMetadataFile(StrCat(e.first, kContainerMetadataFileSuffix),
-                                   e.second,
-                                   &file_bytes_delta);
+    const auto& meta_path = StrCat(e.first, kContainerMetadataFileSuffix);
+    Status s = RewriteMetadataFile(meta_path, e.second, &file_bytes_delta);
     if (!s.ok()) {
       WARN_NOT_OK(s, "could not rewrite metadata file");
       continue;
@@ -2272,6 +2271,9 @@ Status LogBlockManager::Repair(
 
     metadata_files_compacted++;
     metadata_bytes_delta += file_bytes_delta;
+    VLOG(1) << "Compacted metadata file " << meta_path
+            << " (saved " << file_bytes_delta << " bytes)";
+
   }
 
   // The data directory can be synchronized once for all of the new metadata files.
@@ -2325,6 +2327,10 @@ Status LogBlockManager::RewriteMetadataFile(const string& metadata_file_name,
                         "could not get file size of temporary metadata file");
   RETURN_NOT_OK_PREPEND(env_->RenameFile(tmp_file_name, metadata_file_name),
                         "could not rename temporary metadata file");
+  // Evict the old path from the file cache, so that when we re-open the new
+  // metadata file for write, we don't accidentally get a cache hit on the
+  // old file descriptor pointing to the now-deleted old version.
+  file_cache_.Invalidate(metadata_file_name);
 
   tmp_deleter.Cancel();
   *file_bytes_delta = (static_cast<int64_t>(old_metadata_size) - new_metadata_size);

http://git-wip-us.apache.org/repos/asf/kudu/blob/1ed03d71/src/kudu/util/file_cache-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache-test.cc b/src/kudu/util/file_cache-test.cc
index 5ac568e..1752bd2 100644
--- a/src/kudu/util/file_cache-test.cc
+++ b/src/kudu/util/file_cache-test.cc
@@ -223,6 +223,40 @@ TYPED_TEST(FileCacheTest, TestDeletion) {
   ASSERT_EQ(this->initial_open_fds_, CountOpenFds(this->env_));
 }
 
+TYPED_TEST(FileCacheTest, TestInvalidation) {
+  const string kFile1 = this->GetTestPath("foo");
+  const string kData1 = "test data 1";
+  ASSERT_OK(this->WriteTestFile(kFile1, kData1));
+
+  // Open the file.
+  shared_ptr<TypeParam> f;
+  ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f));
+
+  // Write a new file and rename it in place on top of file1.
+  const string kFile2 = this->GetTestPath("foo2");
+  const string kData2 = "test data 2 (longer than original)";
+  ASSERT_OK(this->WriteTestFile(kFile2, kData2));
+  ASSERT_OK(this->env_->RenameFile(kFile2, kFile1));
+
+  // We should still be able to access the file, since it has a cached fd.
+  uint64_t size;
+  ASSERT_OK(f->Size(&size));
+  ASSERT_EQ(kData1.size(), size);
+
+  // If we invalidate it from the cache and try again, it should crash because
+  // the existing descriptor was invalidated.
+  this->cache_->Invalidate(kFile1);
+  ASSERT_DEATH({ f->Size(&size); }, "invalidated");
+
+  // But if we re-open the path again, the new descriptor should read the
+  // new data.
+  shared_ptr<TypeParam> f2;
+  ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+  ASSERT_OK(f2->Size(&size));
+  ASSERT_EQ(kData2.size(), size);
+}
+
+
 TYPED_TEST(FileCacheTest, TestHeavyReads) {
   const int kNumFiles = 20;
   const int kNumIterations = 100;

http://git-wip-us.apache.org/repos/asf/kudu/blob/1ed03d71/src/kudu/util/file_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index b8d17cf..07babad 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/util/file_cache.h"
 
+#include <atomic>
 #include <memory>
 #include <mutex>
 #include <string>
@@ -96,7 +97,7 @@ class BaseDescriptor {
     // the next call to RunDescriptorExpiry(). Removing it here would risk a
     // deadlock on recursive acquisition of 'lock_'.
 
-    if (deleted_) {
+    if (deleted()) {
       cache()->Erase(filename());
 
       VLOG(1) << "Deleting file: " << filename();
@@ -136,8 +137,21 @@ class BaseDescriptor {
 
   // Mark this descriptor as to-be-deleted later.
   void MarkDeleted() {
-    DCHECK(!deleted_);
-    deleted_ = true;
+    DCHECK(!deleted());
+    while (true) {
+      auto v = flags_.load();
+      if (flags_.compare_exchange_weak(v, v | FILE_DELETED)) return;
+    }
+  }
+
+  // Mark this descriptor as invalidated. No further access is allowed
+  // to this file.
+  void MarkInvalidated() {
+    DCHECK(!invalidated());
+    while (true) {
+      auto v = flags_.load();
+      if (flags_.compare_exchange_weak(v, v | INVALIDATED)) return;
+    }
   }
 
   Cache* cache() const { return file_cache_->cache_.get(); }
@@ -146,13 +160,17 @@ class BaseDescriptor {
 
   const string& filename() const { return file_name_; }
 
-  bool deleted() const { return deleted_; }
+  bool deleted() const { return flags_.load() & FILE_DELETED; }
+  bool invalidated() const { return flags_.load() & INVALIDATED; }
 
  private:
   FileCache<FileType>* file_cache_;
   const string file_name_;
-
-  bool deleted_ = false;
+  enum Flags {
+    FILE_DELETED = 1 << 0,
+    INVALIDATED = 1 << 1
+  };
+  std::atomic<uint8_t> flags_ {0};
 
   DISALLOW_COPY_AND_ASSIGN(BaseDescriptor);
 };
@@ -296,6 +314,7 @@ class Descriptor<RWFile> : public RWFile {
 
   Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
     ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache());
+    CHECK(!base_.invalidated());
     if (found.opened()) {
       // The file is already open in the cache, return it.
       if (out) {
@@ -398,6 +417,7 @@ class Descriptor<RandomAccessFile> : public RandomAccessFile {
   Status ReopenFileIfNecessary(
       ScopedOpenedDescriptor<RandomAccessFile>* out) const {
     ScopedOpenedDescriptor<RandomAccessFile> found(base_.LookupFromCache());
+    CHECK(!base_.invalidated());
     if (found.opened()) {
       // The file is already open in the cache, return it.
       if (out) {
@@ -479,7 +499,7 @@ Status FileCache<FileType>::OpenExistingFile(const string& file_name,
   // Check that the underlying file can be opened (no-op for found
   // descriptors). Done outside the lock.
   RETURN_NOT_OK(desc->Init());
-  *file = desc;
+  *file = std::move(desc);
   return Status::OK();
 }
 
@@ -506,6 +526,42 @@ Status FileCache<FileType>::DeleteFile(const string& file_name) {
 }
 
 template <class FileType>
+void FileCache<FileType>::Invalidate(const string& file_name) {
+  // Ensure that there is an invalidated descriptor in the map for this filename.
+  //
+  // This ensures that any concurrent OpenExistingFile() during this method will
+  // see the invalidation and issue a CHECK failure.
+  shared_ptr<internal::Descriptor<FileType>> desc;
+  {
+    // Find an existing descriptor, or create one if none exists.
+    std::lock_guard<simple_spinlock> l(lock_);
+    auto it = descriptors_.find(file_name);
+    if (it != descriptors_.end()) {
+      desc = it->second.lock();
+    }
+    if (!desc) {
+      desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
+      descriptors_.emplace(file_name, desc);
+    }
+
+    desc->base_.MarkInvalidated();
+  }
+  // Remove it from the cache so that if the same path is opened again, we
+  // will re-open a new FD rather than retrieving one that might have been
+  // cached prior to invalidation.
+  cache_->Erase(file_name);
+
+  // Remove the invalidated descriptor from the map. We are guaranteed it
+  // is still there because we've held a strong reference to it for
+  // the duration of this method, and no other methods erase strong
+  // references from the map.
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    CHECK_EQ(1, descriptors_.erase(file_name));
+  }
+}
+
+template <class FileType>
 int FileCache<FileType>::NumDescriptorsForTests() const {
   std::lock_guard<simple_spinlock> l(lock_);
   return descriptors_.size();
@@ -552,6 +608,7 @@ Status FileCache<FileType>::FindDescriptorUnlocked(
     // Found the descriptor. Has it expired?
     shared_ptr<internal::Descriptor<FileType>> desc = it->second.lock();
     if (desc) {
+      CHECK(!desc->base_.invalidated());
       if (desc->base_.deleted()) {
         return Status::NotFound("File already marked for deletion", file_name);
       }
@@ -584,46 +641,7 @@ void FileCache<FileType>::RunDescriptorExpiry() {
 }
 
 // Explicit specialization for callers outside this compilation unit.
-template
-FileCache<RWFile>::FileCache(
-    const string& cache_name,
-    Env* env,
-    int max_open_files,
-    const scoped_refptr<MetricEntity>& entity);
-template
-FileCache<RWFile>::~FileCache();
-template
-Status FileCache<RWFile>::Init();
-template
-Status FileCache<RWFile>::OpenExistingFile(
-    const string& file_name,
-    shared_ptr<RWFile>* file);
-template
-Status FileCache<RWFile>::DeleteFile(const string& file_name);
-template
-int FileCache<RWFile>::NumDescriptorsForTests() const;
-template
-string FileCache<RWFile>::ToDebugString() const;
-
-template
-FileCache<RandomAccessFile>::FileCache(
-    const string& cache_name,
-    Env* env,
-    int max_open_files,
-    const scoped_refptr<MetricEntity>& entity);
-template
-FileCache<RandomAccessFile>::~FileCache();
-template
-Status FileCache<RandomAccessFile>::Init();
-template
-Status FileCache<RandomAccessFile>::OpenExistingFile(
-    const string& file_name,
-    shared_ptr<RandomAccessFile>* file);
-template
-Status FileCache<RandomAccessFile>::DeleteFile(const string& file_name);
-template
-int FileCache<RandomAccessFile>::NumDescriptorsForTests() const;
-template
-string FileCache<RandomAccessFile>::ToDebugString() const;
+template class FileCache<RWFile>;
+template class FileCache<RandomAccessFile>;
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1ed03d71/src/kudu/util/file_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/file_cache.h b/src/kudu/util/file_cache.h
index 3ef87ad..44330d3 100644
--- a/src/kudu/util/file_cache.h
+++ b/src/kudu/util/file_cache.h
@@ -130,6 +130,29 @@ class FileCache {
   // deleted immediately.
   Status DeleteFile(const std::string& file_name);
 
+  // Invalidate the given path in the cache if present. This removes the
+  // path from the cache, and invalidates any previously-opened descriptors
+  // associated with this file.
+  //
+  // If a file with the same path is opened again, it will be re-opened from
+  // disk rather than served from the cache.
+  //
+  // This operation should be used during 'rename-to-replace' patterns, e.g.:
+  //
+  //    WriteNewDataTo(tmp_path);
+  //    env->RenameFile(tmp_path, p);
+  //    file_cache->Invalidate(p);
+  //
+  // NOTE: if any reader of 'p' holds an open descriptor from the cache
+  // prior to this operation, that descriptor is invalidated and any
+  // further operations on that descriptor will result in a CHECK failure.
+  // Hence this is not safe to use without some external synchronization
+  // which prevents concurrent access to the same file.
+  //
+  // NOTE: this function must not be called concurrently on the same file name
+  // from multiple threads.
+  void Invalidate(const std::string& file_name);
+
   // Returns the number of entries in the descriptor map.
   //
   // Only intended for unit tests.