You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/07/16 15:26:40 UTC

[impala] 02/06: IMPALA-8486: fix stale libCache entries in LocalCatalog mode coordinators

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1cd85d1f8a0d772a4cab263cce4f41728f6ebac7
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Fri Jul 12 00:20:36 2019 +0000

    IMPALA-8486: fix stale libCache entries in LocalCatalog mode coordinators
    
    In LocalCatalog mode, after a function is dropped, statestored will
    broadcast the update to invalidate the cached CatalogObject in each
    coordinator (if it has one). However, the current code path does not
    trigger libCache to remove the cached JAR/SO file. If we replace the
    function file in HDFS with a new one and create the function again
    using the same HDFS path, the SELECT statements in other coordinators
    won't trigger libCache to refresh the local cached file, so they still
    use the old cached file which causes errors.
    
    When a coordinator invalidates its cached CatalogObject of a function,
    it should also mark the corresponding libCache entry as "needs refresh".
    That way, later uses of this function will check the last modified time
    of the HDFS file and refresh it if needed. To achieve this, we have to
    propagate the HDFS path of the function along with the full function
    name in the minimal topic, so libCache can target the cached entry.
    
    Note that this does not prevent the dedicated executors from having
    stale libCache entries. Fixing that requires some architectural changes;
    we'll follow up on it in IMPALA-8763.
    
    Tests
     - Re-enable test_udf_update_via_drop and test_udf_update_via_create for
    LocalCatalog mode.
    
    Change-Id: Ie4812fb8737de3ba6074ffeb9007927bfbbbaf9b
    Reviewed-on: http://gerrit.cloudera.org:8080/13849
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/catalog/CatalogServiceCatalog.java    | 6 +++++-
 .../org/apache/impala/catalog/local/CatalogdMetaProvider.java    | 9 +++++++--
 tests/common/skip.py                                             | 7 -------
 tests/query_test/test_udfs.py                                    | 4 +---
 4 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 0c8ea59..b881b60 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -642,7 +642,11 @@ public class CatalogServiceCatalog extends Catalog {
         // the full information rather than doing fetch-on-demand.
         return obj;
       case FUNCTION:
-        min.setFn(new TFunction(obj.fn.getName()));
+        TFunction fnObject = new TFunction(obj.fn.getName());
+        // IMPALA-8486: add the hdfs location so coordinators can mark their libCache
+        // entry for this function to be stale.
+        if (obj.fn.hdfs_location != null) fnObject.setHdfs_location(obj.fn.hdfs_location);
+        min.setFn(fnObject);
         break;
       case DATA_SOURCE:
       case HDFS_CACHE_POOL:
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 9a36bb4..81432de 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -260,8 +260,6 @@ public class CatalogdMetaProvider implements MetaProvider {
    *
    * For details of the usage of Futures within the cache, see
    * {@link #loadWithCaching(String, String, Object, Callable).
-   *
-
    */
   final Cache<Object,Object> cache_;
 
@@ -1213,6 +1211,13 @@ public class CatalogdMetaProvider implements MetaProvider {
           invalidated);
       invalidateCacheForFunction(obj.fn.name.db_name, obj.fn.name.function_name,
           invalidated);
+      if (obj.fn.hdfs_location != null) {
+        // After the coordinator creates a function, it will also receive an invalidation
+        // update for this function from the statestored's broadcast. We shouldn't remove
+        // the libcache entry for this case, just mark it as needs refresh. LibCache will
+        // refresh the cached file if its mtime changes in HDFS.
+        FeSupport.NativeLibCacheSetNeedsRefresh(obj.fn.hdfs_location);
+      }
       break;
     case DATABASE:
       if (cache_.asMap().remove(DB_LIST_CACHE_KEY) != null) {
diff --git a/tests/common/skip.py b/tests/common/skip.py
index c4d03ca..151f2c8 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -232,13 +232,6 @@ class SkipIfCatalogV2:
       IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(),
       reason="Test is specific to old implementation of catalog.")
 
-  # TODO: IMPALA-8486: fix invalidation or update tests to reflect expected behaviour.
-  @classmethod
-  def lib_cache_invalidation_broken(self):
-    return pytest.mark.skipif(
-      IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(),
-      reason="IMPALA-8486: LibCache isn't invalidated by function DDL.")
-
   # TODO: IMPALA-7131: add support or update tests to reflect expected behaviour.
   @classmethod
   def data_sources_unsupported(self):
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 5823ad3..3839044 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -24,7 +24,7 @@ from subprocess import call, check_call
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfLocal, SkipIfCatalogV2
+from tests.common.skip import SkipIfLocal
 from tests.common.test_dimensions import (
     create_exec_option_dimension,
     create_exec_option_dimension_from_dict,
@@ -507,7 +507,6 @@ class TestUdfTargeted(TestUdfBase):
   def test_libs_with_same_filenames(self, vector, unique_database):
     self.run_test_case('QueryTest/libs_with_same_filenames', vector, use_db=unique_database)
 
-  @SkipIfCatalogV2.lib_cache_invalidation_broken()
   def test_udf_update_via_drop(self, vector, unique_database):
     """Test updating the UDF binary without restarting Impala. Dropping
     the function should remove the binary from the local cache."""
@@ -541,7 +540,6 @@ class TestUdfTargeted(TestUdfBase):
     self.execute_query_expect_success(self.client, create_fn_stmt, exec_options)
     self._run_query_all_impalads(exec_options, query_stmt, ["New UDF"])
 
-  @SkipIfCatalogV2.lib_cache_invalidation_broken()
   def test_udf_update_via_create(self, vector, unique_database):
     """Test updating the UDF binary without restarting Impala. Creating a new function
     from the library should refresh the cache."""