Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/26 16:16:14 UTC

[2/2] impala git commit: IMPALA-6670: refresh lib-cache entries from plan

IMPALA-6670: refresh lib-cache entries from plan

When an impalad runs in executor-only mode, it receives no
catalog updates, so its lib-cache entries are never
refreshed. As a result, UDF queries can return incorrect
results or fail to run because of symbol resolution errors.
Both problems are caused by the executor using a stale copy
of the lib file: an old version of a method may be used
(incorrect results), or a method that was added to the lib
file may not be found (resolution errors).

The solution in this change is to capture the coordinator's
view of the lib file's last modified time when planning.
This last modified time is then shipped with the plan to
executors. Executors must then use both the lib file path
and the last modified time as a key for the lib-cache.
If the coordinator's last modified time is more recent than
the executor's lib-cache entry, then the entry is refreshed.
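
As a rough illustration of the lookup rule described above, here is a
minimal sketch. ToyLibCache, Lookup, and the two maps are hypothetical
stand-ins (HDFS and the real LibCache are not modeled); only the
comparisons mirror the checks in this change: a cached copy is reused
unless it is older than the expected mtime, -1 disables the check, and a
file system mtime that differs from the expected one is an error.

```cpp
#include <cassert>
#include <ctime>
#include <map>
#include <string>

// Outcome of a toy lookup. Hypothetical; the real code returns a Status.
enum class Lookup { kHit, kReloaded, kVersionMismatch };

// Toy model of the lib-cache keyed by path. Not Impala's LibCache.
struct ToyLibCache {
  std::map<std::string, std::time_t> fs_mtime;      // stand-in for the file system
  std::map<std::string, std::time_t> cached_mtime;  // stand-in for cached entries
  int fs_lookups = 0;  // counts how often the file system is consulted

  // 'exp_mtime' is the coordinator's view of the file's mtime; -1 skips the check.
  Lookup Get(const std::string& path, std::time_t exp_mtime) {
    assert(!path.empty());
    auto it = cached_mtime.find(path);
    // Reuse the cached copy unless it is older than what the coordinator saw.
    if (it != cached_mtime.end() && !(it->second < exp_mtime)) return Lookup::kHit;

    // Missing or stale: consult the file system (throws if the path is unknown).
    ++fs_lookups;
    std::time_t on_fs = fs_mtime.at(path);
    if (exp_mtime >= 0 && on_fs != exp_mtime) {
      // The file changed since planning: drop any entry and report a mismatch.
      cached_mtime.erase(path);
      return Lookup::kVersionMismatch;
    }
    cached_mtime[path] = on_fs;  // stands in for copying the file locally
    return Lookup::kReloaded;
  }
};
```

In this sketch a repeated lookup with the same expected mtime never
touches the file system, which is the property the always-check
alternative below gives up.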

Brief discussion of alternatives:

- lib-cache always checks last modified time
  + easy/local change to lib-cache
  - always adds an fs lookup; rejected for this reason

- keep the last modified time in the catalog
  - bound on staleness is too loose. Consider the case where
    fns f1, f2, f3 are created with last modified times of
    t1, t2, t3. Treat a fn's last modified time as a low-watermark:
    if the cache entry has a more recent time, use it. Such a scheme
    would allow the version at t2 to persist, so an old fn may keep
    the state from converging to the latest version. This could lead
    to strange cases where different versions of the lib are used
    across executors for a single query.

    In contrast, the change in this patch relies on the statestore to
    push versions forward at all coordinators, which in turn pushes
    the versions in all caches forward as well.
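
The low-watermark scenario can be traced with a small sketch.
VersionAfterLookup is a hypothetical helper, and the values for t1, t2,
t3 are made up; it assumes the coordinator's view of the file is the
latest mtime (t3) once catalog state has propagated.

```cpp
#include <cassert>
#include <ctime>

// Returns the mtime of the copy an executor ends up using after one lookup.
// 'cached' is -1 when nothing is cached yet. Hypothetical helper, not Impala code.
std::time_t VersionAfterLookup(
    std::time_t cached, std::time_t watermark, std::time_t fs_mtime) {
  // Load from the file system only if nothing is cached or the cached copy
  // is older than the watermark; otherwise keep the cached copy.
  if (cached < 0 || cached < watermark) return fs_mtime;
  return cached;
}
```

With t1=10, t2=20, t3=30 and the file at t3: an executor holding the t2
copy that runs f1 (watermark t1) stays at t2, while an executor with an
empty cache loads t3, so the two diverge. Passing t3 as the watermark on
every lookup moves both to t3.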

Testing:
- added an e2e custom cluster test

Change-Id: Icf740ea8c6a47e671427d30b4d139cb8507b7ff6
Reviewed-on: http://gerrit.cloudera.org:8080/9697
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/66ca5db7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/66ca5db7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/66ca5db7

Branch: refs/heads/2.x
Commit: 66ca5db71752f0d0dcf76128664dbf73fdcfb0a2
Parents: ff8db12
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Fri Mar 16 00:45:30 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Mar 24 05:10:18 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/codegen-callgraph.cc             |   3 +-
 be/src/codegen/llvm-codegen-test.cc             |   2 +-
 be/src/codegen/llvm-codegen.cc                  |  10 +-
 be/src/codegen/llvm-codegen.h                   |   5 +-
 be/src/exec/external-data-source-executor.cc    |   4 +-
 be/src/exprs/agg-fn.cc                          |  17 +--
 be/src/exprs/hive-udf-call.cc                   |   2 +-
 be/src/exprs/scalar-fn-call.cc                  |   6 +-
 be/src/runtime/lib-cache.cc                     |  81 ++++++++----
 be/src/runtime/lib-cache.h                      |  53 ++++++--
 be/src/service/fe-support.cc                    |  24 ++--
 common/thrift/Frontend.thrift                   |  11 +-
 common/thrift/Types.thrift                      |   7 +
 common/thrift/generate_error_codes.py           |   4 +
 .../java/org/apache/impala/analysis/Expr.java   |   5 +-
 .../impala/catalog/AggregateFunction.java       |   9 ++
 .../org/apache/impala/catalog/Function.java     |  58 +++++++--
 .../apache/impala/catalog/ScalarFunction.java   |   8 ++
 .../impala/analysis/AnalyzeExprsTest.java       |  77 +++++++++++
 testdata/bin/copy-udfs-udas.sh                  |   6 +
 tests/custom_cluster/test_coordinators.py       | 127 ++++++++++++++++++-
 .../java/org/apache/impala/TestUpdateUdf.java   |   4 +
 22 files changed, 440 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/codegen/codegen-callgraph.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/codegen-callgraph.cc b/be/src/codegen/codegen-callgraph.cc
index 6cc2b60..77a9328 100644
--- a/be/src/codegen/codegen-callgraph.cc
+++ b/be/src/codegen/codegen-callgraph.cc
@@ -31,8 +31,9 @@ namespace impala {
 
 bool CodegenCallGraph::IsDefinedInImpalad(const string& fn_name) {
   void* fn_ptr = nullptr;
+  // Looking up fn in process so mtime is set to -1 (no versioning issue).
   Status status =
-      LibCache::instance()->GetSoFunctionPtr("", fn_name, &fn_ptr, nullptr, true);
+      LibCache::instance()->GetSoFunctionPtr("", fn_name, -1, &fn_ptr, nullptr, true);
   return status.ok();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/codegen/llvm-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc
index dbb190a..9f668e0 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -104,7 +104,7 @@ class LlvmCodeGenTest : public testing:: Test {
   }
 
   static Status LinkModuleFromHdfs(LlvmCodeGen* codegen, const string& hdfs_file) {
-    return codegen->LinkModuleFromHdfs(hdfs_file);
+    return codegen->LinkModuleFromHdfs(hdfs_file, -1);
   }
 
   static bool ContainsHandcraftedFn(LlvmCodeGen* codegen, llvm::Function* function) {

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 3796d76..b3a1c50 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -335,11 +335,11 @@ Status LlvmCodeGen::LinkModuleFromLocalFs(const string& file) {
   return Status::OK();
 }
 
-Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location) {
+Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location, const time_t mtime) {
   if (linked_modules_.find(hdfs_location) != linked_modules_.end()) return Status::OK();
   string local_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(hdfs_location, LibCache::TYPE_IR,
-      &local_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
+      hdfs_location, LibCache::TYPE_IR, mtime, &local_path));
   RETURN_IF_ERROR(LinkModuleFromLocalFs(local_path));
   linked_modules_.insert(hdfs_location);
   return Status::OK();
@@ -803,7 +803,7 @@ Status LlvmCodeGen::LoadFunction(const TFunction& fn, const std::string& symbol,
     // in a .so or a builtin using the UDF interface.
     void* fn_ptr;
     Status status = LibCache::instance()->GetSoFunctionPtr(
-        fn.hdfs_location, symbol, &fn_ptr, cache_entry);
+        fn.hdfs_location, symbol, fn.last_modified_time, &fn_ptr, cache_entry);
     if (!status.ok() && fn.binary_type == TFunctionBinaryType::BUILTIN) {
       // Builtins symbols should exist unless there is a version mismatch.
       status.AddDetail(
@@ -874,7 +874,7 @@ Status LlvmCodeGen::LoadFunction(const TFunction& fn, const std::string& symbol,
 
     // Link the UDF module into this query's main module so the UDF's functions are
     // available in the main module.
-    RETURN_IF_ERROR(LinkModuleFromHdfs(fn.hdfs_location));
+    RETURN_IF_ERROR(LinkModuleFromHdfs(fn.hdfs_location, fn.last_modified_time));
 
     *llvm_fn = GetFunction(symbol, true);
     if (*llvm_fn == NULL) {

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index 268ab6d..783269b 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -636,8 +636,9 @@ class LlvmCodeGen {
   Status LinkModuleFromLocalFs(const std::string& file);
 
   /// Same as 'LinkModuleFromLocalFs', but takes an hdfs file location instead and makes
-  /// sure that the same hdfs file is not linked twice.
-  Status LinkModuleFromHdfs(const std::string& hdfs_file);
+  /// sure that the same hdfs file is not linked twice. The mtime is used to ensure that
+  /// the cached hdfs_file that's used is the most recent.
+  Status LinkModuleFromHdfs(const std::string& hdfs_file, const time_t mtime);
 
   /// Strip global constructors and destructors from an LLVM module. We never run them
   /// anyway (they must be explicitly invoked) so it is dead code.

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
index 7c54f39..e8217b7 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -137,8 +137,10 @@ Status ExternalDataSourceExecutor::Init(const string& jar_path,
     const string& class_name, const string& api_version, const string& init_string) {
   DCHECK(!is_initialized_);
   string local_jar_path;
+  // TODO(IMPALA-6727): pass the mtime from the coordinator. for now, skip the mtime
+  // check (-1).
   RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      jar_path, LibCache::TYPE_JAR, &local_jar_path));
+      jar_path, LibCache::TYPE_JAR, -1, &local_jar_path));
 
   JNIEnv* jni_env = getJNIEnv();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/exprs/agg-fn.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn.cc b/be/src/exprs/agg-fn.cc
index dee3944..a9d461f 100644
--- a/be/src/exprs/agg-fn.cc
+++ b/be/src/exprs/agg-fn.cc
@@ -72,6 +72,7 @@ Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) {
       ColumnType::FromThrift(aggregate_fn.intermediate_type).type);
   DCHECK_EQ(output_slot_desc_.type().type, ColumnType::FromThrift(fn_.ret_type).type);
 
+  time_t mtime = fn_.last_modified_time;
   // Load the function pointers. Must have init() and update().
   if (aggregate_fn.init_fn_symbol.empty() ||
       aggregate_fn.update_fn_symbol.empty() ||
@@ -83,32 +84,32 @@ Status AggFn::Init(const RowDescriptor& row_desc, RuntimeState* state) {
     return Status(ss.str());
   }
 
+  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(
+      fn_.hdfs_location, aggregate_fn.init_fn_symbol, mtime, &init_fn_, &cache_entry_));
   RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-      aggregate_fn.init_fn_symbol, &init_fn_, &cache_entry_));
-  RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-      aggregate_fn.update_fn_symbol, &update_fn_, &cache_entry_));
+      aggregate_fn.update_fn_symbol, mtime, &update_fn_, &cache_entry_));
 
   // Merge() is not defined for purely analytic function.
   if (!aggregate_fn.is_analytic_only_fn) {
     RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-        aggregate_fn.merge_fn_symbol, &merge_fn_, &cache_entry_));
+        aggregate_fn.merge_fn_symbol, mtime, &merge_fn_, &cache_entry_));
   }
   // Serialize(), GetValue(), Remove() and Finalize() are optional
   if (!aggregate_fn.serialize_fn_symbol.empty()) {
     RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-        aggregate_fn.serialize_fn_symbol, &serialize_fn_, &cache_entry_));
+        aggregate_fn.serialize_fn_symbol, mtime, &serialize_fn_, &cache_entry_));
   }
   if (!aggregate_fn.get_value_fn_symbol.empty()) {
     RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-        aggregate_fn.get_value_fn_symbol, &get_value_fn_, &cache_entry_));
+        aggregate_fn.get_value_fn_symbol, mtime, &get_value_fn_, &cache_entry_));
   }
   if (!aggregate_fn.remove_fn_symbol.empty()) {
     RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-        aggregate_fn.remove_fn_symbol, &remove_fn_, &cache_entry_));
+        aggregate_fn.remove_fn_symbol, mtime, &remove_fn_, &cache_entry_));
   }
   if (!aggregate_fn.finalize_fn_symbol.empty()) {
     RETURN_IF_ERROR(LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
-        fn_.aggregate_fn.finalize_fn_symbol, &finalize_fn_, &cache_entry_));
+        fn_.aggregate_fn.finalize_fn_symbol, mtime, &finalize_fn_, &cache_entry_));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index 19e2e63..be3965a 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -176,7 +176,7 @@ Status HiveUdfCall::Init(const RowDescriptor& row_desc, RuntimeState* state) {
 
   // Copy the Hive Jar from hdfs to local file system.
   RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      fn_.hdfs_location, LibCache::TYPE_JAR, &local_location_));
+      fn_.hdfs_location, LibCache::TYPE_JAR, fn_.last_modified_time, &local_location_));
 
   // Initialize input_byte_offsets_ and input_buffer_size_
   for (int i = 0; i < GetNumChildren(); ++i) {

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/exprs/scalar-fn-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc
index acf6208..dd284b6 100644
--- a/be/src/exprs/scalar-fn-call.cc
+++ b/be/src/exprs/scalar-fn-call.cc
@@ -124,8 +124,8 @@ Status ScalarFnCall::Init(const RowDescriptor& desc, RuntimeState* state) {
           fn_.name.function_name, MAX_INTERP_ARGS));
     }
 
-    Status status = LibCache::instance()->GetSoFunctionPtr(
-        fn_.hdfs_location, fn_.scalar_fn.symbol, &scalar_fn_, &cache_entry_);
+    Status status = LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location,
+        fn_.scalar_fn.symbol, fn_.last_modified_time, &scalar_fn_, &cache_entry_);
     if (!status.ok()) {
       if (fn_.binary_type == TFunctionBinaryType::BUILTIN) {
         // Builtins symbols should exist unless there is a version mismatch.
@@ -427,7 +427,7 @@ Status ScalarFnCall::GetFunction(LlvmCodeGen* codegen, const string& symbol, voi
   if (fn_.binary_type == TFunctionBinaryType::NATIVE
       || fn_.binary_type == TFunctionBinaryType::BUILTIN) {
     return LibCache::instance()->GetSoFunctionPtr(
-        fn_.hdfs_location, symbol, fn, &cache_entry_);
+        fn_.hdfs_location, symbol, fn_.last_modified_time, fn, &cache_entry_);
   } else {
     DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::IR);
     DCHECK(codegen != NULL);

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index c7ca6cb..83bb4dc 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -132,7 +132,7 @@ LibCacheEntry::~LibCacheEntry() {
 }
 
 Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& symbol,
-    void** fn_ptr, LibCacheEntry** ent, bool quiet) {
+    time_t exp_mtime, void** fn_ptr, LibCacheEntry** ent, bool quiet) {
   if (hdfs_lib_file.empty()) {
     // Just loading a function ptr in the current process. No need to take any locks.
     DCHECK(current_process_handle_ != nullptr);
@@ -148,7 +148,7 @@ Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& sym
     unique_lock<mutex> l(entry->lock);
     lock.swap(l);
   } else {
-    RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, TYPE_SO, &lock, &entry));
+    RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, TYPE_SO, exp_mtime, &lock, &entry));
   }
   DCHECK(entry != nullptr);
   DCHECK_EQ(entry->type, TYPE_SO);
@@ -181,11 +181,11 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) {
   if (can_delete) delete entry;
 }
 
-Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
-                                 string* local_path) {
+Status LibCache::GetLocalLibPath(
+    const string& hdfs_lib_file, LibType type, time_t exp_mtime, string* local_path) {
   unique_lock<mutex> lock;
   LibCacheEntry* entry = nullptr;
-  RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
+  RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, exp_mtime, &lock, &entry));
   DCHECK(entry != nullptr);
   DCHECK_EQ(entry->type, type);
   *local_path = entry->local_path;
@@ -193,14 +193,23 @@ Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
 }
 
 Status LibCache::CheckSymbolExists(const string& hdfs_lib_file, LibType type,
-    const string& symbol, bool quiet) {
+    const string& symbol, bool quiet, time_t* mtime) {
   if (type == TYPE_SO) {
     void* dummy_ptr = nullptr;
-    return GetSoFunctionPtr(hdfs_lib_file, symbol, &dummy_ptr, nullptr, quiet);
+    LibCacheEntry* entry = nullptr;
+    RETURN_IF_ERROR(
+        GetSoFunctionPtr(hdfs_lib_file, symbol, -1, &dummy_ptr, &entry, quiet));
+    *mtime = -1;
+    if (entry != nullptr) {
+      *mtime = entry->last_mod_time;
+      // done holding this entry, so decrement its use count.
+      DecrementUseCount(entry);
+    }
+    return Status::OK();
   } else if (type == TYPE_IR) {
     unique_lock<mutex> lock;
     LibCacheEntry* entry = nullptr;
-    RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
+    RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, -1, &lock, &entry));
     DCHECK(entry != nullptr);
     DCHECK_EQ(entry->type, TYPE_IR);
     if (entry->symbols.find(symbol) == entry->symbols.end()) {
@@ -209,12 +218,15 @@ Status LibCache::CheckSymbolExists(const string& hdfs_lib_file, LibType type,
          << " (local path: " << entry->local_path << ")";
       return quiet ? Status::Expected(ss.str()) : Status(ss.str());
     }
+    *mtime = entry->last_mod_time;
     return Status::OK();
   } else if (type == TYPE_JAR) {
     // TODO: figure out how to inspect contents of jars
     unique_lock<mutex> lock;
-    LibCacheEntry* dummy_entry = nullptr;
-    return GetCacheEntry(hdfs_lib_file, type, &lock, &dummy_entry);
+    LibCacheEntry* entry = nullptr;
+    RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, -1, &lock, &entry));
+    *mtime = entry->last_mod_time;
+    return Status::OK();
   } else {
     DCHECK(false);
     return Status("Shouldn't get here.");
@@ -280,14 +292,15 @@ void LibCache::DropCache() {
 }
 
 Status LibCache::GetCacheEntry(const string& hdfs_lib_file, LibType type,
-                               unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
+    time_t exp_mtime, unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
   Status status;
   {
     // If an error occurs, local_entry_lock is released before calling RemoveEntry()
     // below because it takes the global lock_ which must be acquired before taking entry
     // locks.
     unique_lock<mutex> local_entry_lock;
-    status = GetCacheEntryInternal(hdfs_lib_file, type, &local_entry_lock, entry);
+    status =
+        GetCacheEntryInternal(hdfs_lib_file, type, exp_mtime, &local_entry_lock, entry);
     if (status.ok()) {
       entry_lock->swap(local_entry_lock);
       return status;
@@ -305,7 +318,7 @@ Status LibCache::GetCacheEntry(const string& hdfs_lib_file, LibType type,
 }
 
 Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type,
-    unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
+    time_t exp_mtime, unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
   DCHECK(!hdfs_lib_file.empty());
   *entry = nullptr;
 
@@ -314,7 +327,8 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type
     unique_lock<mutex> lib_cache_lock(lock_);
     LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
     if (it != lib_cache_.end()) {
-      RETURN_IF_ERROR(RefreshCacheEntry(hdfs_lib_file, type, it, entry_lock, entry));
+      RETURN_IF_ERROR(
+          RefreshCacheEntry(hdfs_lib_file, type, exp_mtime, it, entry_lock, entry));
       if (*entry != nullptr) return Status::OK();
     }
   }
@@ -324,7 +338,7 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type
   // expensive, so *not* holding the cache lock and *not* making the entry visible to
   // other threads avoids blocking other threads with an expensive operation.
   unique_ptr<LibCacheEntry> new_entry = make_unique<LibCacheEntry>();
-  RETURN_IF_ERROR(LoadCacheEntry(hdfs_lib_file, type, new_entry.get()));
+  RETURN_IF_ERROR(LoadCacheEntry(hdfs_lib_file, exp_mtime, type, new_entry.get()));
 
   // Entry is now loaded. Check that another thread did not already load and add an entry
   // for the same key. If so, refresh it if needed. If the existing entry is valid, then
@@ -333,7 +347,8 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type
     unique_lock<mutex> lib_cache_lock(lock_);
     LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
     if (it != lib_cache_.end()) {
-      Status status = RefreshCacheEntry(hdfs_lib_file, type, it, entry_lock, entry);
+      Status status =
+          RefreshCacheEntry(hdfs_lib_file, type, exp_mtime, it, entry_lock, entry);
       // The entry lock is held at this point if entry is valid.
       if (!status.ok() || *entry != nullptr) {
         // new_entry will be discarded; while wasted work, it avoids holding
@@ -354,7 +369,8 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type
 }
 
 Status LibCache::RefreshCacheEntry(const string& hdfs_lib_file, LibType type,
-    const LibMap::iterator& iter, unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
+    time_t exp_mtime, const LibMap::iterator& iter, unique_lock<mutex>* entry_lock,
+    LibCacheEntry** entry) {
   // Check if an error occurred on another thread while loading the library.
   {
     unique_lock<mutex> local_entry_lock((iter->second)->lock);
@@ -365,14 +381,15 @@ Status LibCache::RefreshCacheEntry(const string& hdfs_lib_file, LibType type,
     }
   }
 
-  // Refresh the cache entry if needed. If refreshed or an error occurred, remove
-  // the entry and set the returned entry to nullptr.
+  // Refresh the cache entry if needed. A refresh is needed if check_needs_refresh is set
+  // (e.g., set by ddl) or if the exp_mtime argument is more recent.
+  // If refreshed or an error occurred, remove the entry and set the returned entry to
+  // nullptr.
   *entry = iter->second;
-  if ((*entry)->check_needs_refresh) {
+  if ((*entry)->check_needs_refresh || (*entry)->last_mod_time < exp_mtime) {
     // Check if file has been modified since loading the cached copy. If so, remove the
     // cached entry and create a new one.
     (*entry)->check_needs_refresh = false;
-    time_t last_mod_time;
     hdfsFS hdfs_conn;
     Status status = HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn);
     if (!status.ok()) {
@@ -380,8 +397,16 @@ Status LibCache::RefreshCacheEntry(const string& hdfs_lib_file, LibType type,
       *entry = nullptr;
       return status;
     }
-    status = GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &last_mod_time);
-    if (!status.ok() || (*entry)->last_mod_time < last_mod_time) {
+    time_t fs_last_modified_time;
+    status =
+        GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &fs_last_modified_time);
+
+    // Check that the expected last_modified_time is the same as what's on the filesystem.
+    if (status.ok() && exp_mtime >= 0 && fs_last_modified_time != exp_mtime) {
+      status = Status(TErrorCode::LIB_VERSION_MISMATCH, hdfs_lib_file,
+          fs_last_modified_time, exp_mtime);
+    }
+    if (!status.ok() || (*entry)->last_mod_time < fs_last_modified_time) {
       RemoveEntryInternal(hdfs_lib_file, iter);
       *entry = nullptr;
     }
@@ -403,8 +428,8 @@ Status LibCache::RefreshCacheEntry(const string& hdfs_lib_file, LibType type,
   return Status::OK();
 }
 
-Status LibCache::LoadCacheEntry(
-    const std::string& hdfs_lib_file, LibType type, LibCacheEntry* entry) {
+Status LibCache::LoadCacheEntry(const std::string& hdfs_lib_file, time_t exp_mtime,
+    LibType type, LibCacheEntry* entry) {
   DCHECK(entry != nullptr);
   entry->type = type;
 
@@ -424,6 +449,12 @@ Status LibCache::LoadCacheEntry(
       GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &entry->last_mod_time);
   RETURN_IF_ERROR(entry->copy_file_status);
 
+  // Check that the exp_mtime is the same as what's on the filesystem.
+  if (exp_mtime >= 0 && exp_mtime != entry->last_mod_time) {
+    return Status(
+        TErrorCode::LIB_VERSION_MISMATCH, hdfs_lib_file, entry->last_mod_time, exp_mtime);
+  }
+
   entry->copy_file_status =
       CopyHdfsFile(hdfs_conn, hdfs_lib_file, local_conn, entry->local_path);
   RETURN_IF_ERROR(entry->copy_file_status);

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index b1b70b1..820a1a8 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -72,15 +72,23 @@ class LibCache {
   static Status Init();
 
   /// Gets the local file system path for the library at 'hdfs_lib_file'. If
-  /// this file is not already on the local fs, it copies it and caches the
-  /// result. Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
-  Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type,
-                         std::string* local_path);
+  /// this file is not already on the local fs, or if the cached entry's last modified
+  /// is older than expected mtime, 'exp_mtime', it copies it and caches the result.
+  /// An 'exp_mtime' of -1 makes the mtime check a no-op.
+  /// Returns an error if 'hdfs_lib_file' cannot be copied to the local fs or if
+  /// exp_mtime differs from the mtime on the file system.
+  /// If error is due to refresh, then the entry will be removed from the cache.
+  Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type, time_t exp_mtime,
+      std::string* local_path);
 
   /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok otherwise.
-  /// If 'quiet' is true, the error status for non-Java unfound symbols will not be logged.
+  /// If status.ok() is true, 'mtime' is set to the cache entry's last modified time.
+  /// If an mtime is not applicable, for example, if lookup is for a builtin, then
+  /// a default mtime of -1 is set.
+  /// If 'quiet' is true, the error status for non-Java unfound symbols will not be
+  /// logged.
   Status CheckSymbolExists(const std::string& hdfs_lib_file, LibType type,
-      const std::string& symbol, bool quiet = false);
+      const std::string& symbol, bool quiet, time_t* mtime);
 
   /// Returns a pointer to the function for the given library and symbol.
   /// If 'hdfs_lib_file' is empty, the symbol is looked up in the impalad process.
@@ -94,8 +102,13 @@ class LibCache {
   /// using fn_ptr and it is no longer valid to use fn_ptr.
   //
   /// If 'quiet' is true, returned error statuses will not be logged.
+  /// If the entry is already cached, if its last modified time is older than
+  /// expected mtime, 'exp_mtime', the entry is refreshed.
+  /// An 'exp_mtime' of -1 makes the mtime check a no-op.
+  /// An error is returned if exp_mtime differs from the mtime on the file system.
+  /// If error is due to refresh, then the entry will be removed from the cache.
   Status GetSoFunctionPtr(const std::string& hdfs_lib_file, const std::string& symbol,
-      void** fn_ptr, LibCacheEntry** entry, bool quiet = false);
+      time_t exp_mtime, void** fn_ptr, LibCacheEntry** entry, bool quiet = false);
 
   /// Marks the entry for 'hdfs_lib_file' as needing to be refreshed if the file in HDFS is
   /// newer than the local cached copied. The refresh will occur the next time the entry is
@@ -139,32 +152,44 @@ class LibCache {
 
   /// Returns the cache entry for 'hdfs_lib_file'. If this library has not been
   /// copied locally, it will copy it and add a new LibCacheEntry to 'lib_cache_'.
-  /// Result is returned in *entry.
+  /// If the entry is already cached, if its last modified time is older than
+  /// expected mtime, 'exp_mtime', the entry is refreshed. Result is returned in *entry.
+  /// An 'exp_mtime' of -1 makes the mtime check a no-op.
+  /// An error is returned if exp_mtime differs from the mtime on the file system.
   /// No locks should be taken before calling this. On return the entry's lock is
   /// taken and returned in *entry_lock.
   /// If an error is returned, there will be no entry in lib_cache_ and *entry is NULL.
-  Status GetCacheEntry(const std::string& hdfs_lib_file, LibType type,
+  Status GetCacheEntry(const std::string& hdfs_lib_file, LibType type, time_t exp_mtime,
       boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry);
 
   /// Implementation to get the cache entry for 'hdfs_lib_file'. Errors are returned
   /// without evicting the cache entry if the status is not OK and *entry is not NULL.
   Status GetCacheEntryInternal(const std::string& hdfs_lib_file, LibType type,
-      boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry);
+      time_t exp_mtime, boost::unique_lock<boost::mutex>* entry_lock,
+      LibCacheEntry** entry);
 
   /// Returns iter's cache entry in 'entry' with 'entry_lock' held if entry does not
   /// need to be refreshed.
   /// If entry needs to be refreshed, then it is removed and '*entry' is set to nullptr.
+  /// The entry is refreshed if needs_refresh is set and its mtime is
+  /// older than the file on the fs OR its mtime is older than the
+  /// 'exp_mtime' argument.
+  /// An 'exp_mtime' of -1 makes the mtime check a no-op.
+  /// An error is returned if exp_mtime differs from the mtime on the file system.
+  /// If an error occurs when refreshing the entry, the entry is removed.
   /// The cache lock must be held prior to calling this method. On return the entry's
   /// lock is taken and returned in '*entry_lock' if entry does not need to be refreshed.
   /// TODO: cleanup this method's interface and how the outputs are used.
   Status RefreshCacheEntry(const std::string& hdfs_lib_file, LibType type,
-      const LibMap::iterator& iter, boost::unique_lock<boost::mutex>* entry_lock,
-      LibCacheEntry** entry);
+      time_t exp_mtime, const LibMap::iterator& iter,
+      boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry);
 
   /// 'hdfs_lib_file' is copied locally and 'entry' is initialized with its contents.
+  /// An error is returned if exp_mtime differs from the mtime on the file system.
+  /// An 'exp_mtime' of -1 makes the mtime check a no-op.
   /// No locks are assumed held; 'entry' should be visible only to a single thread.
-  Status LoadCacheEntry(
-      const std::string& hdfs_lib_file, LibType type, LibCacheEntry* entry);
+  Status LoadCacheEntry(const std::string& hdfs_lib_file, time_t exp_mtime, LibType type,
+      LibCacheEntry* entry);
 
   /// Utility function for generating a filename unique to this process and
   /// 'hdfs_path'. This is to prevent multiple impalad processes or different library files

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 9d59883..abb4acf 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -295,11 +295,14 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params,
 
   // Builtin functions are loaded directly from the running process
   if (params.fn_binary_type != TFunctionBinaryType::BUILTIN) {
-    // Refresh the library if necessary since we're creating a new function
-    LibCache::instance()->SetNeedsRefresh(params.location);
+    // Use the latest version of the file from the file system if specified.
+    if (params.needs_refresh) {
+      // Refresh the library if necessary.
+      LibCache::instance()->SetNeedsRefresh(params.location);
+    }
     string dummy_local_path;
     Status status = LibCache::instance()->GetLocalLibPath(
-        params.location, type, &dummy_local_path);
+        params.location, type, -1, &dummy_local_path);
     if (!status.ok()) {
       result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND);
       result->__set_error_msg(status.GetDetail());
@@ -310,11 +313,13 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params,
   // Check if the FE-specified symbol exists as-is.
   // Set 'quiet' to true so we don't flood the log with unfound builtin symbols on
   // startup.
-  Status status =
-      LibCache::instance()->CheckSymbolExists(params.location, type, params.symbol, true);
+  time_t mtime = -1;
+  Status status = LibCache::instance()->CheckSymbolExists(
+      params.location, type, params.symbol, true, &mtime);
   if (status.ok()) {
     result->__set_result_code(TSymbolLookupResultCode::SYMBOL_FOUND);
     result->__set_symbol(params.symbol);
+    result->__set_last_modified_time(mtime);
     return;
   }
 
@@ -348,7 +353,8 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params,
   }
 
   // Look up the mangled symbol
-  status = LibCache::instance()->CheckSymbolExists(params.location, type, symbol);
+  status = LibCache::instance()->CheckSymbolExists(
+      params.location, type, symbol, false, &mtime);
   if (!status.ok()) {
     result->__set_result_code(TSymbolLookupResultCode::SYMBOL_NOT_FOUND);
     stringstream ss;
@@ -379,6 +385,7 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params,
   // We were able to resolve the symbol.
   result->__set_result_code(TSymbolLookupResultCode::SYMBOL_FOUND);
   result->__set_symbol(symbol);
+  result->__set_last_modified_time(mtime);
 }
 
 extern "C"
@@ -391,8 +398,9 @@ Java_org_apache_impala_service_FeSupport_NativeCacheJar(
 
   TCacheJarResult result;
   string local_path;
-  Status status = LibCache::instance()->GetLocalLibPath(params.hdfs_location,
-      LibCache::TYPE_JAR, &local_path);
+  // TODO(IMPALA-6727): used for external data sources; add proper mtime.
+  Status status = LibCache::instance()->GetLocalLibPath(
+      params.hdfs_location, LibCache::TYPE_JAR, -1, &local_path);
   status.ToThrift(&result.status);
   if (status.ok()) result.__set_local_path(local_path);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index b81c1a1..3bb0700 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -634,7 +634,13 @@ struct TSymbolLookupParams {
   6: optional Types.TColumnType ret_arg_type
 
   // Determines the signature of the mangled symbol
-  7: required TSymbolType symbol_type;
+  7: required TSymbolType symbol_type
+
+  // Does the lookup require the backend lib-cache entry be refreshed?
+  // If so, the file system is checked for a newer version of the file
+  // referenced by 'location'. If not, the entry in the lib-cache is used
+  // if present, otherwise the file is read from the file system.
+  8: required bool needs_refresh
 }
 
 enum TSymbolLookupResultCode {
@@ -652,6 +658,9 @@ struct TSymbolLookupResult {
 
   // The error message if the symbol found not be found.
   3: optional string error_msg
+
+  // Last modified time in backend lib-cache entry for the file referenced by 'location'.
+  4: optional i64 last_modified_time
 }
 
 // Sent from the impalad BE to FE with the results of each CatalogUpdate heartbeat.

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 280c744..3edf42b 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -227,4 +227,11 @@ struct TFunction {
 
   // True for builtins or user-defined functions persisted by the catalog
   11: optional bool is_persistent
+
+  // Last modified time of the 'hdfs_location'. Set by the coordinator to record
+  // the mtime it is aware of for the lib. Executors expect that the lib they use
+  // has the same mtime as the coordinator's. An mtime of -1 makes the mtime check
+  // a no-op.
+  // Not set when stored in the catalog.
+  12: optional i64 last_modified_time
 }
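
Note: the mtime contract described in the TFunction comment above (and in the RefreshCacheEntry/LoadCacheEntry comments in lib-cache.h) can be modelled roughly as follows. This is a minimal sketch, not the backend code: CacheEntry, resolve_entry and LibVersionMismatch are hypothetical names, and the grouping of the refresh condition is one reading of the header comment.

```python
from dataclasses import dataclass
from typing import Optional

# Mirrors the convention in the diff: an expected mtime of -1 disables the check.
NO_MTIME_CHECK = -1


class LibVersionMismatch(Exception):
    """Hypothetical stand-in for the LIB_VERSION_MISMATCH status."""


@dataclass
class CacheEntry:
    mtime: int                   # mtime recorded when the lib was copied locally
    needs_refresh: bool = False  # set when the file system should be re-checked


def resolve_entry(entry: Optional[CacheEntry], fs_mtime: int,
                  exp_mtime: int) -> CacheEntry:
    """Return a usable entry, reloading it when it is missing or stale."""
    stale = entry is None
    if entry is not None:
        # Assumed reading: refresh if flagged and older than the file on the fs...
        if entry.needs_refresh and entry.mtime < fs_mtime:
            stale = True
        # ...or if older than the mtime the coordinator planned with.
        if exp_mtime != NO_MTIME_CHECK and entry.mtime < exp_mtime:
            stale = True
    if not stale:
        return entry
    # On (re)load, the file on the fs must match the coordinator's view.
    if exp_mtime != NO_MTIME_CHECK and fs_mtime != exp_mtime:
        raise LibVersionMismatch(
            "fs mtime %d does not match expected mtime %d" % (fs_mtime, exp_mtime))
    return CacheEntry(mtime=fs_mtime)
```

Under this model an executor that never receives catalog updates still converges: any plan carrying a newer mtime forces a reload, and a reload that cannot produce the planned version fails the query instead of silently running a different lib.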

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index e878498..fc47b2a 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -355,6 +355,10 @@ error_codes = (
 
   ("ROW_BATCH_TOO_LARGE", 116,
    "Row batch cannot be serialized: size of $0 bytes exceeds supported limit of $1"),
+
+  ("LIB_VERSION_MISMATCH", 117,
+   "The library $0 last modified time $1 does not match the expected last "
+   "modified time $2. Run 'refresh functions <db name>'."),
 )
 
 import sys
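
Note: the new LIB_VERSION_MISMATCH template takes three positional arguments (library path, actual mtime, expected mtime). The sketch below shows how such a template expands. It is an illustration only: format_error is a hypothetical helper, not Impala's substitution code, and the path and mtime values are made up.

```python
import re

LIB_VERSION_MISMATCH = (
    "The library $0 last modified time $1 does not match the expected last "
    "modified time $2. Run 'refresh functions <db name>'.")


def format_error(template, *args):
    """Replace $0, $1, ... in 'template' with the positional arguments."""
    return re.sub(r"\$(\d+)", lambda m: str(args[int(m.group(1))]), template)


# Illustrative values only: a jar path and two mtimes two seconds apart.
msg = format_error(LIB_VERSION_MISMATCH, "/test-warehouse/db/tmp2.jar",
                   1522080000, 1522080002)
print(msg)
```

The custom cluster test in this change matches on the fixed middle part of this message ("does not match the expected last modified time"), so it does not depend on the concrete path or mtimes.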

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index a235662..e1fb5a8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -37,6 +37,7 @@ import org.apache.impala.common.TreeNode;
 import org.apache.impala.rewrite.ExprRewriter;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
+import org.apache.impala.thrift.TFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -610,7 +611,9 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     msg.is_constant = isConstant_;
     msg.num_children = children_.size();
     if (fn_ != null) {
-      msg.setFn(fn_.toThrift());
+      TFunction thriftFn = fn_.toThrift();
+      thriftFn.setLast_modified_time(fn_.getLastModifiedTime());
+      msg.setFn(thriftFn);
       if (fn_.hasVarArgs()) msg.setVararg_start_idx(fn_.getNumArgs() - 1);
     }
     toThrift(msg);

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java b/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java
index d944fb4..bfa741d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java
+++ b/fe/src/main/java/org/apache/impala/catalog/AggregateFunction.java
@@ -25,6 +25,9 @@ import org.apache.impala.analysis.HdfsUri;
 import org.apache.impala.thrift.TAggregateFunction;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TFunctionBinaryType;
+import org.apache.impala.thrift.TSymbolLookupParams;
+import org.apache.impala.thrift.TSymbolType;
+
 import com.google.common.base.Preconditions;
 
 /**
@@ -195,6 +198,12 @@ public class AggregateFunction extends Function {
   public void setIntermediateType(Type t) { intermediateType_ = t; }
 
   @Override
+  protected TSymbolLookupParams getLookupParams() {
+    return buildLookupParams(getUpdateFnSymbol(), TSymbolType.UDF_EVALUATE,
+        intermediateType_, hasVarArgs(), false, getArgs());
+  }
+
+  @Override
   public String toSql(boolean ifNotExists) {
     StringBuilder sb = new StringBuilder("CREATE AGGREGATE FUNCTION ");
     if (ifNotExists) sb.append("IF NOT EXISTS ");

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/fe/src/main/java/org/apache/impala/catalog/Function.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Function.java b/fe/src/main/java/org/apache/impala/catalog/Function.java
index 03cd867..59ee5ea 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Function.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Function.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog;
 
 import java.util.List;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.HdfsUri;
 import org.apache.impala.common.AnalysisException;
@@ -35,6 +36,7 @@ import org.apache.impala.thrift.TScalarFunction;
 import org.apache.impala.thrift.TSymbolLookupParams;
 import org.apache.impala.thrift.TSymbolLookupResult;
 import org.apache.impala.thrift.TSymbolType;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -368,6 +370,49 @@ public class Function extends CatalogObjectImpl {
     return function;
   }
 
+  protected final TSymbolLookupParams buildLookupParams(String symbol,
+      TSymbolType symbolType, Type retArgType, boolean hasVarArgs, boolean needsRefresh,
+      Type... argTypes) {
+    TSymbolLookupParams lookup = new TSymbolLookupParams();
+    // Builtin functions do not have an external library, they are loaded directly from
+    // the running process
+    lookup.location =
+        binaryType_ != TFunctionBinaryType.BUILTIN ? location_.toString() : "";
+    lookup.symbol = symbol;
+    lookup.symbol_type = symbolType;
+    lookup.fn_binary_type = binaryType_;
+    lookup.arg_types = Type.toThrift(argTypes);
+    lookup.has_var_args = hasVarArgs;
+    lookup.needs_refresh = needsRefresh;
+    if (retArgType != null) lookup.setRet_arg_type(retArgType.toThrift());
+    return lookup;
+  }
+
+  protected TSymbolLookupParams getLookupParams() {
+    throw new NotImplementedException(
+        "getLookupParams not implemented for " + getClass().getSimpleName());
+  }
+
+  // Looks up the last time the function's source file was updated as recorded in its
+  // backend lib-cache entry. Returns -1 if a modified time is not applicable.
+  // If an error occurs and the mtime cannot be retrieved, an IllegalStateException is
+  // thrown.
+  public final long getLastModifiedTime() {
+    if (getBinaryType() != TFunctionBinaryType.BUILTIN && getLocation() != null) {
+      Preconditions.checkState(!getLocation().toString().isEmpty());
+      TSymbolLookupParams lookup = Preconditions.checkNotNull(getLookupParams());
+      try {
+        TSymbolLookupResult result = FeSupport.LookupSymbol(lookup);
+        return result.last_modified_time;
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            "Unable to get last modified time for lib file: " + getLocation().toString(),
+            e);
+      }
+    }
+    return -1;
+  }
+
   // Returns the resolved symbol in the binary. The BE will do a lookup of 'symbol'
   // in the binary and try to resolve unmangled names.
   // If this function is expecting a return argument, retArgType is that type. It should
@@ -383,17 +428,8 @@ public class Function extends CatalogObjectImpl {
       throw new AnalysisException("Could not find symbol ''");
     }
 
-    TSymbolLookupParams lookup = new TSymbolLookupParams();
-    // Builtin functions do not have an external library, they are loaded directly from
-    // the running process
-    lookup.location =  binaryType_ != TFunctionBinaryType.BUILTIN ?
-        location_.toString() : "";
-    lookup.symbol = symbol;
-    lookup.symbol_type = symbolType;
-    lookup.fn_binary_type = binaryType_;
-    lookup.arg_types = Type.toThrift(argTypes);
-    lookup.has_var_args = hasVarArgs;
-    if (retArgType != null) lookup.setRet_arg_type(retArgType.toThrift());
+    TSymbolLookupParams lookup =
+        buildLookupParams(symbol, symbolType, retArgType, hasVarArgs, true, argTypes);
 
     try {
       TSymbolLookupResult result = FeSupport.LookupSymbol(lookup);

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java b/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
index dd88a71..8872e1e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ScalarFunction.java
@@ -32,7 +32,9 @@ import org.apache.impala.hive.executor.UdfExecutor.JavaUdfDataType;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TScalarFunction;
+import org.apache.impala.thrift.TSymbolLookupParams;
 import org.apache.impala.thrift.TSymbolType;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -257,6 +259,12 @@ public class ScalarFunction extends Function {
   public String getSymbolName() { return symbolName_; }
 
   @Override
+  protected TSymbolLookupParams getLookupParams() {
+    return buildLookupParams(
+        getSymbolName(), TSymbolType.UDF_EVALUATE, null, hasVarArgs(), false, getArgs());
+  }
+
+  @Override
   public String toSql(boolean ifNotExists) {
     StringBuilder sb = new StringBuilder("CREATE FUNCTION ");
     if (ifNotExists) sb.append("IF NOT EXISTS ");

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
index a20d643..4bb0e05 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeExprsTest.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.impala.analysis.TimestampArithmeticExpr.TimeUnit;
+import org.apache.impala.catalog.AggregateFunction;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
@@ -41,6 +42,7 @@ import org.apache.impala.catalog.TestSchemaUtils;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TExpr;
+import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TQueryOptions;
 import org.junit.Assert;
@@ -2155,6 +2157,81 @@ public class AnalyzeExprsTest extends AnalyzerTest {
     catalog_.removeFunction(udf);
   }
 
+  // Serializes to thrift and checks the mtime setting.
+  private void checkSerializedMTime(Expr expr, boolean expectMTime) {
+    Preconditions.checkNotNull(expr.getFn());
+    TExpr thriftExpr = expr.treeToThrift();
+    Preconditions.checkState(thriftExpr.getNodesSize() > 0);
+    Preconditions.checkState(thriftExpr.getNodes().get(0).isSetFn());
+    TFunction thriftFn = thriftExpr.getNodes().get(0).getFn();
+    if (expectMTime) {
+      assertTrue(thriftFn.getLast_modified_time() >= 0);
+    } else {
+      assertEquals(thriftFn.getLast_modified_time(), -1);
+    }
+  }
+
+  // Checks that 'expr', when serialized to thrift, has mtime
+  // set as expected. 'expr' must be a single function call.
+  private void testMTime(String expr, boolean expectMTime) {
+    SelectStmt stmt =
+        (SelectStmt) AnalyzesOk("select " + expr + " from functional.alltypes");
+    Preconditions.checkState(stmt.getSelectList().getItems().size() == 1);
+    TupleDescriptor tblRefDesc = stmt.fromClause_.get(0).getDesc();
+    tblRefDesc.materializeSlots();
+    tblRefDesc.computeMemLayout();
+    if (stmt.hasAggInfo()) {
+      TupleDescriptor intDesc = stmt.getAggInfo().intermediateTupleDesc_;
+      intDesc.materializeSlots();
+      intDesc.computeMemLayout();
+      checkSerializedMTime(stmt.getAggInfo().getAggregateExprs().get(0), expectMTime);
+      checkSerializedMTime(
+          stmt.getAggInfo().getMergeAggInfo().getAggregateExprs().get(0), expectMTime);
+    } else {
+      checkSerializedMTime(stmt.getSelectList().getItems().get(0).getExpr(), expectMTime);
+    }
+  }
+
+  @Test
+  public void TestFunctionMTime() {
+    // Expect unset mtime for builtin.
+    testMTime("sleep(1)", false);
+    testMTime("concat('a', 'b')", false);
+    testMTime("3 is null", false);
+    testMTime("1 < 3", false);
+    testMTime("1 + 1", false);
+    testMTime("avg(int_col)", false);
+
+    final String nativeSymbol =
+        "'_Z8IdentityPN10impala_udf15FunctionContextERKNS_10BooleanValE'";
+    final String nativeUdfPath = "/test-warehouse/libTestUdfs.so";
+    final String javaSymbol = "SYMBOL='org.apache.impala.TestUdf";
+    final String javaPath = "/test-warehouse/impala-hive-udfs.jar";
+    final String nativeUdaPath = "hdfs://localhost:20500/test-warehouse/libTestUdas.so";
+
+    // Spec for a bogus builtin that specifies a bogus path.
+    catalog_.addFunction(ScalarFunction.createForTesting("default", "udf_builtin_bug",
+        new ArrayList<Type>(), Type.INT, "/dummy", "dummy.class", null, null,
+        TFunctionBinaryType.BUILTIN));
+    // Valid specs for native and java udf/uda's.
+    catalog_.addFunction(ScalarFunction.createForTesting("default", "udf_jar",
+        Lists.<Type>newArrayList(Type.INT), Type.INT, javaPath, javaSymbol, null, null,
+        TFunctionBinaryType.JAVA));
+    catalog_.addFunction(ScalarFunction.createForTesting("default", "udf_native",
+        new ArrayList<Type>(), Type.INT, nativeUdfPath, nativeSymbol, null, null,
+        TFunctionBinaryType.NATIVE));
+    catalog_.addFunction(AggregateFunction.createForTesting(
+        new FunctionName("default", "uda"), Lists.<Type>newArrayList(Type.INT), Type.INT,
+        Type.INT, new HdfsUri(nativeUdaPath), "init_fn_symbol", "update_fn_symbol", null,
+        null, null, null, null, TFunctionBinaryType.NATIVE));
+
+    // Expect these to have mtime set, except for the bogus builtin.
+    testMTime("udf_builtin_bug()", false);
+    testMTime("udf_jar(3)", true);
+    testMTime("udf_native()", true);
+    testMTime("uda(int_col)", true);
+  }
+
   @Test
   public void TestExprChildLimit() {
     // Test IN predicate.

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/testdata/bin/copy-udfs-udas.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/copy-udfs-udas.sh b/testdata/bin/copy-udfs-udas.sh
index 3e1587d..adf64a0 100755
--- a/testdata/bin/copy-udfs-udas.sh
+++ b/testdata/bin/copy-udfs-udas.sh
@@ -57,9 +57,15 @@ then
   # back
   find . -type f -name 'TestUpdateUdf.java' -execdir \
        bash -c "sed -i s/'Text(\"Old UDF\")'/'Text(\"New UDF\")'/g '{}'" \;
+  # Create a new Java function by copying and renaming an existing Java file for testing
+  # purposes. Then remove it.
+  find . -type f -name 'ReplaceStringUdf.java' -execdir \
+       bash -c "sed s/'ReplaceStringUdf'/'NewReplaceStringUdf'/g '{}'" \; \
+       > src/main/java/org/apache/impala/NewReplaceStringUdf.java
   "${IMPALA_HOME}/bin/mvn-quiet.sh" package
   find . -type f -name 'TestUpdateUdf.java' -execdir \
        bash -c "sed -i s/'Text(\"New UDF\")'/'Text(\"Old UDF\")'/g '{}'" \;
+  rm src/main/java/org/apache/impala/NewReplaceStringUdf.java
   popd
 fi
 

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/tests/custom_cluster/test_coordinators.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index eb5a125..4ec3317 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -18,7 +18,10 @@
 # The base class that should be used for almost all Impala tests
 
 import pytest
-
+import os
+import time
+from subprocess import check_call
+from tests.util.filesystem_utils import get_fs_path
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
 class TestCoordinators(CustomClusterTestSuite):
@@ -120,3 +123,125 @@ class TestCoordinators(CustomClusterTestSuite):
     self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
         use_exclusive_coordinators=True)
     exec_and_verify_num_executors(2)
+
+  @pytest.mark.execute_serially
+  def test_executor_only_lib_cache(self):
+    """IMPALA-6670: checks that the lib-cache gets refreshed on executor-only nodes"""
+
+    self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
+                               use_exclusive_coordinators=True)
+
+    db_name = 'TEST_EXEC_ONLY_CACHE'
+
+    # jar src/tgt paths
+    old_src_path = os.path.join(
+        os.environ['IMPALA_HOME'], 'testdata/udfs/impala-hive-udfs.jar')
+    new_src_path = os.path.join(
+        os.environ['IMPALA_HOME'], 'tests/test-hive-udfs/target/test-hive-udfs-1.0.jar')
+    tgt_dir = get_fs_path('/test-warehouse/{0}.db'.format(db_name))
+    tgt_path = tgt_dir + "/tmp.jar"
+
+    try:
+      # copy jar with TestUpdateUdf (old) to tmp.jar
+      check_call(["hadoop", "fs", "-mkdir", "-p", tgt_dir])
+      check_call(["hadoop", "fs", "-put", "-f", old_src_path, tgt_path])
+
+      coordinator = self.cluster.impalads[0]
+      client = coordinator.service.create_beeswax_client()
+
+      # create the database
+      self.execute_query_expect_success(client,
+          "create database if not exists %s" % db_name)
+
+      # create a function for TestUpdateUdf (old)
+      create_old_fn = (
+          "create function `{0}`.`old_fn`(string) returns string LOCATION '{1}' "
+          "SYMBOL='org.apache.impala.TestUpdateUdf'".format(db_name, tgt_path))
+      self.execute_query_expect_success(client, create_old_fn);
+
+      # run the query for TestUpdateUdf (old) and expect it to work
+      old_query = (
+          "select count(*) from functional.alltypes where "
+          "`{0}`.old_fn(string_col) = 'Old UDF'".format(db_name));
+      result = self.execute_query_expect_success(client, old_query)
+      assert result.data == ['7300']
+
+      # copy a new jar with TestUpdateUdf (new) and NewReplaceStringUdf to tmp.jar.
+      check_call(["hadoop", "fs", "-put", "-f", new_src_path, tgt_path])
+
+      # create a function for the updated TestUpdateUdf.
+      create_new_fn = (
+          "create function `{0}`.`new_fn`(string) returns string LOCATION '{1}' "
+          "SYMBOL='org.apache.impala.TestUpdateUdf'".format(db_name, tgt_path))
+      self.execute_query_expect_success(client, create_new_fn);
+
+      # run the query for TestUpdateUdf (new) and expect the updated version to work.
+      # the udf argument prevents constant expression optimizations, which can mask
+      # incorrect lib-cache state/handling.
+      # (bug behavior was to get the old version, so number of results would be = 0)
+      # Note: if old_fn is run in the same way now, it will pick up the new
+      #       implementation. that is current system behavior, so expected.
+      new_query = (
+          "select count(*) from functional.alltypes where "
+          "`{0}`.new_fn(string_col) = 'New UDF'".format(db_name));
+      result = self.execute_query_expect_success(client, new_query)
+      assert result.data == ['7300']
+
+      # create a function for NewReplaceStringUdf which does not exist in the previous
+      # version of the jar.
+      create_add_fn = (
+          "create function `{0}`.`add_fn`(string) returns string LOCATION '{1}' "
+          "SYMBOL='org.apache.impala.NewReplaceStringUdf'".format(db_name, tgt_path))
+      self.execute_query_expect_success(client, create_add_fn);
+
+      # run the query for NewReplaceStringUdf and expect the query to run.
+      # (bug behavior is to not find the class)
+      add_query = (
+          "select count(*) from functional.alltypes where "
+          "`{0}`.add_fn(string_col) = 'not here'".format(db_name));
+      result = self.execute_query_expect_success(client, add_query)
+      assert result.data == ['0']
+
+      # Copy jar to a new path.
+      tgt_path_2 = tgt_dir + "/tmp2.jar"
+      check_call(["hadoop", "fs", "-put", "-f", old_src_path, tgt_path_2])
+
+      # Add the function.
+      create_mismatch_fn = (
+          "create function `{0}`.`mismatch_fn`(string) returns string LOCATION '{1}' "
+          "SYMBOL='org.apache.impala.TestUpdateUdf'".format(db_name, tgt_path_2))
+      self.execute_query_expect_success(client, create_mismatch_fn);
+
+      # Run a query that'll run on only one executor.
+      small_query = (
+          "select count(*) from functional.tinytable where "
+          "`{0}`.mismatch_fn(a) = 'x'").format(db_name)
+      self.execute_query_expect_success(client, small_query)
+
+      # Overwrite the jar, giving it a new mtime. The sleep keeps the write from
+      # happening so quickly that it falls within mtime granularity (1 second).
+      time.sleep(2)
+      check_call(["hadoop", "fs", "-put", "-f", new_src_path, tgt_path_2])
+
+      # Run the query. Expect the query to fail due to mismatched libs at the
+      # coordinator and one of the executors.
+      mismatch_query = (
+          "select count(*) from functional.alltypes where "
+          "`{0}`.mismatch_fn(string_col) = 'Old UDF'".format(db_name));
+      result = self.execute_query_expect_failure(client, mismatch_query)
+      assert "does not match the expected last modified time" in str(result)
+
+      # Refresh, as suggested by the error message.
+      # IMPALA-6719: workaround lower-cases db_name.
+      self.execute_query_expect_success(client, "refresh functions " + db_name.lower())
+
+      # The coordinator should have picked up the new lib, so retry the query.
+      self.execute_query_expect_success(client, mismatch_query)
+
+    # cleanup
+    finally:
+      self.execute_query_expect_success(client,
+          "drop database if exists %s cascade" % db_name)
+      if client is not None:
+        client.close()
+      self._stop_impala_cluster()
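
Note: the time.sleep(2) in the test above exists because two writes that land within the same mtime granule cannot be told apart by mtime. A minimal sketch of that reasoning, assuming the file system truncates mtimes to whole seconds (the 1 second figure comes from the test comment; truncation and the helper name same_mtime are assumptions):

```python
def same_mtime(write_a, write_b, granularity_s=1):
    """True if two write times collapse to the same mtime at this granularity."""
    return int(write_a // granularity_s) == int(write_b // granularity_s)


# Two overwrites 0.7 s apart inside the same second look identical by mtime,
# so a version check keyed on mtime would not notice the second write.
print(same_mtime(100.2, 100.9))

# Waiting 2 s guarantees the second write lands in a later granule.
print(same_mtime(100.2, 102.2))
```

Any gap of at least one full granule is enough in this model; the test sleeps for two seconds, which leaves a margin.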

http://git-wip-us.apache.org/repos/asf/impala/blob/66ca5db7/tests/test-hive-udfs/src/main/java/org/apache/impala/TestUpdateUdf.java
----------------------------------------------------------------------
diff --git a/tests/test-hive-udfs/src/main/java/org/apache/impala/TestUpdateUdf.java b/tests/test-hive-udfs/src/main/java/org/apache/impala/TestUpdateUdf.java
index 0366edf..56daaa8 100644
--- a/tests/test-hive-udfs/src/main/java/org/apache/impala/TestUpdateUdf.java
+++ b/tests/test-hive-udfs/src/main/java/org/apache/impala/TestUpdateUdf.java
@@ -35,4 +35,8 @@ public class TestUpdateUdf extends UDF {
   public Text evaluate() {
     return new Text("Old UDF");
   }
+
+  public Text evaluate(Text arg) {
+    return new Text("Old UDF");
+  }
 }