You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@impala.apache.org by tm...@apache.org on 2018/10/17 17:03:04 UTC

[09/11] impala git commit: IMPALA-7669: Gracefully handle concurrent invalidate/partial fetch RPCs

IMPALA-7669: Gracefully handle concurrent invalidate/partial fetch RPCs

The bug was that any partial fetch RPC on an IncompleteTable threw a
NullPointerException (NPE).

We attempt to load the table (if it is not already loaded) before making
the partial info request, but a concurrent invalidate can reset the table
in the meantime, moving it back to an uninitialized state.

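This patch handles this case by propagating a meaningful error to the
caller: catalogd now returns a new TABLE_NOT_LOADED lookup status, which
CatalogdMetaProvider handles like the other not-found statuses. It
invalidates its cached entry and throws an
InconsistentMetadataFetchException so that the query is replanned.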

Testing:
-------
- Added a test (test_concurrent_invalidate_with_queries) that consistently
  fails with an NPE without this patch.

Change-Id: I8533f73f25ca42a20f146ddfd95d4213add9b705
Reviewed-on: http://gerrit.cloudera.org:8080/11638
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2b2cf8d9
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2b2cf8d9
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2b2cf8d9

Branch: refs/heads/master
Commit: 2b2cf8d96617320d184d070f9319c2463aa0d84f
Parents: 28aecd6
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Tue Oct 9 16:40:59 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Oct 17 04:03:53 2018 +0000

----------------------------------------------------------------------
 common/thrift/CatalogService.thrift             |  1 +
 .../impala/catalog/CatalogServiceCatalog.java   |  4 ++
 .../apache/impala/catalog/IncompleteTable.java  |  2 +
 .../catalog/local/CatalogdMetaProvider.java     |  1 +
 .../impala/catalog/PartialCatalogInfoTest.java  |  3 --
 tests/custom_cluster/test_local_catalog.py      | 52 ++++++++++++++++++++
 6 files changed, 60 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2b2cf8d9/common/thrift/CatalogService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 6b94697..c0792b3 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -378,6 +378,7 @@ enum CatalogLookupStatus {
   OK,
   DB_NOT_FOUND,
   TABLE_NOT_FOUND,
+  TABLE_NOT_LOADED,
   FUNCTION_NOT_FOUND
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/2b2cf8d9/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index b3c714f..9533507 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2195,6 +2195,10 @@ public class CatalogServiceCatalog extends Catalog {
       }
       if (table == null) {
         return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_FOUND);
+      } else if (!table.isLoaded()) {
+        // Table can still remain in an incomplete state if there was a concurrent
+        // invalidate request.
+        return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_LOADED);
       }
       // TODO(todd): consider a read-write lock here.
       table.getLock().lock();

http://git-wip-us.apache.org/repos/asf/impala/blob/2b2cf8d9/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
index ba3e5cf..7de3d89 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 
 import org.apache.impala.common.ImpalaException;
@@ -136,6 +137,7 @@ public class IncompleteTable extends Table {
   @Override
   public TGetPartialCatalogObjectResponse getPartialInfo(
       TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+    Preconditions.checkNotNull(cause_);
     Throwables.propagateIfPossible(cause_, TableLoadingException.class);
     throw new TableLoadingException(cause_.getMessage());
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/2b2cf8d9/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 62f1d3e..e099b53 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -344,6 +344,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       case DB_NOT_FOUND:
       case FUNCTION_NOT_FOUND:
       case TABLE_NOT_FOUND:
+      case TABLE_NOT_LOADED:
         invalidateCacheForObject(req.object_desc);
         throw new InconsistentMetadataFetchException(
             String.format("Fetching %s failed. Could not find %s",

http://git-wip-us.apache.org/repos/asf/impala/blob/2b2cf8d9/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
index 07d3309..63e0fda 100644
--- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
@@ -34,7 +34,6 @@ import avro.shaded.com.google.common.collect.Lists;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.impala.common.InternalException;
-import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.thrift.TCatalogInfoSelector;
@@ -50,8 +49,6 @@ import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;

http://git-wip-us.apache.org/repos/asf/impala/blob/2b2cf8d9/tests/custom_cluster/test_local_catalog.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 14a9a54..916443b 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -18,6 +18,7 @@
 # Test behaviors specific to --use_local_catalog being enabled.
 
 import pytest
+import Queue
 import random
 import threading
 import time
@@ -230,6 +231,57 @@ class TestCompactCatalogUpdates(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true",
+      catalogd_args="--catalog_topic_mode=minimal")
+  def test_concurrent_invalidate_with_queries(self, unique_database):
+    """
+    Tests that the queries are replanned when they clash with concurrent invalidates.
+    """
+    # TODO: Merge this with the above test after fixing IMPALA-7717
+    try:
+      impalad1 = self.cluster.impalads[0]
+      impalad2 = self.cluster.impalads[1]
+      client1 = impalad1.service.create_beeswax_client()
+      client2 = impalad2.service.create_beeswax_client()
+
+      # Track the number of replans.
+      replans_seen = [0]
+      replans_seen_lock = threading.Lock()
+
+      # Queue to propagate exceptions from failed queries, if any.
+      failed_queries = Queue.Queue()
+
+      def stress_thread(client):
+        while replans_seen[0] == 0:
+          q = random.choice([
+              'invalidate metadata functional.alltypesnopart',
+              'select count(*) from functional.alltypesnopart',
+              'select count(*) from functional.alltypesnopart'])
+          try:
+            ret = self.execute_query_expect_success(client, q)
+          except Exception as e:
+            failed_queries.put((q, str(e)))
+          if RETRY_PROFILE_MSG in ret.runtime_profile:
+            with replans_seen_lock:
+              replans_seen[0] += 1
+
+      threads = [threading.Thread(target=stress_thread, args=(c,))
+                 for c in [client1, client2]]
+      for t in threads:
+        t.start()
+      for t in threads:
+        t.join(30)
+      assert failed_queries.empty(),\
+          "Failed query count non zero: %s" % list(failed_queries.queue)
+      assert replans_seen[0] > 0, "Did not trigger any re-plans"
+
+    finally:
+      client1.close()
+      client2.close()
+
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true --local_catalog_max_fetch_retries=0",
       catalogd_args="--catalog_topic_mode=minimal")
   def test_replan_limit(self, unique_database):