Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/20 20:44:13 UTC

[15/21] impala git commit: IMPALA-6488: removes use-after-free bug in lib_cache

IMPALA-6488: removes use-after-free bug in lib_cache

Several recent runs have failed with a boost mutex
"invalid argument" exception. The mutex in question is
the one that guards individual lib_cache entries
(LibCacheEntry::lock).

The exception is thrown because another thread deletes the entry
before the current thread acquires the entry lock. This scenario
happens when:
1) thread t1 looks up an existing entry e
   a. the lookup is guarded by a cache lock
   b. the cache lock is released (L356)
   c. e's lock is acquired on the next line and propagated
      up the call stack (look for the swaps)
   d. while e is locked, its use-count is incremented.
2) thread t2 deletes entry e
   a. the cache lock is acquired and e is looked up
   b. e's lock is acquired
   c. if e's use count is 0 and e is marked for removal, e is deleted.

If t2 runs right after (1b), t1 then acquires the lock of an already
deleted entry (1c), causing the mutex exception.
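The interleaving above can be replayed deterministically in a small, hypothetical C++ model. `ToyCache`, `ToyEntry` and `LookupUnsafe` are illustrative names, not Impala code. The `between` callback stands in for thread t2 running at step 1b, and a `freed` flag replaces `delete` so that the sketch itself stays free of undefined behavior.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical toy types mirroring LibCache::lock_ and LibCacheEntry::lock.
struct ToyEntry {
  std::mutex lock;
  int use_count = 0;
  bool should_remove = false;
  bool freed = false;  // stands in for "delete e" so the demo has no real UB
};

struct ToyCache {
  std::mutex lock;  // coarse cache lock
  std::map<std::string, ToyEntry*> entries;
  std::vector<std::unique_ptr<ToyEntry>> storage;  // keeps "freed" entries addressable

  ToyEntry* Add(const std::string& key) {
    std::lock_guard<std::mutex> l(lock);
    storage.emplace_back(new ToyEntry());
    entries[key] = storage.back().get();
    return storage.back().get();
  }

  // Thread t2 (steps 2a-2c): look up under the cache lock, lock the entry,
  // and "delete" it if it is unused and marked for removal.
  void Remove(const std::string& key) {
    std::lock_guard<std::mutex> cache_l(lock);      // 2a
    auto it = entries.find(key);
    if (it == entries.end()) return;
    ToyEntry* e = it->second;
    std::lock_guard<std::mutex> entry_l(e->lock);   // 2b
    if (e->use_count == 0 && e->should_remove) {     // 2c
      entries.erase(it);
      e->freed = true;
    }
  }
};

// Thread t1, pre-fix ordering (steps 1a-1d). 'between' runs at step 1b, after
// the cache lock is dropped and before the entry lock is taken. Returns false
// if t1 ends up locking an entry that was already freed.
bool LookupUnsafe(ToyCache* cache, const std::string& key,
                  const std::function<void()>& between) {
  ToyEntry* e = nullptr;
  {
    std::lock_guard<std::mutex> cache_l(cache->lock);  // 1a
    auto it = cache->entries.find(key);
    if (it == cache->entries.end()) return true;
    e = it->second;
  }                                                     // 1b: cache lock released
  between();                                            // t2 may run here
  std::lock_guard<std::mutex> entry_l(e->lock);         // 1c
  if (e->freed) return false;                           // real code: use-after-free
  ++e->use_count;                                       // 1d
  return true;
}
```

In the real cache, step 1c locks a mutex that lives in freed memory, which is what surfaced as the boost mutex "invalid argument" exception.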

There are two parts to the fix in this change:
(1) don't crash and (2) don't let concurrency regress.

1) Remove 1b: keep the cache lock while acquiring e's lock.
The cache lock is then released and all other operations proceed as before.
Note that the existing lock ordering (coarse to fine) is still maintained,
but the cache lock may now be held while a thread waits for an entry lock
that another thread holds. Once files have been copied, these waits are
short. While files are being loaded, however, accesses to the same entry
can serialize access to the entire cache.
2) Add the cache entry only after it is loaded.
Currently, on a cache miss, a new entry is created, locked,
added to the cache, and finally the cache is unlocked. The intent
is to allow other threads to concurrently load files for other entries.
However, now that the cache lock is held while acquiring the entry lock,
a loading thread would block other threads from acquiring the same entry,
which in turn blocks access to the whole cache. The workaround is to release
the cache lock while loading a new entry and to add the entry to the cache
only once it is loaded. This avoids expensive work while holding the
cache lock, but may waste work if multiple threads miss on the
same entry and load their own copies in parallel.
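A minimal, hypothetical C++ sketch of both parts of the fix, assuming a simplified cache keyed by file name. `SketchLibCache`, `SketchEntry` and `Load` are illustrative stand-ins for `LibCache`, `LibCacheEntry` and `LoadCacheEntry()`; the refresh and error-status handling of the real patch is omitted.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical toy types; refresh, loading_status and copy_file_status
// handling from the real patch are deliberately left out.
struct SketchEntry {
  std::mutex lock;
  std::string local_path;
};

class SketchLibCache {
 public:
  // Returns the entry for 'key' with its lock held in '*entry_lock'.
  SketchEntry* Get(const std::string& key, std::unique_lock<std::mutex>* entry_lock) {
    {
      // Part 1: on a hit, take the entry lock *while* still holding the cache
      // lock (coarse to fine), so no remover can free the entry in between.
      std::lock_guard<std::mutex> cache_l(lock_);
      auto it = entries_.find(key);
      if (it != entries_.end()) {
        std::unique_lock<std::mutex> l(it->second->lock);
        entry_lock->swap(l);  // hand the held entry lock to the caller
        return it->second.get();
      }
    }

    // Part 2: on a miss, do the expensive load with no locks held, into an
    // entry that no other thread can see yet.
    std::unique_ptr<SketchEntry> fresh(new SketchEntry());
    Load(key, fresh.get());

    std::lock_guard<std::mutex> cache_l(lock_);
    auto it = entries_.find(key);
    if (it == entries_.end()) {
      // Publish the fully loaded entry.
      it = entries_.emplace(key, std::move(fresh)).first;
    } else {
      // Another thread published the same key first; 'fresh' is destroyed
      // when it goes out of scope (the "wasted work" case).
      ++wasted_loads_;
    }
    std::unique_lock<std::mutex> l(it->second->lock);
    entry_lock->swap(l);
    return it->second.get();
  }

  int loads() const { return loads_; }
  int wasted_loads() const { return wasted_loads_; }

 private:
  // Stand-in for copying the library locally and dlopen()ing it.
  void Load(const std::string& key, SketchEntry* e) {
    e->local_path = "/tmp/" + key;  // hypothetical local path
    ++loads_;
  }

  std::mutex lock_;  // coarse cache lock
  std::map<std::string, std::unique_ptr<SketchEntry>> entries_;
  std::atomic<int> loads_{0};
  std::atomic<int> wasted_loads_{0};
};
```

Because a hit now holds the cache lock while waiting on an entry lock, a caller that keeps an entry locked can stall other lookups. Keeping the load itself outside the cache lock (part 2) is what keeps that stall short; the price is the occasional discarded load counted by `wasted_loads()`.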

Testing:
- ran core tests + hdfs
- added an end-to-end test that concurrently uses, drops, and creates
  functions from the same lib-cache entry. With the current settings, it
  reliably reproduces the use-after-free error.
- manual testing to examine concurrency of the following cases:
  - concurrent function creation from multiple lib files
    (stresses coordinator)
  - concurrent function use from multiple lib files
    (stresses backend)

Change-Id: I1f178a2114cb3969dbb920949ddff4e8220b742e
Reviewed-on: http://gerrit.cloudera.org:8080/9626
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f301879d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f301879d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f301879d

Branch: refs/heads/2.x
Commit: f301879d512656c07a8d0be965e3baf5e21da1ea
Parents: 5eba80b
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Fri Mar 9 18:53:57 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sun Mar 18 21:03:22 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/lib-cache.cc   | 217 +++++++++++++++++++++----------------
 be/src/runtime/lib-cache.h    |  19 +++-
 tests/query_test/test_udfs.py |  73 +++++++++++++
 3 files changed, 215 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f301879d/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index d694c49..c7ca6cb 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -86,42 +86,44 @@ struct LibCacheEntry {
   // before it is removed to return the same error.
   Status loading_status;
 
-  LibCacheEntry() : use_count(0), should_remove(false), check_needs_refresh(false),
-                    shared_object_handle(NULL) {}
+  LibCacheEntry()
+    : use_count(0),
+      should_remove(false),
+      check_needs_refresh(false),
+      shared_object_handle(nullptr) {}
   ~LibCacheEntry();
 };
 
-LibCache::LibCache() : current_process_handle_(NULL) {
-}
+LibCache::LibCache() : current_process_handle_(nullptr) {}
 
 LibCache::~LibCache() {
   DropCache();
-  if (current_process_handle_ != NULL) DynamicClose(current_process_handle_);
+  if (current_process_handle_ != nullptr) DynamicClose(current_process_handle_);
 }
 
 Status LibCache::Init() {
-  DCHECK(LibCache::instance_.get() == NULL);
+  DCHECK(LibCache::instance_.get() == nullptr);
   LibCache::instance_.reset(new LibCache());
   return LibCache::instance_->InitInternal();
 }
 
 Status LibCache::InitInternal() {
   if (TestInfo::is_fe_test()) {
-    // In the FE tests, NULL gives the handle to the java process.
+    // In the FE tests, nullptr gives the handle to the java process.
     // Explicitly load the fe-support shared object.
     string fe_support_path;
     PathBuilder::GetFullBuildPath("service/libfesupport.so", &fe_support_path);
     RETURN_IF_ERROR(DynamicOpen(fe_support_path.c_str(), &current_process_handle_));
   } else {
-    RETURN_IF_ERROR(DynamicOpen(NULL, &current_process_handle_));
+    RETURN_IF_ERROR(DynamicOpen(nullptr, &current_process_handle_));
   }
-  DCHECK(current_process_handle_ != NULL)
+  DCHECK(current_process_handle_ != nullptr)
       << "We should always be able to get current process handle.";
   return Status::OK();
 }
 
 LibCacheEntry::~LibCacheEntry() {
-  if (shared_object_handle != NULL) {
+  if (shared_object_handle != nullptr) {
     DCHECK_EQ(use_count, 0);
     DCHECK(should_remove);
     DynamicClose(shared_object_handle);
@@ -133,14 +135,14 @@ Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& sym
     void** fn_ptr, LibCacheEntry** ent, bool quiet) {
   if (hdfs_lib_file.empty()) {
     // Just loading a function ptr in the current process. No need to take any locks.
-    DCHECK(current_process_handle_ != NULL);
+    DCHECK(current_process_handle_ != nullptr);
     RETURN_IF_ERROR(DynamicLookup(current_process_handle_, symbol.c_str(), fn_ptr, quiet));
     return Status::OK();
   }
 
-  LibCacheEntry* entry = NULL;
+  LibCacheEntry* entry = nullptr;
   unique_lock<mutex> lock;
-  if (ent != NULL && *ent != NULL) {
+  if (ent != nullptr && *ent != nullptr) {
     // Reuse already-cached entry provided by user
     entry = *ent;
     unique_lock<mutex> l(entry->lock);
@@ -148,9 +150,8 @@ Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& sym
   } else {
     RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, TYPE_SO, &lock, &entry));
   }
-  DCHECK(entry != NULL);
+  DCHECK(entry != nullptr);
   DCHECK_EQ(entry->type, TYPE_SO);
-
   LibCacheEntry::SymbolMap::iterator it = entry->symbol_cache.find(symbol);
   if (it != entry->symbol_cache.end()) {
     *fn_ptr = it->second;
@@ -160,8 +161,8 @@ Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& sym
     entry->symbol_cache[symbol] = *fn_ptr;
   }
 
-  DCHECK(*fn_ptr != NULL);
-  if (ent != NULL && *ent == NULL) {
+  DCHECK(*fn_ptr != nullptr);
+  if (ent != nullptr && *ent == nullptr) {
     // Only set and increment user's entry if it wasn't already cached
     *ent = entry;
     ++(*ent)->use_count;
@@ -170,10 +171,10 @@ Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& sym
 }
 
 void LibCache::DecrementUseCount(LibCacheEntry* entry) {
-  if (entry == NULL) return;
+  if (entry == nullptr) return;
   bool can_delete = false;
   {
-    unique_lock<mutex> lock(entry->lock);;
+    unique_lock<mutex> lock(entry->lock);
     --entry->use_count;
     can_delete = (entry->use_count == 0 && entry->should_remove);
   }
@@ -183,9 +184,9 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) {
 Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
                                  string* local_path) {
   unique_lock<mutex> lock;
-  LibCacheEntry* entry = NULL;
+  LibCacheEntry* entry = nullptr;
   RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
-  DCHECK(entry != NULL);
+  DCHECK(entry != nullptr);
   DCHECK_EQ(entry->type, type);
   *local_path = entry->local_path;
   return Status::OK();
@@ -194,13 +195,13 @@ Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
 Status LibCache::CheckSymbolExists(const string& hdfs_lib_file, LibType type,
     const string& symbol, bool quiet) {
   if (type == TYPE_SO) {
-    void* dummy_ptr = NULL;
-    return GetSoFunctionPtr(hdfs_lib_file, symbol, &dummy_ptr, NULL, quiet);
+    void* dummy_ptr = nullptr;
+    return GetSoFunctionPtr(hdfs_lib_file, symbol, &dummy_ptr, nullptr, quiet);
   } else if (type == TYPE_IR) {
     unique_lock<mutex> lock;
-    LibCacheEntry* entry = NULL;
+    LibCacheEntry* entry = nullptr;
     RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
-    DCHECK(entry != NULL);
+    DCHECK(entry != nullptr);
     DCHECK_EQ(entry->type, TYPE_IR);
     if (entry->symbols.find(symbol) == entry->symbols.end()) {
       stringstream ss;
@@ -212,7 +213,7 @@ Status LibCache::CheckSymbolExists(const string& hdfs_lib_file, LibType type,
   } else if (type == TYPE_JAR) {
     // TODO: figure out how to inspect contents of jars
     unique_lock<mutex> lock;
-    LibCacheEntry* dummy_entry = NULL;
+    LibCacheEntry* dummy_entry = nullptr;
     return GetCacheEntry(hdfs_lib_file, type, &lock, &dummy_entry);
   } else {
     DCHECK(false);
@@ -238,8 +239,8 @@ void LibCache::RemoveEntry(const string& hdfs_lib_file) {
   RemoveEntryInternal(hdfs_lib_file, it);
 }
 
-void LibCache::RemoveEntryInternal(const string& hdfs_lib_file,
-                                   const LibMap::iterator& entry_iter) {
+void LibCache::RemoveEntryInternal(
+    const string& hdfs_lib_file, const LibMap::iterator& entry_iter) {
   LibCacheEntry* entry = entry_iter->second;
   VLOG(1) << "Removing lib cache entry: " << hdfs_lib_file
           << ", local path: " << entry->local_path;
@@ -291,7 +292,7 @@ Status LibCache::GetCacheEntry(const string& hdfs_lib_file, LibType type,
       entry_lock->swap(local_entry_lock);
       return status;
     }
-    if (*entry == NULL) return status;
+    if (*entry == nullptr) return status;
 
     // Set loading_status on the entry so that if another thread calls
     // GetCacheEntry() for this lib before this thread is able to acquire lock_ in
@@ -306,77 +307,111 @@ Status LibCache::GetCacheEntry(const string& hdfs_lib_file, LibType type,
 Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type,
     unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
   DCHECK(!hdfs_lib_file.empty());
-  *entry = NULL;
+  *entry = nullptr;
 
-  // Check if this file is already cached or an error occured on another thread while
-  // loading the library.
-  unique_lock<mutex> lib_cache_lock(lock_);
-  LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
-  if (it != lib_cache_.end()) {
-    {
-      unique_lock<mutex> local_entry_lock((it->second)->lock);
-      if (!(it->second)->loading_status.ok()) {
-        // If loading_status is already set, the returned *entry should be NULL.
-        DCHECK(*entry == NULL);
-        return (it->second)->loading_status;
-      }
+  // Check if this file is already cached. Refresh the entry if needed.
+  {
+    unique_lock<mutex> lib_cache_lock(lock_);
+    LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
+    if (it != lib_cache_.end()) {
+      RETURN_IF_ERROR(RefreshCacheEntry(hdfs_lib_file, type, it, entry_lock, entry));
+      if (*entry != nullptr) return Status::OK();
     }
+  }
+
+  // Entry didn't exist. Create a new entry and load it. Note that the cache lock is
+  // *not* held and the entry is not added to the cache until it is loaded. Loading is
+  // expensive, so *not* holding the cache lock and *not* making the entry visible to
+  // other threads avoids blocking other threads with an expensive operation.
+  unique_ptr<LibCacheEntry> new_entry = make_unique<LibCacheEntry>();
+  RETURN_IF_ERROR(LoadCacheEntry(hdfs_lib_file, type, new_entry.get()));
 
-    *entry = it->second;
-    if ((*entry)->check_needs_refresh) {
-      // Check if file has been modified since loading the cached copy. If so, remove the
-      // cached entry and create a new one.
-      (*entry)->check_needs_refresh = false;
-      time_t last_mod_time;
-      hdfsFS hdfs_conn;
-      Status status = HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn);
-      if (!status.ok()) {
-        RemoveEntryInternal(hdfs_lib_file, it);
-        *entry = NULL;
+  // Entry is now loaded. Check that another thread did not already load and add an entry
+  // for the same key. If so, refresh it if needed. If the existing entry is valid, then
+  // use it and discard new_entry.
+  {
+    unique_lock<mutex> lib_cache_lock(lock_);
+    LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
+    if (it != lib_cache_.end()) {
+      Status status = RefreshCacheEntry(hdfs_lib_file, type, it, entry_lock, entry);
+      // The entry lock is held at this point if entry is valid.
+      if (!status.ok() || *entry != nullptr) {
+        // new_entry will be discarded; while wasted work, it avoids holding
+        // the cache lock while loading.
+        new_entry->should_remove = true;
         return status;
       }
-      status = GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &last_mod_time);
-      if (!status.ok() || (*entry)->last_mod_time < last_mod_time) {
-        RemoveEntryInternal(hdfs_lib_file, it);
-        *entry = NULL;
-      }
-      RETURN_IF_ERROR(status);
     }
+
+    // The entry was not found or was removed for refresh. Use the new entry, so
+    // lock it and add it to the cache.
+    *entry = new_entry.release();
+    unique_lock<mutex> local_entry_lock((*entry)->lock);
+    entry_lock->swap(local_entry_lock);
+    lib_cache_[hdfs_lib_file] = *entry;
   }
+  return Status::OK();
+}
 
-  if (*entry != NULL) {
-    // Release the lib_cache_ lock. This guarantees other threads looking at other
-    // libs can continue.
-    lib_cache_lock.unlock();
+Status LibCache::RefreshCacheEntry(const string& hdfs_lib_file, LibType type,
+    const LibMap::iterator& iter, unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
+  // Check if an error occurred on another thread while loading the library.
+  {
+    unique_lock<mutex> local_entry_lock((iter->second)->lock);
+    if (!(iter->second)->loading_status.ok()) {
+      // If loading_status is already set, the returned *entry should be nullptr.
+      DCHECK(*entry == nullptr);
+      return (iter->second)->loading_status;
+    }
+  }
+
+  // Refresh the cache entry if needed. If refreshed or an error occurred, remove
+  // the entry and set the returned entry to nullptr.
+  *entry = iter->second;
+  if ((*entry)->check_needs_refresh) {
+    // Check if file has been modified since loading the cached copy. If so, remove the
+    // cached entry and create a new one.
+    (*entry)->check_needs_refresh = false;
+    time_t last_mod_time;
+    hdfsFS hdfs_conn;
+    Status status = HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn);
+    if (!status.ok()) {
+      RemoveEntryInternal(hdfs_lib_file, iter);
+      *entry = nullptr;
+      return status;
+    }
+    status = GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &last_mod_time);
+    if (!status.ok() || (*entry)->last_mod_time < last_mod_time) {
+      RemoveEntryInternal(hdfs_lib_file, iter);
+      *entry = nullptr;
+    }
+    RETURN_IF_ERROR(status);
+  }
+
+  // No refresh needed, the entry can be used.
+  if (*entry != nullptr) {
+    // The cache level lock continues to be held while the entry lock is obtained
+    // so that some other thread does not access the entry and delete it.
     unique_lock<mutex> local_entry_lock((*entry)->lock);
     entry_lock->swap(local_entry_lock);
 
+    // Let the caller propagate any error that occurred when loading the entry.
     RETURN_IF_ERROR((*entry)->copy_file_status);
     DCHECK_EQ((*entry)->type, type);
     DCHECK(!(*entry)->local_path.empty());
-    return Status::OK();
   }
+  return Status::OK();
+}
 
-  // Entry didn't exist. Add the entry then release lock_ (so other libraries
-  // can be accessed).
-  *entry = new LibCacheEntry();
-
-  // Grab the entry lock before adding it to lib_cache_. We still need to do more
-  // work to initialize *entry and we don't want another thread to pick up
-  // the uninitialized entry.
-  unique_lock<mutex> local_entry_lock((*entry)->lock);
-  entry_lock->swap(local_entry_lock);
-  lib_cache_[hdfs_lib_file] = *entry;
-  lib_cache_lock.unlock();
-
-  // At this point we have the entry lock but not the lib cache lock.
-  DCHECK(*entry != NULL);
-  (*entry)->type = type;
+Status LibCache::LoadCacheEntry(
+    const std::string& hdfs_lib_file, LibType type, LibCacheEntry* entry) {
+  DCHECK(entry != nullptr);
+  entry->type = type;
 
   // Copy the file
-  (*entry)->local_path = MakeLocalPath(hdfs_lib_file, FLAGS_local_library_dir);
+  entry->local_path = MakeLocalPath(hdfs_lib_file, FLAGS_local_library_dir);
   VLOG(1) << "Adding lib cache entry: " << hdfs_lib_file
-          << ", local path: " << (*entry)->local_path;
+          << ", local path: " << entry->local_path;
 
   hdfsFS hdfs_conn, local_conn;
   RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn));
@@ -385,28 +420,26 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type
   // Note: the file can be updated between getting last_mod_time and copying the file to
   // local_path. This can only result in the file unnecessarily being refreshed, and does
   // not affect correctness.
-  (*entry)->copy_file_status = GetLastModificationTime(
-      hdfs_conn, hdfs_lib_file.c_str(), &(*entry)->last_mod_time);
-  RETURN_IF_ERROR((*entry)->copy_file_status);
+  entry->copy_file_status =
+      GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &entry->last_mod_time);
+  RETURN_IF_ERROR(entry->copy_file_status);
 
-  (*entry)->copy_file_status = CopyHdfsFile(
-      hdfs_conn, hdfs_lib_file, local_conn, (*entry)->local_path);
-  RETURN_IF_ERROR((*entry)->copy_file_status);
+  entry->copy_file_status =
+      CopyHdfsFile(hdfs_conn, hdfs_lib_file, local_conn, entry->local_path);
+  RETURN_IF_ERROR(entry->copy_file_status);
 
   if (type == TYPE_SO) {
     // dlopen the local library
-    RETURN_IF_ERROR(
-        DynamicOpen((*entry)->local_path.c_str(), &(*entry)->shared_object_handle));
+    RETURN_IF_ERROR(DynamicOpen(entry->local_path.c_str(), &entry->shared_object_handle));
   } else if (type == TYPE_IR) {
     // Load the module temporarily and populate all symbols.
-    const string file = (*entry)->local_path;
+    const string file = entry->local_path;
     const string module_id = filesystem::path(file).stem().string();
-    RETURN_IF_ERROR(LlvmCodeGen::GetSymbols(file, module_id, &(*entry)->symbols));
+    RETURN_IF_ERROR(LlvmCodeGen::GetSymbols(file, module_id, &entry->symbols));
   } else {
     DCHECK_EQ(type, TYPE_JAR);
     // Nothing to do.
   }
-
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/f301879d/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 4a564ee..b1b70b1 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -151,6 +151,21 @@ class LibCache {
   Status GetCacheEntryInternal(const std::string& hdfs_lib_file, LibType type,
       boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry);
 
+  /// Returns iter's cache entry in 'entry' with 'entry_lock' held if entry does not
+  /// need to be refreshed.
+  /// If entry needs to be refreshed, then it is removed and '*entry' is set to nullptr.
+  /// The cache lock must be held prior to calling this method. On return the entry's
+  /// lock is taken and returned in '*entry_lock' if entry does not need to be refreshed.
+  /// TODO: cleanup this method's interface and how the outputs are used.
+  Status RefreshCacheEntry(const std::string& hdfs_lib_file, LibType type,
+      const LibMap::iterator& iter, boost::unique_lock<boost::mutex>* entry_lock,
+      LibCacheEntry** entry);
+
+  /// 'hdfs_lib_file' is copied locally and 'entry' is initialized with its contents.
+  /// No locks are assumed held; 'entry' should be visible only to a single thread.
+  Status LoadCacheEntry(
+      const std::string& hdfs_lib_file, LibType type, LibCacheEntry* entry);
+
   /// Utility function for generating a filename unique to this process and
   /// 'hdfs_path'. This is to prevent multiple impalad processes or different library files
   /// with the same name from clobbering each other. 'hdfs_path' should be the full path
@@ -160,8 +175,8 @@ class LibCache {
 
   /// Implementation to remove an entry from the cache.
   /// lock_ must be held. The entry's lock should not be held.
-  void RemoveEntryInternal(const std::string& hdfs_lib_file,
-                           const LibMap::iterator& entry_iterator);
+  void RemoveEntryInternal(
+      const std::string& hdfs_lib_file, const LibMap::iterator& entry_iterator);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/f301879d/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 5252363..17890a4 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -302,6 +302,79 @@ class TestUdfExecution(TestUdfBase):
       self.run_test_case('QueryTest/udf-non-deterministic', vector,
           use_db=unique_database)
 
+  def test_native_functions_race(self, vector, unique_database):
+    """ IMPALA-6488: stress concurrent adds, uses, and deletes of native functions.
+        Exposes a crash caused by use-after-free in lib-cache."""
+
+    # Native function used by a query. Stresses lib-cache during analysis and
+    # backend expressions.
+    create_fn_to_use = """create function {0}.use_it(string) returns string
+                          LOCATION '{1}'
+                          SYMBOL='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE'"""
+    use_fn = """select * from (select max(int_col) from functional.alltypesagg
+                where {0}.use_it(string_col) = 'blah' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(String_col) > '1' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(string_col) > '1'))) v"""
+    # Reference to another native function from the same 'so' file. Creating/dropping
+    # stresses lib-cache lookup, add, and refresh.
+    create_another_fn = """create function if not exists {0}.other(float)
+                           returns float location '{1}' symbol='Identity'"""
+    drop_another_fn = """drop function if exists {0}.other(float)"""
+
+    setup_client = self.create_impala_client()
+    setup_query = create_fn_to_use.format(unique_database, '/test-warehouse/libTestUdfs.so')
+    try:
+      setup_client.execute(setup_query)
+    except Exception as e:
+      print "Unable to create initial function: {0}".format(setup_query)
+      raise
+
+    errors = []
+    def use_fn_method():
+      time.sleep(1 + random.random())
+      client = self.create_impala_client()
+      query = use_fn.format(unique_database)
+      try:
+        client.execute(query)
+      except Exception as e:
+        errors.append(e)
+
+    def load_fn_method():
+      time.sleep(1 + random.random())
+      client = self.create_impala_client()
+      drop = drop_another_fn.format(unique_database)
+      create = create_another_fn.format(unique_database, '/test-warehouse/libTestUdfs.so')
+      try:
+        client.execute(drop)
+        client.execute(create)
+      except Exception as e:
+        errors.append(e)
+
+    # number of uses/loads needed to reliably reproduce the bug.
+    num_uses = 200
+    num_loads = 200
+
+    # create threads to use native function.
+    runner_threads = []
+    for i in xrange(num_uses):
+      runner_threads.append(threading.Thread(target=use_fn_method))
+
+    # create threads to drop/create native functions.
+    for i in xrange(num_loads):
+      runner_threads.append(threading.Thread(target=load_fn_method))
+
+    # launch all runner threads.
+    for t in runner_threads: t.start()
+
+    # join all threads.
+    for t in runner_threads: t.join();
+
+    # Check for any errors.
+    for e in errors: print e
+    assert len(errors) == 0
+
   def test_ir_functions(self, vector, unique_database):
     if vector.get_value('exec_option')['disable_codegen']:
       # IR functions require codegen to be enabled.