You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ph...@apache.org on 2018/02/02 18:51:30 UTC

[06/19] impala git commit: IMPALA-6215: Removes race when using LibCache.

IMPALA-6215: Removes race when using LibCache.

LibCache's API for accessing locally cached files has a race.
Currently, a client of the cache receives the locally cached path
as a string, but nothing guarantees that the associated file is not
removed before the client is done using it. This race is suspected
to be the root cause of the flakiness seen in IMPALA-6092. The tests
tracked there fail once in a while with classloader errors: the Java
UDF classes cannot be loaded. In these tests, the lib cache makes no
guarantee that the path to the jar will remain valid from the time the
path is acquired through the time needed to fetch the jar and resolve
the needed classes.

LibCache offers liveness guarantees for shared objects via reference
counting. The fix in this patch extends this API to also cover paths
to locally cached files.

Testing:
- Added a test to test_udfs.py that runs many concurrent UDF uses and
  removals. With the number of concurrent operations increased to 100,
  the issue in IMPALA-6092 reproduces locally on every run without this
  fix. With this fix, the problem is no longer reproducible with this
  test.

Change-Id: I9175085201fe8b11424ab8f88d7b992cb7b0daea
Reviewed-on: http://gerrit.cloudera.org:8080/9089
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/885776ed
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/885776ed
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/885776ed

Branch: refs/heads/2.x
Commit: 885776ed2360c8b280b53d1806154821a4f32a0d
Parents: 44ba20a
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Tue Nov 21 08:41:03 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 2 01:10:15 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/llvm-codegen.cc               |  5 +-
 be/src/exec/external-data-source-executor.cc |  5 +-
 be/src/exprs/hive-udf-call.cc                | 57 +++++++--------
 be/src/exprs/hive-udf-call.h                 |  3 -
 be/src/runtime/lib-cache.cc                  | 22 +++---
 be/src/runtime/lib-cache.h                   | 44 ++++++++++--
 be/src/service/fe-support.cc                 | 11 +--
 tests/query_test/test_udfs.py                | 88 +++++++++++++++++++++--
 8 files changed, 177 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index e1a606c..72293a7 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -327,9 +327,10 @@ Status LlvmCodeGen::LinkModuleFromLocalFs(const string& file) {
 
 Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location) {
   if (linked_modules_.find(hdfs_location) != linked_modules_.end()) return Status::OK();
+  LibCacheEntryHandle handle;
   string local_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(hdfs_location, LibCache::TYPE_IR,
-      &local_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(hdfs_location, LibCache::TYPE_IR,
+      &handle, &local_path));
   RETURN_IF_ERROR(LinkModuleFromLocalFs(local_path));
   linked_modules_.insert(hdfs_location);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
index 7c54f39..20fe50e 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -136,9 +136,10 @@ ExternalDataSourceExecutor::~ExternalDataSourceExecutor() {
 Status ExternalDataSourceExecutor::Init(const string& jar_path,
     const string& class_name, const string& api_version, const string& init_string) {
   DCHECK(!is_initialized_);
+  LibCacheEntryHandle handle;
   string local_jar_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      jar_path, LibCache::TYPE_JAR, &local_jar_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
+      jar_path, LibCache::TYPE_JAR, &handle, &local_jar_path));
 
   JNIEnv* jni_env = getJNIEnv();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index 19e2e63..e1ac676 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -174,10 +174,6 @@ Status HiveUdfCall::Init(const RowDescriptor& row_desc, RuntimeState* state) {
   // Initialize children first.
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state));
 
-  // Copy the Hive Jar from hdfs to local file system.
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      fn_.hdfs_location, LibCache::TYPE_JAR, &local_location_));
-
   // Initialize input_byte_offsets_ and input_buffer_size_
   for (int i = 0; i < GetNumChildren(); ++i) {
     input_byte_offsets_.push_back(input_buffer_size_);
@@ -202,30 +198,35 @@ Status HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
   JNIEnv* env = getJNIEnv();
   if (env == NULL) return Status("Failed to get/create JVM");
 
-  THiveUdfExecutorCtorParams ctor_params;
-  ctor_params.fn = fn_;
-  ctor_params.local_location = local_location_;
-  ctor_params.input_byte_offsets = input_byte_offsets_;
-
-  jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
-  jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
-  jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
-
-  ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
-  ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
-  ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
-  ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
-
-  jbyteArray ctor_params_bytes;
-
-  // Add a scoped cleanup jni reference object. This cleans up local refs made
-  // below.
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(env));
-
-  RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
-  // Create the java executor object
-  jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+  {
+    LibCacheEntryHandle handle;
+    string local_location;
+    RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
+        fn_.hdfs_location, LibCache::TYPE_JAR, &handle, &local_location));
+    THiveUdfExecutorCtorParams ctor_params;
+    ctor_params.fn = fn_;
+    ctor_params.local_location = local_location;
+    ctor_params.input_byte_offsets = input_byte_offsets_;
+
+    jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
+    jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
+    jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
+
+    ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
+    ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
+    ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
+    ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
+
+    jbyteArray ctor_params_bytes;
+
+    // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+    JniLocalFrame jni_frame;
+    RETURN_IF_ERROR(jni_frame.push(env));
+
+    RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+    // Create the java executor object
+    jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+  }
   RETURN_ERROR_IF_EXC(env);
   RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/exprs/hive-udf-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 7ce5eb0..8ca0372 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -116,9 +116,6 @@ class HiveUdfCall : public ScalarExpr {
   /// error.
   AnyVal* Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) const;
 
-  /// The path on the local FS to the UDF's jar
-  std::string local_location_;
-
   /// input_byte_offsets_[i] is the byte offset child ith's input argument should
   /// be written to.
   std::vector<int> input_byte_offsets_;

http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index d694c49..b4a4f59 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -129,6 +129,10 @@ LibCacheEntry::~LibCacheEntry() {
   unlink(local_path.c_str());
 }
 
+LibCacheEntryHandle::~LibCacheEntryHandle() {  // Decrements entry_'s use count, if set.
+  if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_);
+}
+
 Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& symbol,
     void** fn_ptr, LibCacheEntry** ent, bool quiet) {
   if (hdfs_lib_file.empty()) {
@@ -173,21 +177,23 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) {
   if (entry == NULL) return;
   bool can_delete = false;
   {
-    unique_lock<mutex> lock(entry->lock);;
+    unique_lock<mutex> lock(entry->lock);
     --entry->use_count;
     can_delete = (entry->use_count == 0 && entry->should_remove);
   }
   if (can_delete) delete entry;
 }
 
-Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
-                                 string* local_path) {
+Status LibCache::GetLocalPath(const std::string& hdfs_lib_file, LibType type,
+    LibCacheEntryHandle* handle, string* path) {
+  DCHECK(handle != nullptr && handle->entry() == nullptr);
+  LibCacheEntry* entry = nullptr;
   unique_lock<mutex> lock;
-  LibCacheEntry* entry = NULL;
   RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
-  DCHECK(entry != NULL);
-  DCHECK_EQ(entry->type, type);
-  *local_path = entry->local_path;
+  DCHECK(entry != nullptr);
+  ++entry->use_count;
+  handle->SetEntry(entry);
+  *path = entry->local_path;
   return Status::OK();
 }
 
@@ -352,7 +358,7 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type
     entry_lock->swap(local_entry_lock);
 
     RETURN_IF_ERROR((*entry)->copy_file_status);
-    DCHECK_EQ((*entry)->type, type);
+    DCHECK_EQ((*entry)->type, type) << (*entry)->local_path;
     DCHECK(!(*entry)->local_path.empty());
     return Status::OK();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 4a564ee..7296a00 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -49,11 +49,16 @@ class RuntimeState;
 /// using the library. When the caller requests a ptr into the library, they
 /// are given the entry handle and must decrement the ref count when they
 /// are done.
+/// Note: Explicitly managing this reference count at the client is error-prone. See the
+/// api for accessing a path, GetLocalPath(), that uses the handle's scope to manage the
+/// reference count.
 //
 /// TODO:
 /// - refresh libraries
-/// - better cached module management.
+/// - better cached module management
+/// - improve the api to be less error-prone (IMPALA-6439)
 struct LibCacheEntry;
+class LibCacheEntryHandle;
 
 class LibCache {
  public:
@@ -71,11 +76,16 @@ class LibCache {
   /// Initializes the libcache. Must be called before any other APIs.
   static Status Init();
 
-  /// Gets the local file system path for the library at 'hdfs_lib_file'. If
-  /// this file is not already on the local fs, it copies it and caches the
-  /// result. Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
-  Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type,
-                         std::string* local_path);
+  /// Gets the local 'path' used to cache the file stored at the global 'hdfs_lib_file'.
+  /// If the referenced global file has not been copied locally, it copies it and
+  /// caches the result.
+  ///
+  /// 'handle' must remain in scope while 'path' is used. The reference count to the
+  /// underlying cache entry is decremented when 'handle' goes out-of-scope.
+  ///
+  /// Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
+  Status GetLocalPath(const std::string& hdfs_lib_file, LibType type,
+      LibCacheEntryHandle* handle, string* path);
 
   /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok otherwise.
   /// If 'quiet' is true, the error status for non-Java unfound symbols will not be logged.
@@ -94,6 +104,7 @@ class LibCache {
   /// using fn_ptr and it is no longer valid to use fn_ptr.
   //
   /// If 'quiet' is true, returned error statuses will not be logged.
+  /// TODO: api is error-prone. upgrade to LibCacheEntryHandle (see IMPALA-6439).
   Status GetSoFunctionPtr(const std::string& hdfs_lib_file, const std::string& symbol,
       void** fn_ptr, LibCacheEntry** entry, bool quiet = false);
 
@@ -164,6 +175,27 @@ class LibCache {
                            const LibMap::iterator& entry_iterator);
 };
 
+/// Handle for a LibCacheEntry that decrements the reference count of the entry it holds
+/// when the handle is destroyed or re-used (via SetEntry()) for another entry.
+class LibCacheEntryHandle {
+ public:
+  LibCacheEntryHandle() {}
+  ~LibCacheEntryHandle();
+
+ private:
+  friend class LibCache;  // Only LibCache reads or replaces the held entry.
+
+  LibCacheEntry* entry() const { return entry_; }
+  void SetEntry(LibCacheEntry* entry) {  // Releases the old entry before holding 'entry'.
+    if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_);
+    entry_ = entry;
+  }
+
+  LibCacheEntry* entry_ = nullptr;  // Held entry, or nullptr if none.
+
+  DISALLOW_COPY_AND_ASSIGN(LibCacheEntryHandle);
+};
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 35b5a15..0a84d09 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -294,9 +294,11 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params,
   if (params.fn_binary_type != TFunctionBinaryType::BUILTIN) {
     // Refresh the library if necessary since we're creating a new function
     LibCache::instance()->SetNeedsRefresh(params.location);
+    LibCacheEntryHandle handle;
     string dummy_local_path;
-    Status status = LibCache::instance()->GetLocalLibPath(
-        params.location, type, &dummy_local_path);
+    Status status = LibCache::instance()->GetLocalPath(
+        params.location, type, &handle, &dummy_local_path);
+
     if (!status.ok()) {
       result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND);
       result->__set_error_msg(status.GetDetail());
@@ -387,9 +389,10 @@ Java_org_apache_impala_service_FeSupport_NativeCacheJar(
       JniUtil::internal_exc_class(), nullptr);
 
   TCacheJarResult result;
+  LibCacheEntryHandle handle;
   string local_path;
-  Status status = LibCache::instance()->GetLocalLibPath(params.hdfs_location,
-      LibCache::TYPE_JAR, &local_path);
+  Status status = LibCache::instance()->GetLocalPath(
+      params.hdfs_location, LibCache::TYPE_JAR, &handle, &local_path);
   status.ToThrift(&result.status);
   if (status.ok()) result.__set_local_path(local_path);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/885776ed/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 1ff716a..61dd54c 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -18,6 +18,9 @@
 from copy import copy
 import os
 import pytest
+import random
+import threading
+import time
 from subprocess import check_call
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
@@ -316,8 +319,6 @@ class TestUdfExecution(TestUdfBase):
       self.run_test_case('QueryTest/udf-non-deterministic', vector,
           use_db=unique_database)
 
-  # Runs serially as a temporary workaround for IMPALA_6092.
-  @pytest.mark.execute_serially
   def test_java_udfs(self, vector, unique_database):
     self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database)
     self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database)
@@ -418,9 +419,6 @@ class TestUdfTargeted(TestUdfBase):
             unique_database, tgt_udf_path))
     query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database)
 
-    # Dropping the function can interact with other tests whose Java classes are in
-    # the same jar. Use a copy of the jar to avoid unintended interactions.
-    # See IMPALA-6215 and IMPALA-6092 for examples.
     check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path])
     self.client.execute(drop_fn_stmt)
     self.client.execute(create_fn_stmt)
@@ -429,6 +427,86 @@ class TestUdfTargeted(TestUdfBase):
       assert "Unable to find class" in str(ex)
     self.client.execute(drop_fn_stmt)
 
+  def test_concurrent_jar_drop_use(self, vector, unique_database):
+    """IMPALA-6215: race between dropping/using java udf's defined in the same jar.
+       This test runs concurrent drop/use threads that result in class not found
+       exceptions when the race is present.
+    """
+    udf_src_path = os.path.join(
+      os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar")
+    udf_tgt_path = get_fs_path(
+      '/test-warehouse/impala-hive-udfs-{0}.jar'.format(unique_database))
+
+    create_fn_to_drop = """create function {0}.foo_{1}() returns string
+                           LOCATION '{2}' SYMBOL='org.apache.impala.TestUpdateUdf'"""
+    create_fn_to_use = """create function {0}.use_it(string) returns string
+                          LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'"""
+    drop_fn = "drop function if exists {0}.foo_{1}()"
+    use_fn = """select * from (select max(int_col) from functional.alltypesagg
+                where {0}.use_it(string_col) = 'blah' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(String_col) > '1' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(string_col) > '1'))) v"""
+    num_drops = 100
+    num_uses = 100
+
+    # use a unique jar for this test to avoid interactions with other tests
+    # that use the same jar
+    check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
+
+    # create all the functions.
+    setup_client = self.create_impala_client()
+    try:
+      s = create_fn_to_use.format(unique_database, udf_tgt_path)
+      print "use create: " + s
+      setup_client.execute(s)
+    except Exception as e:
+      print e
+      assert False
+    for i in range(0, num_drops):
+      try:
+        setup_client.execute(create_fn_to_drop.format(unique_database, i, udf_tgt_path))
+      except Exception as e:
+        print e
+        assert False
+
+    errors = []
+    def use_fn_method():
+      time.sleep(5 + random.random())
+      client = self.create_impala_client()
+      try:
+        client.execute(use_fn.format(unique_database))
+      except Exception as e: errors.append(e)
+
+    def drop_fn_method(i):
+      time.sleep(1 + random.random())
+      client = self.create_impala_client()
+      try:
+        client.execute(drop_fn.format(unique_database, i))
+      except Exception as e: errors.append(e)
+
+    # create threads to use functions.
+    runner_threads = []
+    for i in range(0, num_uses):
+      runner_threads.append(threading.Thread(target=use_fn_method))
+
+    # create threads to drop functions.
+    drop_threads = []
+    for i in range(0, num_drops):
+      runner_threads.append(threading.Thread(target=drop_fn_method, args=(i, )))
+
+    # launch all runner threads.
+    for t in runner_threads: t.start()
+
+    # join all threads.
+    for t in runner_threads: t.join();
+
+    # Check for any errors.
+    for e in errors: print e
+    assert len(errors) == 0
+
+
   @SkipIfLocal.multiple_impalad
   def test_hive_udfs_missing_jar(self, vector, unique_database):
     """ IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present