Posted to commits@impala.apache.org by bh...@apache.org on 2018/10/19 17:12:18 UTC

impala git commit: IMPALA-7717: Handle concurrent partition changes in local catalog mode

Repository: impala
Updated Branches:
  refs/heads/master 5cc49c343 -> 8c93a4568


IMPALA-7717: Handle concurrent partition changes in local catalog mode

The current code throws a RuntimeException (RTE) when a partial fetch
RPC looks up partition metadata and the corresponding partition ID is
missing on the Catalog server. There are two cases here.

1. The partition could be genuinely missing as it was dropped by a
   concurrent operation.
2. Partial fetch RPCs look up partitions by ID instead of by name. This
   is problematic since the IDs can change over the lifetime of a table
   (e.g. INVALIDATE METADATA reloads the table and assigns new IDs).

In both cases, throwing an RTE is not the right approach, and for (2)
we need to transparently retry the fetch with the new partition ID.

We eventually need a proper fix for (2), since looking up partitions
by ID (rather than by name) is inherently fragile.
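
To make case (2) concrete, here is a hedged sketch of the race; the
helpers loadPartitionIds(), requestFor() and invalidateAndRetry() are
hypothetical stand-ins, not real Impala APIs:

    // Hedged sketch of the race in case (2).
    // t0: the coordinator caches the partition IDs of a table.
    List<Long> cachedIds = loadPartitionIds("functional.alltypes");
    // t1: a concurrent INVALIDATE METADATA makes catalogd reload the
    //     table, assigning brand-new partition IDs.
    // t2: the coordinator fetches partition metadata with stale IDs.
    TGetPartialCatalogObjectResponse resp =
        catalogd.getPartialCatalogObject(requestFor(cachedIds));
    // With this patch the server answers PARTITION_NOT_FOUND instead
    // of throwing, so the caller can invalidate its cached table and
    // retry the fetch with fresh IDs.
    if (resp.lookup_status == CatalogLookupStatus.PARTITION_NOT_FOUND) {
      invalidateAndRetry();  // hypothetical retry hook
    }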

Testing: Updated an end-to-end test that fails without the patch.

Change-Id: I2aa103ee159ce9478af9b5b27b36bc0cc286f442
Reviewed-on: http://gerrit.cloudera.org:8080/11732
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8c93a456
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8c93a456
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8c93a456

Branch: refs/heads/master
Commit: 8c93a456891587c1add30a08fa8ab395208e0cf1
Parents: 5cc49c3
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Thu Oct 18 16:09:31 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 19 09:22:05 2018 +0000

----------------------------------------------------------------------
 common/thrift/CatalogService.thrift             |  7 ++-
 .../org/apache/impala/catalog/HdfsTable.java    |  9 +++-
 .../catalog/local/CatalogdMetaProvider.java     |  1 +
 .../impala/catalog/PartialCatalogInfoTest.java  |  9 ++--
 tests/custom_cluster/test_local_catalog.py      | 55 +++-----------------
 5 files changed, 25 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/common/thrift/CatalogService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index c0792b3..6237c1f 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -379,7 +379,12 @@ enum CatalogLookupStatus {
   DB_NOT_FOUND,
   TABLE_NOT_FOUND,
   TABLE_NOT_LOADED,
-  FUNCTION_NOT_FOUND
+  FUNCTION_NOT_FOUND,
+  // Partial fetch RPCs currently look up partitions by IDs instead of names. These IDs
+  // change over the lifetime of a table with queries like invalidate metadata. In such
+  // cases this lookup status is set and the caller can retry the fetch.
+  // TODO: Fix partition lookup logic to not do it with IDs.
+  PARTITION_NOT_FOUND
 }
 
 // RPC response for GetPartialCatalogObject.

http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 1bca8ec..4eba255 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -63,6 +63,7 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.compat.HdfsShim;
 import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -1709,8 +1710,12 @@ public class HdfsTable extends Table implements FeFsTable {
       resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size());
       for (long partId : partIds) {
         HdfsPartition part = partitionMap_.get(partId);
-        Preconditions.checkArgument(part != null, "Partition id %s does not exist",
-            partId);
+        if (part == null) {
+          LOG.warn(String.format("Missing partition ID: %s, Table: %s", partId,
+              getFullName()));
+          return new TGetPartialCatalogObjectResponse().setLookup_status(
+              CatalogLookupStatus.PARTITION_NOT_FOUND);
+        }
         TPartialPartitionInfo partInfo = new TPartialPartitionInfo(partId);
 
         if (req.table_info_selector.want_partition_names) {

http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index e099b53..370e4b2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -345,6 +345,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       case FUNCTION_NOT_FOUND:
       case TABLE_NOT_FOUND:
       case TABLE_NOT_LOADED:
+      case PARTITION_NOT_FOUND:
         invalidateCacheForObject(req.object_desc);
         throw new InconsistentMetadataFetchException(
             String.format("Fetching %s failed. Could not find %s",

http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
index 63e0fda..92fff65 100644
--- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -175,12 +176,8 @@ public class PartialCatalogInfoTest {
     req.table_info_selector = new TTableInfoSelector();
     req.table_info_selector.want_partition_metadata = true;
     req.table_info_selector.partition_ids = ImmutableList.of(-12345L); // non-existent
-    try {
-      sendRequest(req);
-      fail("did not throw exception for missing partition");
-    } catch (IllegalArgumentException iae) {
-      assertEquals("Partition id -12345 does not exist", iae.getMessage());
-    }
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    assertEquals(resp.lookup_status, CatalogLookupStatus.PARTITION_NOT_FOUND);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/tests/custom_cluster/test_local_catalog.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 3815f34..5445217 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -340,63 +340,24 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
       replans_seen = [0]
       replans_seen_lock = threading.Lock()
 
+      # Queue to propagate exceptions from failed queries, if any.
+      failed_queries = Queue.Queue()
+
       def stress_thread(client):
         while replans_seen[0] == 0:
           # TODO(todd) EXPLAIN queries don't currently yield a profile, so
           # we have to actually run a COUNT query.
           q = random.choice([
-              'refresh functional.alltypes',
+              'invalidate metadata functional.alltypes',
               'select count(*) from functional.alltypes where month=4',
               'select count(*) from functional.alltypes where month=5'])
-          ret = self.execute_query_expect_success(client, q)
-          if RETRY_PROFILE_MSG in ret.runtime_profile:
-            with replans_seen_lock:
-              replans_seen[0] += 1
-
-      threads = [threading.Thread(target=stress_thread, args=(c,))
-                 for c in [client1, client2]]
-      for t in threads:
-        t.start()
-      for t in threads:
-        t.join(30)
-      assert replans_seen[0] > 0, "Did not trigger any re-plans"
-
-    finally:
-      client1.close()
-      client2.close()
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-      impalad_args="--use_local_catalog=true",
-      catalogd_args="--catalog_topic_mode=minimal")
-  def test_concurrent_invalidate_with_queries(self, unique_database):
-    """
-    Tests that the queries are replanned when they clash with concurrent invalidates.
-    """
-    # TODO: Merge this with the above test after fixing IMPALA-7717
-    try:
-      impalad1 = self.cluster.impalads[0]
-      impalad2 = self.cluster.impalads[1]
-      client1 = impalad1.service.create_beeswax_client()
-      client2 = impalad2.service.create_beeswax_client()
 
-      # Track the number of replans.
-      replans_seen = [0]
-      replans_seen_lock = threading.Lock()
-
-      # Queue to propagate exceptions from failed queries, if any.
-      failed_queries = Queue.Queue()
-
-      def stress_thread(client):
-        while replans_seen[0] == 0:
-          q = random.choice([
-              'invalidate metadata functional.alltypesnopart',
-              'select count(*) from functional.alltypesnopart',
-              'select count(*) from functional.alltypesnopart'])
           try:
             ret = self.execute_query_expect_success(client, q)
           except Exception as e:
             failed_queries.put((q, str(e)))
+            continue
+
           if RETRY_PROFILE_MSG in ret.runtime_profile:
             with replans_seen_lock:
               replans_seen[0] += 1
@@ -407,8 +368,8 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
         t.start()
       for t in threads:
         t.join(30)
-      assert failed_queries.empty(),\
-          "Failed query count non zero: %s" % list(failed_queries.queue)
+      assert failed_queries.empty(), "Failed queries encountered: %s" %\
+          list(failed_queries.queue)
       assert replans_seen[0] > 0, "Did not trigger any re-plans"
 
     finally: