Posted to commits@impala.apache.org by wz...@apache.org on 2021/11/24 06:37:38 UTC

[impala] 02/03: IMPALA-11028: Table loading can fail when events are cleaned up

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cc6f6d5c91ba1db3fca83c65f7d2f87c98077025
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Thu Nov 18 17:20:32 2021 -0800

    IMPALA-11028: Table loading can fail when events are cleaned up
    
    IMPALA-10502 introduced a createEventId field on a table which is
    updated when Impala creates the table. The events processor uses
    it to determine whether the subsequent CREATE_TABLE event it
    receives should be skipped.
    
    When the table is loaded for the first time, TableLoader updates
    the createEventId to the id of the latest CREATE_TABLE event in
    the metastore in order to avoid race conditions. To find that
    event, it fetches all the events generated since the table's last
    known createEventId. However, if there is a significant delay
    (more than 24 hours) between the time a table is created or
    invalidated and the time it is queried, the metastore cleanup
    thread may already have deleted the events generated since the
    table's createEventId. In that case the HMS client method
    getNextNotification() throws an IllegalStateException because of
    the missing events, which causes the table load to fail and the
    query to error out.
    
    The fix is to stop relying on the HMS client method that throws
    the IllegalStateException and to instead call the backing thrift
    API directly, as sketched below.
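
    A minimal sketch of the new fetch path, mirroring the
    getNextMetastoreEventsInBatches() change in the diff below. Here
    `msc` is assumed to be a MetaStoreClient obtained from
    catalog.getMetaStoreClient(), and `filter`/`result` are the
    caller's NotificationFilter and output list:

        NotificationEventRequest eventRequest = new NotificationEventRequest();
        eventRequest.setLastEvent(currentEventId);
        eventRequest.setMaxEvents(batchSize);
        // The raw thrift call returns whatever events still exist after
        // currentEventId instead of failing the client-side gap check with
        // an IllegalStateException when older events have been cleaned up.
        NotificationEventResponse response = msc.getHiveClient()
            .getThriftClient().get_next_notification(eventRequest);
        for (NotificationEvent event : response.getEvents()) {
          if (filter == null || filter.accept(event)) result.add(event);
          currentEventId = event.getEventId();
        }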
    
    Testing:
    1. Introduced a custom cluster test which can reproduce this issue.
    2. Verified that the test passes with the patch applied.
    3. Ran core tests.
    
    Change-Id: I95e5e20e1a2086688a92abdfb28e89177e996a1a
    Reviewed-on: http://gerrit.cloudera.org:8080/18038
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
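
Callers keep the same filtered, batched fetch pattern as before; only the
helper's name and the exception it throws change. Below is a hedged sketch of
how a caller like TableLoader looks up a table's CREATE_TABLE events after this
patch; the class and method names (CreateEventLookupSketch,
fetchCreateTableEvents) are illustrative, and the imports assume the packages
used elsewhere in this patch:

    import java.util.List;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.impala.catalog.CatalogServiceCatalog;
    import org.apache.impala.catalog.events.MetastoreEvents.CreateTableEvent;
    import org.apache.impala.catalog.events.MetastoreEventsProcessor;
    import org.apache.impala.catalog.events.MetastoreNotificationFetchException;

    class CreateEventLookupSketch {
      // Fetch the CREATE_TABLE events for (dbName, tblName) generated after
      // eventId, paging through the metastore EVENTS_BATCH_SIZE_PER_RPC at
      // a time.
      static List<NotificationEvent> fetchCreateTableEvents(
          CatalogServiceCatalog catalog, long eventId, String dbName, String tblName)
          throws MetastoreNotificationFetchException {
        return MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog,
            eventId,
            event -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE
                .equals(event.getEventType())
                && event.getDbName().equalsIgnoreCase(dbName)
                && event.getTableName().equalsIgnoreCase(tblName));
      }
    }

On RPC failures the helper now throws MetastoreNotificationFetchException
instead of ImpalaRuntimeException, which is why the catch clauses in
CatalogOpExecutor are adjusted in this patch.
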
 bin/create-test-configuration.sh                   |  6 +++
 .../org/apache/impala/catalog/TableLoader.java     |  7 +--
 .../catalog/events/MetastoreEventsProcessor.java   | 62 +++++++++++++++-------
 .../catalog/metastore/MetastoreServiceHandler.java |  7 +--
 .../apache/impala/service/CatalogOpExecutor.java   |  9 ++--
 .../events/MetastoreEventsProcessorTest.java       |  9 ++--
 fe/src/test/resources/hive-site.xml.py             |  6 +++
 .../test_metastore_events_cleanup.py               | 44 +++++++++++++++
 8 files changed, 111 insertions(+), 39 deletions(-)

diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index 642d310..90fa257 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -145,6 +145,12 @@ mkdir -p hive-site-without-hms
 rm -f hive-site-without-hms/hive-site.xml
 ln -s "${CONFIG_DIR}/hive-site_without_hms.xml" hive-site-without-hms/hive-site.xml
 
+export HIVE_VARIANT=events_cleanup
+$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_events_cleanup.xml
+mkdir -p hive-site-events-cleanup
+rm -f hive-site-events-cleanup/hive-site.xml
+ln -s "${CONFIG_DIR}/hive-site_events_cleanup.xml" hive-site-events-cleanup/hive-site.xml
+
 export HIVE_VARIANT=ranger_auth
 HIVE_RANGER_CONF_DIR=hive-site-ranger-auth
 $IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_ranger_auth.xml
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 14442df..0b263ed 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -17,11 +17,9 @@
 
 package org.apache.impala.catalog;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -34,7 +32,6 @@ import com.google.common.base.Stopwatch;
 
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.util.ThreadNameAnnotator;
-import org.apache.thrift.TException;
 
 /**
  * Class that implements the logic for how a table's metadata should be loaded from
@@ -86,8 +83,8 @@ public class TableLoader {
         // we are only interested in fetching the events if we have a valid eventId
         // for a table. For tables where eventId is unknown are not created by
         // this catalogd and hence the self-event detection logic does not apply.
-        events = MetastoreEventsProcessor.getNextMetastoreEvents(catalog_, eventId,
-            notificationEvent -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE
+        events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_,
+            eventId, notificationEvent -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE
                 .equals(notificationEvent.getEventType())
                 && notificationEvent.getDbName().equalsIgnoreCase(db.getName())
                 && notificationEvent.getTableName().equalsIgnoreCase(tblName));
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index c602053..b089fd9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.impala.catalog.CatalogException;
@@ -43,12 +44,9 @@ import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
-import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.CatalogOpExecutor;
-import org.apache.impala.thrift.TCreateTableParams;
-import org.apache.impala.thrift.TDdlExecResponse;
 import org.apache.impala.thrift.TEventProcessorMetrics;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
 import org.apache.impala.util.MetaStoreUtil;
@@ -248,13 +246,15 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
   public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created";
 
   /**
-   * Wrapper around {@link MetastoreEventsProcessor#getNextMetastoreEvents} which passes
-   * the default batch size.
+   * Wrapper around {@link
+   * MetastoreEventsProcessor#getNextMetastoreEventsInBatches(CatalogServiceCatalog,
+   * long, NotificationFilter, int)} which passes the default batch size.
    */
-  public static List<NotificationEvent> getNextMetastoreEvents(
+  public static List<NotificationEvent> getNextMetastoreEventsInBatches(
       CatalogServiceCatalog catalog, long eventId, NotificationFilter filter)
-      throws ImpalaRuntimeException {
-    return getNextMetastoreEvents(catalog, eventId, filter, EVENTS_BATCH_SIZE_PER_RPC);
+      throws MetastoreNotificationFetchException {
+    return getNextMetastoreEventsInBatches(catalog, eventId, filter,
+        EVENTS_BATCH_SIZE_PER_RPC);
   }
 
   /**
@@ -269,12 +269,12 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
    * @param eventsBatchSize the batch size for fetching the events from metastore.
    * @return List of {@link NotificationEvent} which are all greater than eventId and
    * satisfy the given filter.
-   * @throws ImpalaRuntimeException in case of RPC errors to metastore.
+   * @throws MetastoreNotificationFetchException in case of RPC errors to metastore.
    */
   @VisibleForTesting
-  public static List<NotificationEvent> getNextMetastoreEvents(
+  public static List<NotificationEvent> getNextMetastoreEventsInBatches(
       CatalogServiceCatalog catalog, long eventId, NotificationFilter filter,
-      int eventsBatchSize) throws ImpalaRuntimeException {
+      int eventsBatchSize) throws MetastoreNotificationFetchException {
     Preconditions.checkArgument(eventsBatchSize > 0);
     List<NotificationEvent> result = new ArrayList<>();
     try (MetaStoreClient msc = catalog.getMetaStoreClient()) {
@@ -285,8 +285,16 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
       while (currentEventId < toEventId) {
         int batchSize = Math
             .min(eventsBatchSize, (int)(toEventId - currentEventId));
-        for (NotificationEvent event : msc.getHiveClient()
-            .getNextNotification(currentEventId, batchSize, null).getEvents()) {
+        // We don't use HiveMetaStoreClient's getNextNotification() here
+        // because it can throw an IllegalStateException if the eventId we
+        // pass in is very old and the metastore has already cleaned up the
+        // events since that eventId.
+        NotificationEventRequest eventRequest = new NotificationEventRequest();
+        eventRequest.setMaxEvents(batchSize);
+        eventRequest.setLastEvent(currentEventId);
+        NotificationEventResponse notificationEventResponse = msc.getHiveClient()
+            .getThriftClient().get_next_notification(eventRequest);
+        for (NotificationEvent event : notificationEventResponse.getEvents()) {
           // if no filter is provided we add all the events
           if (filter == null || filter.accept(event)) result.add(event);
           currentEventId = event.getEventId();
@@ -294,7 +302,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
       }
       return result;
     } catch (TException e) {
-      throw new ImpalaRuntimeException(String.format(
+      throw new MetastoreNotificationFetchException(String.format(
           CatalogOpExecutor.HMS_RPC_ERROR_FORMAT_STR, "getNextNotification"), e);
     }
   }
@@ -575,7 +583,10 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
    * NotificationEvents are filtered using the NotificationFilter provided if it is not
    * null.
    * @param eventId The returned events are all after this given event id.
-   * @param skipBatching If this is true all the events since eventId are returned.
+   * @param getAllEvents If this is true, all the events since eventId are returned.
+   *                     Note that Hive MetaStore can limit the response to a
+   *                     maximum number of events based on the configuration
+   *                     {@code hive.metastore.max.event.response}.
    *                     If it is false, only EVENTS_BATCH_SIZE_PER_RPC events are
    *                     returned, caller is expected to issue more calls to this method
    *                     to fetch the remaining events.
@@ -585,7 +596,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
    * @throws MetastoreNotificationFetchException In case of exceptions from HMS.
    */
   public List<NotificationEvent> getNextMetastoreEvents(final long eventId,
-      final boolean skipBatching, @Nullable final NotificationFilter filter)
+      final boolean getAllEvents, @Nullable final NotificationFilter filter)
       throws MetastoreNotificationFetchException {
     final Timer.Context context = metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).time();
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
@@ -601,12 +612,23 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
       if (currentEventId <= eventId) {
         return Collections.emptyList();
       }
-      int batchSize = skipBatching ? -1 : EVENTS_BATCH_SIZE_PER_RPC;
-      NotificationEventResponse response = msClient.getHiveClient()
-          .getNextNotification(eventId, batchSize, filter);
+      int batchSize = getAllEvents ? -1 : EVENTS_BATCH_SIZE_PER_RPC;
+      // We use the thrift API directly instead of
+      // HiveMetaStoreClient#getNextNotification because the HMS client can throw an
+      // exception when there is a gap in the eventIds returned.
+      NotificationEventRequest eventRequest = new NotificationEventRequest();
+      eventRequest.setLastEvent(eventId);
+      eventRequest.setMaxEvents(batchSize);
+      NotificationEventResponse response = msClient.getHiveClient().getThriftClient()
+          .get_next_notification(eventRequest);
       LOG.info(String.format("Received %d events. Start event id : %d",
           response.getEvents().size(), eventId));
-      return response.getEvents();
+      if (filter == null) return response.getEvents();
+      List<NotificationEvent> filteredEvents = new ArrayList<>();
+      for (NotificationEvent event : response.getEvents()) {
+        if (filter.accept(event)) filteredEvents.add(event);
+      }
+      return filteredEvents;
     } catch (TException e) {
       throw new MetastoreNotificationFetchException(
           "Unable to fetch notifications from metastore. Last synced event id is "
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
index b5f00a8..f800362 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.AbstractThriftHiveMetastore;
 import org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.PartFilterExprUtil;
 import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
@@ -269,13 +268,11 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.impala.catalog.CatalogHmsAPIHelper;
 import org.apache.impala.catalog.DatabaseNotFoundException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
-import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.MetastoreEvents;
 import org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
-import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Reference;
 import org.apache.impala.common.Pair;
 import org.apache.impala.compat.MetastoreShim;
@@ -3024,7 +3021,7 @@ public abstract class MetastoreServiceHandler extends AbstractThriftHiveMetastor
     }
     try {
       List<NotificationEvent> events = MetastoreEventsProcessor
-          .getNextMetastoreEvents(catalog_, beforeDropEventId,
+          .getNextMetastoreEventsInBatches(catalog_, beforeDropEventId,
               event -> event.getEventType()
                   .equalsIgnoreCase(DropTableEvent.DROP_TABLE_EVENT_TYPE)
                   && catName.equalsIgnoreCase(event.getCatName())
@@ -3119,7 +3116,7 @@ public abstract class MetastoreServiceHandler extends AbstractThriftHiveMetastor
 
     try {
       List<NotificationEvent> events = MetastoreEventsProcessor
-          .getNextMetastoreEvents(catalog_, beforeDropEventId,
+          .getNextMetastoreEventsInBatches(catalog_, beforeDropEventId,
               event -> event.getEventType()
                   .equalsIgnoreCase(MetastoreEvents.DropDatabaseEvent
                       .DROP_DATABASE_EVENT_TYPE)
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f7ca3e0..860b5d7 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -233,7 +233,6 @@ import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.MetaStoreUtil.TableInsertEventInfo;
-import org.apache.kudu.client.Delete;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1853,15 +1852,15 @@ public class CatalogOpExecutor {
 
   /**
    * Wrapper around
-   * {@code MetastoreEventsProcessor#getNextMetastoreEvents} with the
+   * {@code MetastoreEventsProcessor#getNextMetastoreEventsInBatches} with the
    * addition that it checks if events processing is active or not. If not active,
    * returns an empty list.
    */
   private List<NotificationEvent> getNextMetastoreEventsIfEnabled(long eventId,
-      NotificationFilter eventsFilter) throws ImpalaRuntimeException {
+      NotificationFilter eventsFilter) throws MetastoreNotificationException {
     if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
     return MetastoreEventsProcessor
-        .getNextMetastoreEvents(catalog_, eventId, eventsFilter);
+        .getNextMetastoreEventsInBatches(catalog_, eventId, eventsFilter);
   }
 
   /**
@@ -4434,7 +4433,7 @@ public class CatalogOpExecutor {
                     partitionToEventSubMap.get(part));
           }
         }
-      } catch (TException e) {
+      } catch (MetastoreNotificationException | TException e) {
         throw new ImpalaRuntimeException(
             String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
       }
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index ad064d4..baad1a0 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -308,16 +308,17 @@ public class MetastoreEventsProcessorTest {
     createDatabaseFromImpala(TEST_DB_NAME, null);
     createTableFromImpala(TEST_DB_NAME, "testNextMetastoreEvents1", false);
     createTable("testNextMetastoreEvents2", false);
-    List<NotificationEvent> events = MetastoreEventsProcessor.getNextMetastoreEvents(
+    List<NotificationEvent> events =
+        MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
         eventsProcessor_.catalog_, currentEventId, null, 2);
     assertEquals(3, events.size());
-    events = MetastoreEventsProcessor.getNextMetastoreEvents(
+    events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
         eventsProcessor_.catalog_, currentEventId+1, null, 10);
     assertEquals(2, events.size());
-    events = MetastoreEventsProcessor.getNextMetastoreEvents(
+    events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
         eventsProcessor_.catalog_, currentEventId, null, 3);
     assertEquals(3, events.size());
-    events = MetastoreEventsProcessor.getNextMetastoreEvents(
+    events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
         eventsProcessor_.catalog_, currentEventId+3, null, 3);
     assertEquals(0, events.size());
   }
diff --git a/fe/src/test/resources/hive-site.xml.py b/fe/src/test/resources/hive-site.xml.py
index d99bdf7..b67a162 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -78,6 +78,12 @@ elif variant == 'ranger_auth':
     'hive.metastore.pre.event.listeners':
         'org.apache.hadoop.hive.ql.security.authorization.plugin.metastore.HiveMetaStoreAuthorizer',
   })
+elif variant == 'events_cleanup':
+  # HMS configs needed for regression test for IMPALA-11028
+  CONFIG.update({
+    'hive.metastore.event.db.listener.timetolive': '60s',
+    'hive.metastore.event.db.listener.clean.interval': '10s'
+  })
 
 # HBase-related configs.
 # Impala processes need to connect to zookeeper on INTERNAL_LISTEN_HOST for HBase.
diff --git a/tests/custom_cluster/test_metastore_events_cleanup.py b/tests/custom_cluster/test_metastore_events_cleanup.py
new file mode 100644
index 0000000..7966268
--- /dev/null
+++ b/tests/custom_cluster/test_metastore_events_cleanup.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import os
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+IMPALA_HOME = os.getenv('IMPALA_HOME')
+HIVE_SITE_EVENTS_CLEANUP = IMPALA_HOME + '/fe/src/test/resources/hive-site-events-cleanup'
+
+
+class TestTableLoadingWithEventsCleanUp(CustomClusterTestSuite):
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(hive_conf_dir=HIVE_SITE_EVENTS_CLEANUP)
+  def test_table_load_with_events_cleanup(self, unique_database):
+    """Regression test for IMPALA-11028"""
+    self.execute_query_expect_success(self.client,
+        "create table {}.{} (id int)".format(unique_database, "t1"))
+    self.execute_query_expect_success(self.client,
+        "create table {}.{} (id int)".format(unique_database, "t2"))
+    # Sleep past hive.metastore.event.db.listener.timetolive (60s) so that the
+    # metastore cleanup thread purges the notification events generated above.
+    self.execute_query_expect_success(self.client, "select sleep(120000)")
+    self.execute_query_expect_success(self.client,
+        "create table {}.{} (id int)".format(unique_database, "t3"))
+    # Loading t1 must not fail even though its creation event was cleaned up.
+    self.execute_query_expect_success(self.client,
+        "select * from {}.{}".format(unique_database, "t1"))