Posted to commits@impala.apache.org by mi...@apache.org on 2018/01/30 20:54:33 UTC

[1/3] impala git commit: IMPALA-4835: prerequisite buffer pool changes

Repository: impala
Updated Branches:
  refs/heads/master 0b494d55a -> 4aafa5e9b


IMPALA-4835: prerequisite buffer pool changes

The scanner/buffer pool changes will have different scanner
threads sharing the same buffer pool client. This requires that the
AllocateBuffer() API be safe to call concurrently from different
threads, which was already true but neither documented nor tested.

This updates the comments and adds a couple of tests.
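
For illustration, here is a minimal sketch of the usage pattern the scanner
changes will rely on: several threads allocating and freeing buffers through
one shared ClientHandle, each with its own BufferHandle. The helper function
and its parameters are illustrative only and not part of this patch; it assumes
the client's reservation already covers one buffer per thread.

// Sketch only: concurrent AllocateBuffer()/FreeBuffer() on a shared client.
#include <cstdint>
#include <thread>
#include <vector>

#include "common/status.h"
#include "runtime/bufferpool/buffer-pool.h"

using impala::BufferPool;
using impala::Status;

void ConcurrentAllocateSketch(BufferPool* pool, BufferPool::ClientHandle* client,
    int64_t buffer_len, int num_threads) {
  std::vector<std::thread> threads;
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([pool, client, buffer_len]() {
      // Only the ClientHandle is shared; each thread owns its BufferHandle.
      BufferPool::BufferHandle handle;
      Status status = pool->AllocateBuffer(client, buffer_len, &handle);
      if (!status.ok()) return;  // Only fails on a system error.
      // ... fill and consume the buffer ...
      pool->FreeBuffer(client, &handle);
    });
  }
  for (std::thread& t : threads) t.join();
}

The new BufferTransferConcurrent and ConcurrentBufferOperations tests below
exercise this contract more aggressively.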

Change-Id: I8f2196722df59f2d367787c0550058022e296e24
Reviewed-on: http://gerrit.cloudera.org:8080/9097
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/12938fd4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/12938fd4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/12938fd4

Branch: refs/heads/master
Commit: 12938fd4490f3cb79bd39577193bd9e7e4979f03
Parents: 0b494d5
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Jan 22 14:30:58 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Jan 29 08:00:00 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/bufferpool/buffer-pool-test.cc | 136 +++++++++++++++++++++
 be/src/runtime/bufferpool/buffer-pool.h       |  15 ++-
 2 files changed, 145 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/12938fd4/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 720dc13..0138a08 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -40,7 +40,9 @@
 #include "testutil/death-test-util.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
+#include "util/blocking-queue.h"
 #include "util/filesystem-util.h"
+#include "util/spinlock.h"
 #include "util/metrics.h"
 
 #include "common/names.h"
@@ -751,6 +753,78 @@ TEST_F(BufferPoolTest, BufferTransfer) {
   global_reservations_.Close();
 }
 
+TEST_F(BufferPoolTest, BufferTransferConcurrent) {
+  // Transfer buffers between threads in a circular fashion. Each client needs to have
+  // enough reservation for two buffers, since it may receive a buffer before handing
+  // off the next one.
+  const int NUM_CLIENTS = 5;
+  const int64_t TOTAL_MEM = NUM_CLIENTS * TEST_BUFFER_LEN * 2;
+  global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
+
+  BufferPool::ClientHandle clients[NUM_CLIENTS];
+  BufferPool::BufferHandle handles[NUM_CLIENTS];
+  SpinLock locks[NUM_CLIENTS]; // Each lock protects the corresponding BufferHandle.
+  for (int i = 0; i < NUM_CLIENTS; ++i) {
+    ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
+        TOTAL_MEM, NewProfile(), &clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservationToFit(2 * TEST_BUFFER_LEN));
+  }
+
+  thread_group workers;
+
+  for (int thread_idx = 0; thread_idx < NUM_CLIENTS; ++thread_idx) {
+    workers.add_thread(new thread([&pool, &clients, &handles, &locks, thread_idx] {
+      // Transfer buffers around between the clients repeatedly in a circle.
+      BufferHandle handle;
+      {
+        lock_guard<SpinLock> l(locks[thread_idx]);
+        LOG(INFO) << "Allocate from " << (void*)&clients[thread_idx];
+        ASSERT_OK(pool.AllocateBuffer(
+              &clients[thread_idx], TEST_BUFFER_LEN, &handle));
+      }
+      for (int iter = 0; iter < 100; ++iter) {
+        int next_thread_idx = (thread_idx + 1) % NUM_CLIENTS;
+        // Transfer our buffer to the next thread.
+        {
+          unique_lock<SpinLock> l(locks[next_thread_idx]);
+          // Spin until we can add the handle.
+          while (true) {
+            if (!handles[next_thread_idx].is_open()) break;
+            l.unlock();
+            sched_yield();
+            l.lock();
+          }
+          ASSERT_TRUE(handle.is_open());
+          ASSERT_OK(pool.TransferBuffer(&clients[thread_idx], &handle,
+              &clients[next_thread_idx], &handles[next_thread_idx]));
+          // Check that the transfer left things in a consistent state.
+          ASSERT_TRUE(handles[next_thread_idx].is_open());
+          ASSERT_FALSE(handle.is_open());
+          ASSERT_GE(clients[next_thread_idx].GetUsedReservation(), TEST_BUFFER_LEN);
+        }
+        // Get a new buffer from the previous thread.
+        {
+          unique_lock<SpinLock> l(locks[thread_idx]);
+          // Spin until we receive a handle from the previous thread.
+          while (true) {
+            if (handles[thread_idx].is_open()) break;
+            l.unlock();
+            sched_yield();
+            l.lock();
+          }
+          handle = move(handles[thread_idx]);
+        }
+      }
+      pool.FreeBuffer(&clients[thread_idx], &handle);
+      }));
+  }
+  workers.join_all();
+  for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client);
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
+  global_reservations_.Close();
+}
+
 /// Test basic pinning and unpinning.
 TEST_F(BufferPoolTest, Pin) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
@@ -2047,6 +2121,68 @@ TEST_F(BufferPoolTest, DecreaseReservation) {
   pool.DeregisterClient(&client);
   global_reservations_.Close();
 }
+
+// Test concurrent operations using the same client and different buffers.
+TEST_F(BufferPoolTest, ConcurrentBufferOperations) {
+  const int DELETE_THREADS = 2;
+  const int ALLOCATE_THREADS = 2;
+  const int NUM_ALLOCATIONS_PER_THREAD = 128;
+  const int MAX_NUM_BUFFERS = 16;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", nullptr, &global_reservations_, nullptr,
+      TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
+
+  thread_group allocate_threads;
+  thread_group delete_threads;
+  AtomicInt64 available_reservation(TOTAL_MEM);
+
+  // Queue of buffers to be deleted, along with the first byte of the data in
+  // the buffer, for validation purposes.
+  BlockingQueue<pair<uint8_t, BufferHandle>> delete_queue(MAX_NUM_BUFFERS);
+
+  // Allocate threads allocate buffers whenever able and enqueue them.
+  for (int i = 0; i < ALLOCATE_THREADS; ++i) {
+    allocate_threads.add_thread(new thread([&] {
+        for (int i = 0; i < NUM_ALLOCATIONS_PER_THREAD; ++i) {
+          // Try to deduct reservation.
+          while (true) {
+            int64_t val = available_reservation.Load();
+            if (val >= TEST_BUFFER_LEN
+                && available_reservation.CompareAndSwap(val, val - TEST_BUFFER_LEN)) {
+              break;
+            }
+          }
+          BufferHandle buffer;
+          ASSERT_OK(pool.AllocateBuffer(&client, TEST_BUFFER_LEN, &buffer));
+          uint8_t first_byte = static_cast<uint8_t>(i % 256);
+          buffer.data()[0] = first_byte;
+          delete_queue.BlockingPut(pair<uint8_t, BufferHandle>(first_byte, move(buffer)));
+        }
+        }));
+  }
+
+  // Delete threads pull buffers off the queue and free them.
+  for (int i = 0; i < DELETE_THREADS; ++i) {
+    delete_threads.add_thread(new thread([&] {
+          pair<uint8_t, BufferHandle> item;
+          while (delete_queue.BlockingGet(&item)) {
+            ASSERT_EQ(item.first, item.second.data()[0]);
+            pool.FreeBuffer(&client, &item.second);
+            available_reservation.Add(TEST_BUFFER_LEN);
+          }
+        }));
+
+  }
+  allocate_threads.join_all();
+  delete_queue.Shutdown();
+  delete_threads.join_all();
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/impala/blob/12938fd4/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 86be6f9..5b98579 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -227,19 +227,21 @@ class BufferPool : public CacheLineAligned {
   /// pinned multiple times via 'page_handle'. May return an error if 'page_handle' was
   /// unpinned earlier with no subsequent GetBuffer() call and a read error is
   /// encountered while bringing the page back into memory.
-  Status ExtractBuffer(
-      ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) WARN_UNUSED_RESULT;
+  Status ExtractBuffer(ClientHandle* client, PageHandle* page_handle,
+      BufferHandle* buffer_handle) WARN_UNUSED_RESULT;
 
   /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller
   /// is responsible for ensuring it has enough unused reservation before calling
   /// AllocateBuffer() (otherwise it will DCHECK). AllocateBuffer() only fails when
   /// a system error prevents the buffer pool from fulfilling the reservation.
+  /// Safe to call concurrently with any other operations for 'client', except for
+  /// operations on the same 'handle'.
   Status AllocateBuffer(
       ClientHandle* client, int64_t len, BufferHandle* handle) WARN_UNUSED_RESULT;
 
   /// If 'handle' is open, close 'handle', free the buffer and decrease the reservation
-  /// usage from 'client'. Idempotent. Safe to call concurrently with any other
-  /// operations for 'client'.
+  /// usage from 'client'. Idempotent. Safe to call concurrently with other operations
+  /// for 'client', except for operations on the same 'handle'.
   void FreeBuffer(ClientHandle* client, BufferHandle* handle);
 
   /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the
@@ -247,7 +249,8 @@ class BufferPool : public CacheLineAligned {
   /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be
   /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different.
   /// After a successful call, 'src' is closed and 'dst' is open. Safe to call
-  /// concurrently with any other operations for 'src_client'.
+  /// concurrently with any other operations for 'src_client', except for operations
+  /// on the same handles.
   Status TransferBuffer(ClientHandle* src_client, BufferHandle* src,
       ClientHandle* dst_client, BufferHandle* dst) WARN_UNUSED_RESULT;
 
@@ -507,7 +510,7 @@ class BufferPool::PageHandle {
   DISALLOW_COPY_AND_ASSIGN(PageHandle);
   friend class BufferPool;
   friend class BufferPoolTest;
-  friend class Page;
+  friend struct Page;
 
   /// Internal helper to open the handle for the given page.
   void Open(Page* page, ClientHandle* client);


[3/3] impala git commit: IMPALA-6215: Removes race when using LibCache.

Posted by mi...@apache.org.
IMPALA-6215: Removes race when using LibCache.

LibCache's API for accessing locally cached files has a race.
Currently, a client of the cache receives the locally cached path
as a string, but nothing guarantees that the associated file is not
removed before the client is done using it. This race is suspected to be
the root cause of the flakiness seen in IMPALA-6092: those tests
fail once in a while with classloader errors because Java UDF classes
cannot be loaded. In these tests, the lib cache makes no guarantee that the
path to the jar will remain valid from the time the path is acquired
through the time needed to fetch the jar and resolve the needed classes.

LibCache offers liveness guarantees for shared objects via reference
counting. The fix in this patch extends this API to also cover paths
to locally cached files.
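
For illustration, a minimal sketch of the handle-scoped API introduced here;
UseJarSketch is a hypothetical helper and not part of this patch. The local
path stays valid only while the handle is in scope, because the handle holds
a reference on the underlying cache entry.

// Sketch only: the path remains valid for the lifetime of 'handle'.
#include <string>

#include "common/status.h"
#include "runtime/lib-cache.h"

using impala::LibCache;
using impala::LibCacheEntryHandle;
using impala::Status;

Status UseJarSketch(const std::string& hdfs_jar_path) {
  LibCacheEntryHandle handle;
  std::string local_path;
  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
      hdfs_jar_path, LibCache::TYPE_JAR, &handle, &local_path));
  // 'local_path' is safe to use here: the cached jar cannot be removed
  // while 'handle' is alive.
  // ... load classes or read the jar from 'local_path' ...
  // When 'handle' goes out of scope, the entry's use count is decremented.
  return Status::OK();
}

The converted call sites in llvm-codegen.cc, hive-udf-call.cc and fe-support.cc
below follow this same pattern.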

Testing:
- added a test to test_udfs.py that performs many concurrent UDF uses and
  removals. By increasing the concurrent operations to 100, the issue
  in IMPALA-6092 is locally reproducible on every run. With this fix,
  the problem is no longer reproducible with this test.

Change-Id: I9175085201fe8b11424ab8f88d7b992cb7b0daea
Reviewed-on: http://gerrit.cloudera.org:8080/9089
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4aafa5e9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4aafa5e9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4aafa5e9

Branch: refs/heads/master
Commit: 4aafa5e9ba9fe22d2dbc7764a796b3cd04136cc0
Parents: 82cf99d
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Tue Nov 21 08:41:03 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 30 20:00:41 2018 +0000

----------------------------------------------------------------------
 be/src/codegen/llvm-codegen.cc               |  5 +-
 be/src/exec/external-data-source-executor.cc |  5 +-
 be/src/exprs/hive-udf-call.cc                | 57 +++++++--------
 be/src/exprs/hive-udf-call.h                 |  3 -
 be/src/runtime/lib-cache.cc                  | 22 +++---
 be/src/runtime/lib-cache.h                   | 44 ++++++++++--
 be/src/service/fe-support.cc                 | 11 +--
 tests/query_test/test_udfs.py                | 88 +++++++++++++++++++++--
 8 files changed, 177 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4aafa5e9/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index e1a606c..72293a7 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -327,9 +327,10 @@ Status LlvmCodeGen::LinkModuleFromLocalFs(const string& file) {
 
 Status LlvmCodeGen::LinkModuleFromHdfs(const string& hdfs_location) {
   if (linked_modules_.find(hdfs_location) != linked_modules_.end()) return Status::OK();
+  LibCacheEntryHandle handle;
   string local_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(hdfs_location, LibCache::TYPE_IR,
-      &local_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(hdfs_location, LibCache::TYPE_IR,
+      &handle, &local_path));
   RETURN_IF_ERROR(LinkModuleFromLocalFs(local_path));
   linked_modules_.insert(hdfs_location);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/4aafa5e9/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
index 7c54f39..20fe50e 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -136,9 +136,10 @@ ExternalDataSourceExecutor::~ExternalDataSourceExecutor() {
 Status ExternalDataSourceExecutor::Init(const string& jar_path,
     const string& class_name, const string& api_version, const string& init_string) {
   DCHECK(!is_initialized_);
+  LibCacheEntryHandle handle;
   string local_jar_path;
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      jar_path, LibCache::TYPE_JAR, &local_jar_path));
+  RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
+      jar_path, LibCache::TYPE_JAR, &handle, &local_jar_path));
 
   JNIEnv* jni_env = getJNIEnv();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4aafa5e9/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index 19e2e63..e1ac676 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -174,10 +174,6 @@ Status HiveUdfCall::Init(const RowDescriptor& row_desc, RuntimeState* state) {
   // Initialize children first.
   RETURN_IF_ERROR(ScalarExpr::Init(row_desc, state));
 
-  // Copy the Hive Jar from hdfs to local file system.
-  RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-      fn_.hdfs_location, LibCache::TYPE_JAR, &local_location_));
-
   // Initialize input_byte_offsets_ and input_buffer_size_
   for (int i = 0; i < GetNumChildren(); ++i) {
     input_byte_offsets_.push_back(input_buffer_size_);
@@ -202,30 +198,35 @@ Status HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
   JNIEnv* env = getJNIEnv();
   if (env == NULL) return Status("Failed to get/create JVM");
 
-  THiveUdfExecutorCtorParams ctor_params;
-  ctor_params.fn = fn_;
-  ctor_params.local_location = local_location_;
-  ctor_params.input_byte_offsets = input_byte_offsets_;
-
-  jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
-  jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
-  jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
-
-  ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
-  ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
-  ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
-  ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
-
-  jbyteArray ctor_params_bytes;
-
-  // Add a scoped cleanup jni reference object. This cleans up local refs made
-  // below.
-  JniLocalFrame jni_frame;
-  RETURN_IF_ERROR(jni_frame.push(env));
-
-  RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
-  // Create the java executor object
-  jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+  {
+    LibCacheEntryHandle handle;
+    string local_location;
+    RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(
+        fn_.hdfs_location, LibCache::TYPE_JAR, &handle, &local_location));
+    THiveUdfExecutorCtorParams ctor_params;
+    ctor_params.fn = fn_;
+    ctor_params.local_location = local_location;
+    ctor_params.input_byte_offsets = input_byte_offsets_;
+
+    jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
+    jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
+    jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];
+
+    ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
+    ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
+    ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
+    ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;
+
+    jbyteArray ctor_params_bytes;
+
+    // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+    JniLocalFrame jni_frame;
+    RETURN_IF_ERROR(jni_frame.push(env));
+
+    RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+    // Create the java executor object
+    jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+  }
   RETURN_ERROR_IF_EXC(env);
   RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4aafa5e9/be/src/exprs/hive-udf-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 7ce5eb0..8ca0372 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -116,9 +116,6 @@ class HiveUdfCall : public ScalarExpr {
   /// error.
   AnyVal* Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) const;
 
-  /// The path on the local FS to the UDF's jar
-  std::string local_location_;
-
   /// input_byte_offsets_[i] is the byte offset child ith's input argument should
   /// be written to.
   std::vector<int> input_byte_offsets_;

http://git-wip-us.apache.org/repos/asf/impala/blob/4aafa5e9/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index d694c49..b4a4f59 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -129,6 +129,10 @@ LibCacheEntry::~LibCacheEntry() {
   unlink(local_path.c_str());
 }
 
+LibCacheEntryHandle::~LibCacheEntryHandle() {
+  if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry_);
+}
+
 Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& symbol,
     void** fn_ptr, LibCacheEntry** ent, bool quiet) {
   if (hdfs_lib_file.empty()) {
@@ -173,21 +177,23 @@ void LibCache::DecrementUseCount(LibCacheEntry* entry) {
   if (entry == NULL) return;
   bool can_delete = false;
   {
-    unique_lock<mutex> lock(entry->lock);;
+    unique_lock<mutex> lock(entry->lock);
     --entry->use_count;
     can_delete = (entry->use_count == 0 && entry->should_remove);
   }
   if (can_delete) delete entry;
 }
 
-Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
-                                 string* local_path) {
+Status LibCache::GetLocalPath(const std::string& hdfs_lib_file, LibType type,
+    LibCacheEntryHandle* handle, string* path) {
+  DCHECK(handle != nullptr && handle->entry() == nullptr);
+  LibCacheEntry* entry = nullptr;
   unique_lock<mutex> lock;
-  LibCacheEntry* entry = NULL;
   RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
-  DCHECK(entry != NULL);
-  DCHECK_EQ(entry->type, type);
-  *local_path = entry->local_path;
+  DCHECK(entry != nullptr);
+  ++entry->use_count;
+  handle->SetEntry(entry);
+  *path = entry->local_path;
   return Status::OK();
 }
 
@@ -352,7 +358,7 @@ Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type
     entry_lock->swap(local_entry_lock);
 
     RETURN_IF_ERROR((*entry)->copy_file_status);
-    DCHECK_EQ((*entry)->type, type);
+    DCHECK_EQ((*entry)->type, type) << (*entry)->local_path;
     DCHECK(!(*entry)->local_path.empty());
     return Status::OK();
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/4aafa5e9/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 4a564ee..7296a00 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -49,11 +49,16 @@ class RuntimeState;
 /// using the library. When the caller requests a ptr into the library, they
 /// are given the entry handle and must decrement the ref count when they
 /// are done.
+/// Note: Explicitly managing this reference count at the client is error-prone. See the
+/// api for accessing a path, GetLocalPath(), that uses the handle's scope to manage the
+/// reference count.
 //
 /// TODO:
 /// - refresh libraries
-/// - better cached module management.
+/// - better cached module management
+/// - improve the api to be less error-prone (IMPALA-6439)
 struct LibCacheEntry;
+class LibCacheEntryHandle;
 
 class LibCache {
  public:
@@ -71,11 +76,16 @@ class LibCache {
   /// Initializes the libcache. Must be called before any other APIs.
   static Status Init();
 
-  /// Gets the local file system path for the library at 'hdfs_lib_file'. If
-  /// this file is not already on the local fs, it copies it and caches the
-  /// result. Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
-  Status GetLocalLibPath(const std::string& hdfs_lib_file, LibType type,
-                         std::string* local_path);
+  /// Gets the local 'path' used to cache the file stored at the global 'hdfs_lib_file'.
+  /// If the referenced global file has not been copied locally, it copies it and
+  /// caches the result.
+  ///
+  /// 'handle' must remain in scope while 'path' is used. The reference count to the
+  /// underlying cache entry is decremented when 'handle' goes out-of-scope.
+  ///
+  /// Returns an error if 'hdfs_lib_file' cannot be copied to the local fs.
+  Status GetLocalPath(const std::string& hdfs_lib_file, LibType type,
+      LibCacheEntryHandle* handle, string* path);
 
   /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok otherwise.
   /// If 'quiet' is true, the error status for non-Java unfound symbols will not be logged.
@@ -94,6 +104,7 @@ class LibCache {
   /// using fn_ptr and it is no longer valid to use fn_ptr.
   //
   /// If 'quiet' is true, returned error statuses will not be logged.
+  /// TODO: api is error-prone. upgrade to LibCacheEntryHandle (see IMPALA-6439).
   Status GetSoFunctionPtr(const std::string& hdfs_lib_file, const std::string& symbol,
       void** fn_ptr, LibCacheEntry** entry, bool quiet = false);
 
@@ -164,6 +175,27 @@ class LibCache {
                            const LibMap::iterator& entry_iterator);
 };
 
+/// Handle for a LibCacheEntry that decrements its reference count when the handle is
+/// destroyed or re-used for another entry.
+class LibCacheEntryHandle {
+ public:
+  LibCacheEntryHandle() {}
+  ~LibCacheEntryHandle();
+
+ private:
+  friend class LibCache;
+
+  LibCacheEntry* entry() const { return entry_; }
+  void SetEntry(LibCacheEntry* entry) {
+    if (entry_ != nullptr) LibCache::instance()->DecrementUseCount(entry);
+    entry_ = entry;
+  }
+
+  LibCacheEntry* entry_ = nullptr;
+
+  DISALLOW_COPY_AND_ASSIGN(LibCacheEntryHandle);
+};
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/4aafa5e9/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 35b5a15..0a84d09 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -294,9 +294,11 @@ static void ResolveSymbolLookup(const TSymbolLookupParams params,
   if (params.fn_binary_type != TFunctionBinaryType::BUILTIN) {
     // Refresh the library if necessary since we're creating a new function
     LibCache::instance()->SetNeedsRefresh(params.location);
+    LibCacheEntryHandle handle;
     string dummy_local_path;
-    Status status = LibCache::instance()->GetLocalLibPath(
-        params.location, type, &dummy_local_path);
+    Status status = LibCache::instance()->GetLocalPath(
+        params.location, type, &handle, &dummy_local_path);
+
     if (!status.ok()) {
       result->__set_result_code(TSymbolLookupResultCode::BINARY_NOT_FOUND);
       result->__set_error_msg(status.GetDetail());
@@ -387,9 +389,10 @@ Java_org_apache_impala_service_FeSupport_NativeCacheJar(
       JniUtil::internal_exc_class(), nullptr);
 
   TCacheJarResult result;
+  LibCacheEntryHandle handle;
   string local_path;
-  Status status = LibCache::instance()->GetLocalLibPath(params.hdfs_location,
-      LibCache::TYPE_JAR, &local_path);
+  Status status = LibCache::instance()->GetLocalPath(
+      params.hdfs_location, LibCache::TYPE_JAR, &handle, &local_path);
   status.ToThrift(&result.status);
   if (status.ok()) result.__set_local_path(local_path);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4aafa5e9/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 1ff716a..61dd54c 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -18,6 +18,9 @@
 from copy import copy
 import os
 import pytest
+import random
+import threading
+import time
 from subprocess import check_call
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
@@ -316,8 +319,6 @@ class TestUdfExecution(TestUdfBase):
       self.run_test_case('QueryTest/udf-non-deterministic', vector,
           use_db=unique_database)
 
-  # Runs serially as a temporary workaround for IMPALA_6092.
-  @pytest.mark.execute_serially
   def test_java_udfs(self, vector, unique_database):
     self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database)
     self.run_test_case('QueryTest/java-udf', vector, use_db=unique_database)
@@ -418,9 +419,6 @@ class TestUdfTargeted(TestUdfBase):
             unique_database, tgt_udf_path))
     query = "select `{0}`.fn_invalid_symbol('test')".format(unique_database)
 
-    # Dropping the function can interact with other tests whose Java classes are in
-    # the same jar. Use a copy of the jar to avoid unintended interactions.
-    # See IMPALA-6215 and IMPALA-6092 for examples.
     check_call(["hadoop", "fs", "-put", "-f", src_udf_path, tgt_udf_path])
     self.client.execute(drop_fn_stmt)
     self.client.execute(create_fn_stmt)
@@ -429,6 +427,86 @@ class TestUdfTargeted(TestUdfBase):
       assert "Unable to find class" in str(ex)
     self.client.execute(drop_fn_stmt)
 
+  def test_concurrent_jar_drop_use(self, vector, unique_database):
+    """IMPALA-6215: race between dropping/using java udf's defined in the same jar.
+       This test runs concurrent drop/use threads that result in class not found
+       exceptions when the race is present.
+    """
+    udf_src_path = os.path.join(
+      os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar")
+    udf_tgt_path = get_fs_path(
+      '/test-warehouse/impala-hive-udfs-{0}.jar'.format(unique_database))
+
+    create_fn_to_drop = """create function {0}.foo_{1}() returns string
+                           LOCATION '{2}' SYMBOL='org.apache.impala.TestUpdateUdf'"""
+    create_fn_to_use = """create function {0}.use_it(string) returns string
+                          LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'"""
+    drop_fn = "drop function if exists {0}.foo_{1}()"
+    use_fn = """select * from (select max(int_col) from functional.alltypesagg
+                where {0}.use_it(string_col) = 'blah' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(String_col) > '1' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(string_col) > '1'))) v"""
+    num_drops = 100
+    num_uses = 100
+
+    # use a unique jar for this test to avoid interactions with other tests
+    # that use the same jar
+    check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
+
+    # create all the functions.
+    setup_client = self.create_impala_client()
+    try:
+      s = create_fn_to_use.format(unique_database, udf_tgt_path)
+      print "use create: " + s
+      setup_client.execute(s)
+    except Exception as e:
+      print e
+      assert False
+    for i in range(0, num_drops):
+      try:
+        setup_client.execute(create_fn_to_drop.format(unique_database, i, udf_tgt_path))
+      except Exception as e:
+        print e
+        assert False
+
+    errors = []
+    def use_fn_method():
+      time.sleep(5 + random.random())
+      client = self.create_impala_client()
+      try:
+        client.execute(use_fn.format(unique_database))
+      except Exception as e: errors.append(e)
+
+    def drop_fn_method(i):
+      time.sleep(1 + random.random())
+      client = self.create_impala_client()
+      try:
+        client.execute(drop_fn.format(unique_database, i))
+      except Exception as e: errors.append(e)
+
+    # create threads to use functions.
+    runner_threads = []
+    for i in range(0, num_uses):
+      runner_threads.append(threading.Thread(target=use_fn_method))
+
+    # create threads to drop functions.
+    drop_threads = []
+    for i in range(0, num_drops):
+      runner_threads.append(threading.Thread(target=drop_fn_method, args=(i, )))
+
+    # launch all runner threads.
+    for t in runner_threads: t.start()
+
+    # join all threads.
+    for t in runner_threads: t.join();
+
+    # Check for any errors.
+    for e in errors: print e
+    assert len(errors) == 0
+
+
   @SkipIfLocal.multiple_impalad
   def test_hive_udfs_missing_jar(self, vector, unique_database):
     """ IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present


[2/3] impala git commit: IMPALA-6447: remove Python 2.7 dictionary comprehensions

Posted by mi...@apache.org.
IMPALA-6447: remove Python 2.7 dictionary comprehensions

In the fix for IMPALA-6441, we began importing from the stress test
(concurrent_select.py). The import fails in some downstream environments
that use Python 2.6, because concurrent_select.py uses a few dictionary
comprehensions, a language feature introduced in Python 2.7.

This problem wasn't caught upstream because upstream uses Python 2.7.

The fix is to remove the dictionary comprehensions and create the
dictionaries in a more backward-compatible way, so the problematic import
now succeeds on Python 2.6.

Change-Id: I3174e1bd1b6ac007b345d42474401af50d006a52
Reviewed-on: http://gerrit.cloudera.org:8080/9150
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/82cf99d4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/82cf99d4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/82cf99d4

Branch: refs/heads/master
Commit: 82cf99d4383f3e568d59999db978d7bd4b03f149
Parents: 12938fd
Author: Michael Brown <mi...@cloudera.com>
Authored: Mon Jan 29 11:37:51 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Jan 29 23:32:17 2018 +0000

----------------------------------------------------------------------
 tests/comparison/query_profile.py | 3 ++-
 tests/stress/concurrent_select.py | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/82cf99d4/tests/comparison/query_profile.py
----------------------------------------------------------------------
diff --git a/tests/comparison/query_profile.py b/tests/comparison/query_profile.py
index 81a98a1..5530104 100644
--- a/tests/comparison/query_profile.py
+++ b/tests/comparison/query_profile.py
@@ -514,7 +514,8 @@ class DefaultProfile(object):
     func_weights = _func_weights
     if func_weights:
       distinct_funcs_in_signatures = set([s.func for s in signatures])
-      pruned_func_weights = {f: func_weights[f] for f in distinct_funcs_in_signatures}
+      pruned_func_weights = dict(
+          (f, func_weights[f]) for f in distinct_funcs_in_signatures)
       func_weights = pruned_func_weights
     else:
       # First a function will be chosen then a signature. This is done so that the number

http://git-wip-us.apache.org/repos/asf/impala/blob/82cf99d4/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 69a4434..86e8978 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -1641,7 +1641,7 @@ def prepare_database(cursor):
   Note: At this time we only support Kudu tables with a simple hash partitioning based on
   the primary key. (SHOW CREATE TABLE would not work otherwise.)
   """
-  tables = {t: cursor.describe_table(t) for t in cursor.list_table_names()}
+  tables = dict((t, cursor.describe_table(t)) for t in cursor.list_table_names())
   for table_name in tables:
     if not table_name.endswith("_original") and table_name + "_original" not in tables:
       LOG.debug("Creating original table: {0}".format(table_name))
@@ -1665,7 +1665,7 @@ def reset_databases(cursor):
   the primary key. (SHOW CREATE TABLE would not work otherwise.)
   """
   LOG.info("Resetting {0} database".format(cursor.db_name))
-  tables = {t: cursor.describe_table(t) for t in cursor.list_table_names()}
+  tables = dict((t, cursor.describe_table(t)) for t in cursor.list_table_names())
   for table_name in tables:
     if not table_name.endswith("_original"):
       if table_name + "_original" in tables: