You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to commits@impala.apache.org by wz...@apache.org on 2021/07/20 16:42:59 UTC

[impala] 03/03: IMPALA-10502: Fetch events in batches (Addendum)

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 565d0bfa1d12df583ab6d2725ac6ecf2644cd50d
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Mon Jul 19 11:39:50 2021 -0700

    IMPALA-10502: Fetch events in batches (Addendum)
    
    The earlier change for IMPALA-10502 passes in a batch size
    of -1 to fetch all the events from a given event id during a
    DDL execution. While this works when the HMS backing database is
    PostgreSQL, it doesn't work well when the HMS backend
    is a MySQL database due to HIVE-20226. This change works around the Hive
    bug by fetching the events in batches of 1000 instead of fetching all the events
    in one RPC during the DDL execution.
    
    Testing:
    1. Added a unit test for the new batched event fetching.
    2. Ran the previously failing tests against a MySQL-backed HMS.
    
    Change-Id: I34bb8984aeb91b37439f77722746f638d8774478
    Reviewed-on: http://gerrit.cloudera.org:8080/17698
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
    Tested-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
 .../catalog/events/MetastoreEventsProcessor.java   | 35 ++++++++++++++++++++--
 .../apache/impala/service/CatalogOpExecutor.java   |  1 +
 .../events/MetastoreEventsProcessorTest.java       | 23 ++++++++++++--
 .../SynchronousHMSEventProcessorForTests.java      | 25 ----------------
 4 files changed, 53 insertions(+), 31 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index e6d4f5d..e7061a2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -247,6 +247,16 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
   public static final String DELETE_EVENT_LOG_SIZE = "delete-event-log-size";
 
   /**
+   * Wrapper around {@link MetastoreEventsProcessor#getNextMetastoreEvents} which passes
+   * the default batch size.
+   */
+  public static List<NotificationEvent> getNextMetastoreEvents(
+      CatalogServiceCatalog catalog, long eventId, NotificationFilter filter)
+      throws ImpalaRuntimeException {
+    return getNextMetastoreEvents(catalog, eventId, filter, EVENTS_BATCH_SIZE_PER_RPC);
+  }
+
+  /**
    * Gets the next list of {@link NotificationEvent} from Hive Metastore which are
    * greater than the given eventId and filtered according to the provided filter.
    * @param catalog The CatalogServiceCatalog used to get the metastore client
@@ -255,15 +265,33 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
    *               events. Note that this is a client side filter not a server side
    *               filter. Unfortunately, HMS doesn't provide a similar mechanism to
    *               do server side filtering.
+   * @param eventsBatchSize the batch size for fetching the events from metastore.
    * @return List of {@link NotificationEvent} which are all greater than eventId and
    * satisfy the given filter.
    * @throws ImpalaRuntimeException in case of RPC errors to metastore.
    */
+  @VisibleForTesting
   public static List<NotificationEvent> getNextMetastoreEvents(
-      CatalogServiceCatalog catalog, long eventId, NotificationFilter filter)
-      throws ImpalaRuntimeException {
+      CatalogServiceCatalog catalog, long eventId, NotificationFilter filter,
+      int eventsBatchSize) throws ImpalaRuntimeException {
+    Preconditions.checkArgument(eventsBatchSize > 0);
+    List<NotificationEvent> result = new ArrayList<>();
     try (MetaStoreClient msc = catalog.getMetaStoreClient()) {
-      return msc.getHiveClient().getNextNotification(eventId, -1, filter).getEvents();
+      long toEventId = msc.getHiveClient().getCurrentNotificationEventId()
+          .getEventId();
+      if (toEventId <= eventId) return result;
+      long currentEventId = eventId;
+      while (currentEventId < toEventId) {
+        int batchSize = Math
+            .min(eventsBatchSize, (int)(toEventId - currentEventId));
+        for (NotificationEvent event : msc.getHiveClient()
+            .getNextNotification(currentEventId, batchSize, null).getEvents()) {
+          // if no filter is provided we add all the events
+          if (filter == null || filter.accept(event)) result.add(event);
+          currentEventId = event.getEventId();
+        }
+      }
+      return result;
     } catch (TException e) {
       throw new ImpalaRuntimeException(String.format(
           CatalogOpExecutor.HMS_RPC_ERROR_FORMAT_STR, "getNextNotification"), e);
@@ -429,6 +457,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
       return MetaStoreUtil.getMetastoreConfigValue(iMetaStoreClient, config, defaultVal);
     }
   }
+
   /**
    * returns the current value of LastSyncedEventId. This method is not thread-safe and
    * only to be used for testing purposes
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 18b8cd5..ee9bcb6 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -3253,6 +3253,7 @@ public class CatalogOpExecutor {
       }
       Table newTbl = catalog_.addIncompleteTable(msTable.getDbName(),
           msTable.getTableName(), eventIdTblPair.first);
+      Preconditions.checkNotNull(newTbl);
       LOG.debug("Created catalog table {} with create event id {}", newTbl.getFullName(),
           eventIdTblPair.first);
       // Submit the cache request and update the table metadata.
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 960c740..7fd3815 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -180,9 +180,6 @@ public class MetastoreEventsProcessorTest {
   private static CatalogOpExecutor catalogOpExecutor_;
   private static MetastoreEventsProcessor eventsProcessor_;
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(MetastoreEventsProcessorTest.class);
-
   @BeforeClass
   public static void setUpTestEnvironment() throws TException, ImpalaException {
     catalog_ = CatalogServiceTestCatalog.create();
@@ -287,6 +284,26 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  @Test
+  public void testNextMetastoreEvents() throws Exception {
+    long currentEventId = eventsProcessor_.getCurrentEventId();
+    createDatabaseFromImpala(TEST_DB_NAME, null);
+    createTableFromImpala(TEST_DB_NAME, "testNextMetastoreEvents1", false);
+    createTable("testNextMetastoreEvents2", false);
+    List<NotificationEvent> events = MetastoreEventsProcessor.getNextMetastoreEvents(
+        eventsProcessor_.catalog_, currentEventId, null, 2);
+    assertEquals(3, events.size());
+    events = MetastoreEventsProcessor.getNextMetastoreEvents(
+        eventsProcessor_.catalog_, currentEventId+1, null, 10);
+    assertEquals(2, events.size());
+    events = MetastoreEventsProcessor.getNextMetastoreEvents(
+        eventsProcessor_.catalog_, currentEventId, null, 3);
+    assertEquals(3, events.size());
+    events = MetastoreEventsProcessor.getNextMetastoreEvents(
+        eventsProcessor_.catalog_, currentEventId+3, null, 3);
+    assertEquals(0, events.size());
+  }
+
   /**
    * Test provides a mock value and confirms if the MetastoreEventConfig validate
    * suceeds or fails as expected
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
index 9137f48..511150e 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
@@ -44,29 +44,4 @@ public class SynchronousHMSEventProcessorForTests extends MetastoreEventsProcess
   public void startScheduler() {
     // nothing to do here; there is no background thread for this processor
   }
-
-  public List<NotificationEvent> getNextNotificationEvents(long eventId)
-      throws MetastoreNotificationFetchException {
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // fetch the current notification event id. We assume that the polling interval
-      // is small enough that most of these polling operations result in zero new
-      // events. In such a case, fetching current notification event id is much faster
-      // (and cheaper on HMS side) instead of polling for events directly
-      CurrentNotificationEventId currentNotificationEventId =
-          msClient.getHiveClient().getCurrentNotificationEventId();
-      long currentEventId = currentNotificationEventId.getEventId();
-
-      // no new events since we last polled
-      if (currentEventId <= eventId) {
-        return Collections.emptyList();
-      }
-
-      NotificationEventResponse response = msClient.getHiveClient()
-          .getNextNotification(eventId, 1000, null);
-      return response.getEvents();
-    } catch (TException e) {
-      throw new MetastoreNotificationFetchException(
-          "Unable to fetch notifications from metastore", e);
-    }
-  }
 }