You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2021/07/20 16:42:59 UTC
[impala] 03/03: IMPALA-10502: Fetch events in batches (Addendum)
This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 565d0bfa1d12df583ab6d2725ac6ecf2644cd50d
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Mon Jul 19 11:39:50 2021 -0700
IMPALA-10502: Fetch events in batches (Addendum)
The earlier change for IMPALA-10502 passes a batch size
of -1 to fetch all the events after a given event id during
DDL execution. While this works when the HMS backing database is
Postgres, it doesn't work well when the HMS backend
is a MySQL database, due to HIVE-20226. This change works around the Hive
bug by fetching the events in batches of 1000 instead of fetching all the events
in one RPC during DDL execution.
Testing:
1. Added a unit test for the new changes introduced.
2. Ran the previously failing tests on MySQL HMS backend.
Change-Id: I34bb8984aeb91b37439f77722746f638d8774478
Reviewed-on: http://gerrit.cloudera.org:8080/17698
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
Tested-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
.../catalog/events/MetastoreEventsProcessor.java | 35 ++++++++++++++++++++--
.../apache/impala/service/CatalogOpExecutor.java | 1 +
.../events/MetastoreEventsProcessorTest.java | 23 ++++++++++++--
.../SynchronousHMSEventProcessorForTests.java | 25 ----------------
4 files changed, 53 insertions(+), 31 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index e6d4f5d..e7061a2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -247,6 +247,16 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
public static final String DELETE_EVENT_LOG_SIZE = "delete-event-log-size";
/**
+ * Convenience overload of {@link MetastoreEventsProcessor#getNextMetastoreEvents} that
+ * fetches events using the default batch size, EVENTS_BATCH_SIZE_PER_RPC.
+ */
+ public static List<NotificationEvent> getNextMetastoreEvents(
+ CatalogServiceCatalog catalog, long eventId, NotificationFilter filter)
+ throws ImpalaRuntimeException {
+ return getNextMetastoreEvents(catalog, eventId, filter, EVENTS_BATCH_SIZE_PER_RPC);
+ }
+
+ /**
* Gets the next list of {@link NotificationEvent} from Hive Metastore which are
* greater than the given eventId and filtered according to the provided filter.
* @param catalog The CatalogServiceCatalog used to get the metastore client
@@ -255,15 +265,33 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
* events. Note that this is a client side filter not a server side
* filter. Unfortunately, HMS doesn't provide a similar mechanism to
* do server side filtering.
+ * @param eventsBatchSize maximum number of events fetched per metastore RPC; must be > 0.
* @return List of {@link NotificationEvent} which are all greater than eventId and
* satisfy the given filter.
* @throws ImpalaRuntimeException in case of RPC errors to metastore.
*/
+ @VisibleForTesting
public static List<NotificationEvent> getNextMetastoreEvents(
- CatalogServiceCatalog catalog, long eventId, NotificationFilter filter)
- throws ImpalaRuntimeException {
+ CatalogServiceCatalog catalog, long eventId, NotificationFilter filter,
+ int eventsBatchSize) throws ImpalaRuntimeException {
+ Preconditions.checkArgument(eventsBatchSize > 0);
+ List<NotificationEvent> result = new ArrayList<>();
try (MetaStoreClient msc = catalog.getMetaStoreClient()) {
- return msc.getHiveClient().getNextNotification(eventId, -1, filter).getEvents();
+ long toEventId = msc.getHiveClient().getCurrentNotificationEventId()
+ .getEventId();
+ if (toEventId <= eventId) return result;
+ long currentEventId = eventId;
+ while (currentEventId < toEventId) {
+ int batchSize = Math
+ .min(eventsBatchSize, (int)(toEventId - currentEventId));
+ for (NotificationEvent event : msc.getHiveClient()
+ .getNextNotification(currentEventId, batchSize, null).getEvents()) {
+ // if no filter is provided we add all the events
+ if (filter == null || filter.accept(event)) result.add(event);
+ currentEventId = event.getEventId();
+ }
+ }
+ return result;
} catch (TException e) {
throw new ImpalaRuntimeException(String.format(
CatalogOpExecutor.HMS_RPC_ERROR_FORMAT_STR, "getNextNotification"), e);
@@ -429,6 +457,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
return MetaStoreUtil.getMetastoreConfigValue(iMetaStoreClient, config, defaultVal);
}
}
+
/**
* returns the current value of LastSyncedEventId. This method is not thread-safe and
* only to be used for testing purposes
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 18b8cd5..ee9bcb6 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -3253,6 +3253,7 @@ public class CatalogOpExecutor {
}
Table newTbl = catalog_.addIncompleteTable(msTable.getDbName(),
msTable.getTableName(), eventIdTblPair.first);
+ Preconditions.checkNotNull(newTbl);
LOG.debug("Created catalog table {} with create event id {}", newTbl.getFullName(),
eventIdTblPair.first);
// Submit the cache request and update the table metadata.
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 960c740..7fd3815 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -180,9 +180,6 @@ public class MetastoreEventsProcessorTest {
private static CatalogOpExecutor catalogOpExecutor_;
private static MetastoreEventsProcessor eventsProcessor_;
- private static final Logger LOG =
- LoggerFactory.getLogger(MetastoreEventsProcessorTest.class);
-
@BeforeClass
public static void setUpTestEnvironment() throws TException, ImpalaException {
catalog_ = CatalogServiceTestCatalog.create();
@@ -287,6 +284,26 @@ public class MetastoreEventsProcessorTest {
}
}
+ @Test
+ public void testNextMetastoreEvents() throws Exception {
+ long currentEventId = eventsProcessor_.getCurrentEventId();
+ createDatabaseFromImpala(TEST_DB_NAME, null);
+ createTableFromImpala(TEST_DB_NAME, "testNextMetastoreEvents1", false);
+ createTable("testNextMetastoreEvents2", false);
+ List<NotificationEvent> events = MetastoreEventsProcessor.getNextMetastoreEvents(
+ eventsProcessor_.catalog_, currentEventId, null, 2);
+ assertEquals(3, events.size());
+ events = MetastoreEventsProcessor.getNextMetastoreEvents(
+ eventsProcessor_.catalog_, currentEventId+1, null, 10);
+ assertEquals(2, events.size());
+ events = MetastoreEventsProcessor.getNextMetastoreEvents(
+ eventsProcessor_.catalog_, currentEventId, null, 3);
+ assertEquals(3, events.size());
+ events = MetastoreEventsProcessor.getNextMetastoreEvents(
+ eventsProcessor_.catalog_, currentEventId+3, null, 3);
+ assertEquals(0, events.size());
+ }
+
/**
* Test provides a mock value and confirms if the MetastoreEventConfig validate
* suceeds or fails as expected
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
index 9137f48..511150e 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
@@ -44,29 +44,4 @@ public class SynchronousHMSEventProcessorForTests extends MetastoreEventsProcess
public void startScheduler() {
// nothing to do here; there is no background thread for this processor
}
-
- public List<NotificationEvent> getNextNotificationEvents(long eventId)
- throws MetastoreNotificationFetchException {
- try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
- // fetch the current notification event id. We assume that the polling interval
- // is small enough that most of these polling operations result in zero new
- // events. In such a case, fetching current notification event id is much faster
- // (and cheaper on HMS side) instead of polling for events directly
- CurrentNotificationEventId currentNotificationEventId =
- msClient.getHiveClient().getCurrentNotificationEventId();
- long currentEventId = currentNotificationEventId.getEventId();
-
- // no new events since we last polled
- if (currentEventId <= eventId) {
- return Collections.emptyList();
- }
-
- NotificationEventResponse response = msClient.getHiveClient()
- .getNextNotification(eventId, 1000, null);
- return response.getEvents();
- } catch (TException e) {
- throw new MetastoreNotificationFetchException(
- "Unable to fetch notifications from metastore", e);
- }
- }
}