You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2018/02/07 21:23:15 UTC
[4/4] impala git commit: Revert "IMPALA-6215: Removes race when using LibCache."
Revert "IMPALA-6215: Removes race when using LibCache."
This reverts commit 4aafa5e9ba9fe22d2dbc7764a796b3cd04136cc0.
See IMPALA-6488 for an example of a crash that this revert is
trying to avoid.
Change-Id: I2e0a22d38f15fb3e34f08633ab0fc7c87c92d40f
Reviewed-on: http://gerrit.cloudera.org:8080/9244
Reviewed-by: Alex Behm <al...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
Tested-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/136267e8
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/136267e8
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/136267e8
Branch: refs/heads/master
Commit: 136267e87ff3b9304b9aefa70947b160721ca32b
Parents: 0a9f036
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Wed Feb 7 11:24:50 2018 -0800
Committer: Thomas Tauber-Marshall <tm...@cloudera.com>
Committed: Wed Feb 7 20:54:37 2018 +0000
----------------------------------------------------------------------
be/src/codegen/llvm-codegen.cc | 5 +-
be/src/exec/external-data-source-executor.cc | 5 +-
be/src/exprs/hive-udf-call.cc | 57 ++++++++-------
be/src/exprs/hive-udf-call.h | 3 +
be/src/runtime/lib-cache.cc | 22 +++---
be/src/runtime/lib-cache.h | 44 ++----------
be/src/service/fe-support.cc | 11 ++-
tests/query_test/test_udfs.py | 88 ++---------------------
8 files changed, 58 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/136267e8/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 72293a7..e1a606c 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -327,10 +327,9 @@ Status LlvmCodeGen::LinkModuleFromLocalFs(const string& file) {
Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location) {
if (linked_modules_.find(hdfs_location) != linked_modules_.end()) return Status::OK();
- LibCacheEntryHandle handle;
string local_path;
- RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(hdfs_location, LibCache::TYPE_IR,
- &handle, &local_path));
+ RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(hdfs_location, LibCache::TYPE_IR,
+ &local_path));
RETURN_IF_ERROR(LinkModuleFromLocalFs(local_path));
linked_modules_.insert(hdfs_location);
return Status::OK();
http://git-wip-us.apache.org/repos/asf/impala/blob/136267e8/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
index 20fe50e..7c54f39 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -136,10 +136,9 @@ ExternalDataSourceExecutor::~ExternalDataSourceExecutor() {
Status ExternalDataSourceExecutor::Init(const string& jar_path,
const string& class_name, const string& api_version, const string& init_string) {
DCHECK(!is_initialized_);
- LibCacheEntryHandle handle;
string local_jar_path;
- RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
- jar_path, LibCache::TYPE_JAR, &handle, &local_jar_path));
+ RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
+ jar_path, LibCache::TYPE_JAR, &local_jar_path));
JNIEnv* jni_env = getJNIEnv();
http://git-wip-us.apache.org/repos/asf/impala/blob/136267e8/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index e1ac676..19e2e63 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -174,6 +174,10 @@ Status HiveUdfCall::Init(const RowDescriptor& row_desc, RuntimeState* state) {
// Initialize children first.
RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state));
+ // Copy the Hive Jar from hdfs to local file system.
+ RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
+ fn_.hdfs_location, LibCache::TYPE_JAR, &local_location_));
+
// Initialize input_byte_offsets_ and input_buffer_size_
for (int i = 0; i < GetNumChildren(); ++i) {
input_byte_offsets_.push_back(input_buffer_size_);
@@ -198,35 +202,30 @@ Status HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
JNIEnv* env = getJNIEnv();
if (env == NULL) return Status("Failed to get/create JVM");
- {
- LibCacheEntryHandle handle;
- string local_location;
- RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
- fn_.hdfs_location, LibCache::TYPE_JAR, &handle, &local_location));
- THiveUdfExecutorCtorParams ctor_params;
- ctor_params.fn = fn_;
- ctor_params.local_location = local_location;
- ctor_params.input_byte_offsets = input_byte_offsets_;
-
- jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
- jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
- jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
-
- ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
- ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
- ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
- ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
-
- jbyteArray ctor_params_bytes;
-
- // Add a scoped cleanup jni reference object. This cleans up local refs made below.
- JniLocalFrame jni_frame;
- RETURN_IF_ERROR(jni_frame.push(env));
-
- RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
- // Create the java executor object
- jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
- }
+ THiveUdfExecutorCtorParams ctor_params;
+ ctor_params.fn = fn_;
+ ctor_params.local_location = local_location_;
+ ctor_params.input_byte_offsets = input_byte_offsets_;
+
+ jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
+ jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
+ jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
+
+ ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
+ ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
+ ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
+ ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
+
+ jbyteArray ctor_params_bytes;
+
+ // Add a scoped cleanup jni reference object. This cleans up local refs made
+ // below.
+ JniLocalFrame jni_frame;
+ RETURN_IF_ERROR(jni_frame.push(env));
+
+ RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+ // Create the java executor object
+ jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor));
http://git-wip-us.apache.org/repos/asf/impala/blob/136267e8/be/src/exprs/hive-udf-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 8ca0372..7ce5eb0 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -116,6 +116,9 @@ class HiveUdfCall : public ScalarExpr {
/// error.
AnyVal* Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) const;
+ /// The path on the local FS to the UDF's jar
+ std::string local_location_;
+
/// input_byte_offsets_[i] is the byte offset child ith's input argument should
/// be written to.
std::vector<int> input_byte_offsets_;
http://git-wip-us.apache.org/repos/asf/impala/blob/136267e8/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index b4a4f59..d694c49 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -129,10 +129,6 @@ LibCacheEntry::~LibCacheEntry() {
unlink(local_path.c_str());
}
-LibCacheEntryHandle::~LibCacheEntryHandle() {
- if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_);
-}
-
Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& symbol,
void** fn_ptr, LibCacheEntry** ent, bool quiet) {
if (hdfs_lib_file.empty()) {
@@ -177,23 +173,21 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) {
if (entry == NULL) return;
bool can_delete = false;
{
- unique_lock<mutex> lock(entry->lock);
+ unique_lock<mutex> lock(entry->lock);;
--entry->use_count;
can_delete = (entry->use_count == 0 && entry->should_remove);
}
if (can_delete) delete entry;
}
-Status LibCache::GetLocalPath(const std::string& hdfs_lib_file, LibType type,
- LibCacheEntryHandle* handle, string* path) {
- DCHECK(handle != nullptr && handle->entry() == nullptr);
- LibCacheEntry* entry = nullptr;
+Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
+ string* local_path) {
unique_lock<mutex> lock;
+ LibCacheEntry* entry = NULL;
RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
- DCHECK(entry != nullptr);
- ++entry->use_count;
- handle->SetEntry(entry);
- *path = entry->local_path;
+ DCHECK(entry != NULL);
+ DCHECK_EQ(entry->type, type);
+ *local_path = entry->local_path;
return Status::OK();
}
@@ -358,7 +352,7 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type
entry_lock->swap(local_entry_lock);
RETURN_IF_ERROR((*entry)->copy_file_status);
- DCHECK_EQ((*entry)->type, type) << (*entry)->local_path;
+ DCHECK_EQ((*entry)->type, type);
DCHECK(!(*entry)->local_path.empty());
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/136267e8/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 7296a00..4a564ee 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -49,16 +49,11 @@ class RuntimeState;
/// using the library. When the caller requests a ptr into the library, they
/// are given the entry handle and must decrement the ref count when they
/// are done.
-/// Note: Explicitly managing this reference count at the client is error-prone. See the
-/// api for accessing a path, GetLocalPath(), that uses the handle's scope to manage the
-/// reference count.
//
/// TODO:
/// - refresh libraries
-/// - better cached module management
-/// - improve the api to be less error-prone (IMPALA-6439)
+/// - better cached module management.
struct LibCacheEntry;
-class LibCacheEntryHandle;
class LibCache {
public:
@@ -76,16 +71,11 @@ class LibCache {
/// Initializes the libcache. Must be called before any other APIs.
static Status Init();
- /// Gets the local 'path' used to cache the file stored at the global 'hdfs_lib_file'.
- /// If the referenced global file has not been copied locally, it copies it and
- /// caches the result.
- ///
- /// 'handle' must remain in scope while 'path' is used. The reference count to the
- /// underlying cache entry is decremented when 'handle' goes out-of-scope.
- ///
- /// Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
- Status GetLocalPath(const std::string& hdfs_lib_file, LibType type,
- LibCacheEntryHandle* handle, string* path);
+ /// Gets the local file system path for the library at 'hdfs_lib_file'. If
+ /// this file is not already on the local fs, it copies it and caches the
+ /// result. Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
+ Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type,
+ std::string* local_path);
/// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok otherwise.
/// If 'quiet' is true, the error status for non-Java unfound symbols will not be logged.
@@ -104,7 +94,6 @@ class LibCache {
/// using fn_ptr and it is no longer valid to use fn_ptr.
//
/// If 'quiet' is true, returned error statuses will not be logged.
- /// TODO: api is error-prone. upgrade to LibCacheEntryHandle (see IMPALA-6439).
Status GetSoFunctionPtr(const std::string& hdfs_lib_file, const std::string& symbol,
void** fn_ptr, LibCacheEntry** entry, bool quiet = false);
@@ -175,27 +164,6 @@ class LibCache {
const LibMap::iterator& entry_iterator);
};
-/// Handle for a LibCacheEntry that decrements its reference count when the handle is
-/// destroyed or re-used for another entry.
-class LibCacheEntryHandle {
- public:
- LibCacheEntryHandle() {}
- ~LibCacheEntryHandle();
-
- private:
- friend class LibCache;
-
- LibCacheEntry* entry() const { return entry_; }
- void SetEntry(LibCacheEntry* entry) {
- if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry);
- entry_ = entry;
- }
-
- LibCacheEntry* entry_ = nullptr;
-
- DISALLOW_COPY_AND_ASSIGN(LibCacheEntryHandle);
-};
-
}
#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/136267e8/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 12ac874..d1979e7 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -296,11 +296,9 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params,
if (params.fn_binary_type != TFunctionBinaryType::BUILTIN) {
// Refresh the library if necessary since we're creating a new function
LibCache::instance()->SetNeedsRefresh(params.location);
- LibCacheEntryHandle handle;
string dummy_local_path;
- Status status = LibCache::instance()->GetLocalPath(
- params.location, type, &handle, &dummy_local_path);
-
+ Status status = LibCache::instance()->GetLocalLibPath(
+ params.location, type, &dummy_local_path);
if (!status.ok()) {
result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND);
result->__set_error_msg(status.GetDetail());
@@ -391,10 +389,9 @@ Java_org_apache_impala_service_FeSupport_NativeCacheJar(
JniUtil::internal_exc_class(), nullptr);
TCacheJarResult result;
- LibCacheEntryHandle handle;
string local_path;
- Status status = LibCache::instance()->GetLocalPath(
- params.hdfs_location, LibCache::TYPE_JAR, &handle, &local_path);
+ Status status = LibCache::instance()->GetLocalLibPath(params.hdfs_location,
+ LibCache::TYPE_JAR, &local_path);
status.ToThrift(&result.status);
if (status.ok()) result.__set_local_path(local_path);
http://git-wip-us.apache.org/repos/asf/impala/blob/136267e8/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 61dd54c..1ff716a 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -18,9 +18,6 @@
from copy import copy
import os
import pytest
-import random
-import threading
-import time
from subprocess import check_call
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
@@ -319,6 +316,8 @@ class TestUdfExecution(TestUdfBase):
self.run_test_case('QueryTest/udf-non-deterministic', vector,
use_db=unique_database)
+ # Runs serially as a temporary workaround for IMPALA_6092.
+ @pytest.mark.execute_serially
def test_java_udfs(self, vector, unique_database):
self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database)
self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database)
@@ -419,6 +418,9 @@ class TestUdfTargeted(TestUdfBase):
unique_database, tgt_udf_path))
query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database)
+ # Dropping the function can interact with other tests whose Java classes are in
+ # the same jar. Use a copy of the jar to avoid unintended interactions.
+ # See IMPALA-6215 and IMPALA-6092 for examples.
check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path])
self.client.execute(drop_fn_stmt)
self.client.execute(create_fn_stmt)
@@ -427,86 +429,6 @@ class TestUdfTargeted(TestUdfBase):
assert "Unable to find class" in str(ex)
self.client.execute(drop_fn_stmt)
- def test_concurrent_jar_drop_use(self, vector, unique_database):
- """IMPALA-6215: race between dropping/using java udf's defined in the same jar.
- This test runs concurrent drop/use threads that result in class not found
- exceptions when the race is present.
- """
- udf_src_path = os.path.join(
- os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar")
- udf_tgt_path = get_fs_path(
- '/test-warehouse/impala-hive-udfs-{0}.jar'.format(unique_database))
-
- create_fn_to_drop = """create function {0}.foo_{1}() returns string
- LOCATION '{2}' SYMBOL='org.apache.impala.TestUpdateUdf'"""
- create_fn_to_use = """create function {0}.use_it(string) returns string
- LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'"""
- drop_fn = "drop function if exists {0}.foo_{1}()"
- use_fn = """select * from (select max(int_col) from functional.alltypesagg
- where {0}.use_it(string_col) = 'blah' union all
- (select max(int_col) from functional.alltypesagg
- where {0}.use_it(String_col) > '1' union all
- (select max(int_col) from functional.alltypesagg
- where {0}.use_it(string_col) > '1'))) v"""
- num_drops = 100
- num_uses = 100
-
- # use a unique jar for this test to avoid interactions with other tests
- # that use the same jar
- check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
-
- # create all the functions.
- setup_client = self.create_impala_client()
- try:
- s = create_fn_to_use.format(unique_database, udf_tgt_path)
- print "use create: " + s
- setup_client.execute(s)
- except Exception as e:
- print e
- assert False
- for i in range(0, num_drops):
- try:
- setup_client.execute(create_fn_to_drop.format(unique_database, i, udf_tgt_path))
- except Exception as e:
- print e
- assert False
-
- errors = []
- def use_fn_method():
- time.sleep(5 + random.random())
- client = self.create_impala_client()
- try:
- client.execute(use_fn.format(unique_database))
- except Exception as e: errors.append(e)
-
- def drop_fn_method(i):
- time.sleep(1 + random.random())
- client = self.create_impala_client()
- try:
- client.execute(drop_fn.format(unique_database, i))
- except Exception as e: errors.append(e)
-
- # create threads to use functions.
- runner_threads = []
- for i in range(0, num_uses):
- runner_threads.append(threading.Thread(target=use_fn_method))
-
- # create threads to drop functions.
- drop_threads = []
- for i in range(0, num_drops):
- runner_threads.append(threading.Thread(target=drop_fn_method, args=(i, )))
-
- # launch all runner threads.
- for t in runner_threads: t.start()
-
- # join all threads.
- for t in runner_threads: t.join();
-
- # Check for any errors.
- for e in errors: print e
- assert len(errors) == 0
-
-
@SkipIfLocal.multiple_impalad
def test_hive_udfs_missing_jar(self, vector, unique_database):
""" IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present