You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/12/15 04:34:12 UTC

(impala) branch master updated: IMPALA-10949: Improve batching logic of partition events

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a6de494f2 IMPALA-10949: Improve batching logic of partition events
a6de494f2 is described below

commit a6de494f24c47fbd679a037341ae0a34b9f696ff
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Wed Sep 13 14:19:34 2023 -0700

    IMPALA-10949: Improve batching logic of partition events
    
    Currently, the batching logic for partition events performs the self-event
    check by looking at the event id of the last event in the batch. This patch
    improves the batching logic by comparing the table's lastSyncEventId with
    the current event's event id to decide whether or not to batch the event.
    This way we get smaller batches and can skip self events, if any, up
    front.
    
    Testing: Added an end-to-end test to verify the batch size for
    ALTER_PARTITION batch events.
    
    Change-Id: I4e79510739347cbe669719a9e4cb9cabd5daa7d3
    Reviewed-on: http://gerrit.cloudera.org:8080/20485
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     | 10 +++++++
 .../events/MetastoreEventsProcessorTest.java       | 30 ++++++++++++---------
 tests/custom_cluster/test_events_custom_configs.py | 31 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 9d00f67c0..0e40c37bf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -737,6 +737,14 @@ public class MetastoreEvents {
     public String toString() {
       return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId_, eventType_);
     }
+
+    protected boolean isOlderThanLastSyncEventId(MetastoreEvent event) {
+      org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, tblName_);
+      if (tbl != null && tbl.getLastSyncedEventId() >= event.getEventId()) {
+        return true;
+      }
+      return false;
+    }
   }
 
   public static String getStringProperty(
@@ -1297,6 +1305,7 @@ public class MetastoreEvents {
     @Override
     public boolean canBeBatched(MetastoreEvent event) {
       if (!(event instanceof InsertEvent)) return false;
+      if (isOlderThanLastSyncEventId(event)) return false;
       InsertEvent insertEvent = (InsertEvent) event;
       // batched events must have consecutive event ids
       if (event.getEventId() != 1 + getEventId()) return false;
@@ -2197,6 +2206,7 @@ public class MetastoreEvents {
     @Override
     public boolean canBeBatched(MetastoreEvent event) {
       if (!(event instanceof AlterPartitionEvent)) return false;
+      if (isOlderThanLastSyncEventId(event)) return false;
       AlterPartitionEvent alterPartitionEvent = (AlterPartitionEvent) event;
       if (event.getEventId() != 1 + getEventId()) return false;
       // make sure that the event is on the same table
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 0ebe1528d..bba5f717a 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -2486,11 +2486,15 @@ public class MetastoreEventsProcessorTest {
 
   @SuppressWarnings({"rawtypes", "unchecked"})
   private void runEventBatchingTest(String testTblName,
-      Map<String, String> eventTypeToMessage) throws MetastoreNotificationException {
+      Map<String, String> eventTypeToMessage) throws DatabaseNotFoundException,
+      MetastoreNotificationException {
+    Table tbl = catalog_.getTable(TEST_DB_NAME, testTblName);
+    long lastSyncedEventId = tbl.getLastSyncedEventId();
     for (String eventType : eventTypeToMessage.keySet()) {
       String eventMessage = eventTypeToMessage.get(eventType);
-      // we have 10 mock batchable events which should be batched into 1
-      List<MetastoreEvent> mockEvents = createMockEvents(100, 10,
+      // we have 10 mock batchable events which should be batched into 1. Always create
+      // mock events with start event id much greater than table's lastSyncEventId
+      List<MetastoreEvent> mockEvents = createMockEvents(lastSyncedEventId + 100, 10,
           eventType, TEST_DB_NAME, testTblName, eventMessage);
       MetastoreEventFactory eventFactory = eventsProcessor_.getEventsFactory();
       List<MetastoreEvent> batch = eventFactory.createBatchEvents(mockEvents,
@@ -2502,16 +2506,16 @@ public class MetastoreEventsProcessorTest {
       // create a batch which consists of some other events
       // only contiguous events should be batched
       // 13-15 mock events which can be batched
-      mockEvents = createMockEvents(13, 3,
+      mockEvents = createMockEvents(lastSyncedEventId + 113, 3,
           eventType, TEST_DB_NAME, testTblName, eventMessage);
       // 17-18 can be batched
-      mockEvents.addAll(createMockEvents(17, 2,
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 117, 2,
           eventType, TEST_DB_NAME, testTblName, eventMessage));
       // event id 20 should not be batched
-      mockEvents.addAll(createMockEvents(20, 1,
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 120, 1,
           eventType, TEST_DB_NAME, testTblName, eventMessage));
       // events 22-24 should be batched
-      mockEvents.addAll(createMockEvents(22, 3,
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 122, 3,
           eventType, TEST_DB_NAME, testTblName, eventMessage));
 
       batch = eventFactory.createBatchEvents(mockEvents, eventsProcessor_.getMetrics());
@@ -2530,9 +2534,9 @@ public class MetastoreEventsProcessorTest {
       assertEquals(3, ((BatchPartitionEvent) batch4).getBatchEvents().size());
       // test to make sure that events which have different database name are not
       // batched
-      mockEvents = createMockEvents(100, 1, eventType, TEST_DB_NAME,
+      mockEvents = createMockEvents(lastSyncedEventId + 100, 1, eventType, TEST_DB_NAME,
           testTblName, eventMessage);
-      mockEvents.addAll(createMockEvents(101, 1, eventType, "db1",
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 101, 1, eventType, "db1",
           testTblName, eventMessage));
 
       List<MetastoreEvent> batchEvents = eventFactory.createBatchEvents(mockEvents,
@@ -2547,10 +2551,10 @@ public class MetastoreEventsProcessorTest {
       }
 
       // test no batching when table name is different
-      mockEvents = createMockEvents(100, 1, eventType, TEST_DB_NAME,
+      mockEvents = createMockEvents(lastSyncedEventId + 100, 1, eventType, TEST_DB_NAME,
           testTblName, eventMessage);
-      mockEvents.addAll(createMockEvents(101, 1, eventType, TEST_DB_NAME,
-          "testtbl", eventMessage));
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 101, 1, eventType,
+          TEST_DB_NAME, "testtbl", eventMessage));
       batchEvents = eventFactory.createBatchEvents(mockEvents,
           eventsProcessor_.getMetrics());
       assertEquals(2, batchEvents.size());
@@ -2563,7 +2567,7 @@ public class MetastoreEventsProcessorTest {
       }
     }
     // make sure 2 events of different event types are not batched together
-    long startEventId = 17;
+    long startEventId = lastSyncedEventId + 117;
     // batch 1
     List<MetastoreEvent> mockEvents = createMockEvents(startEventId, 3,
         "ALTER_PARTITION", TEST_DB_NAME, testTblName,
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 1de5e06d1..04efc7255 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -377,6 +377,37 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     verify_skipping_older_events(test_old_table, False, True)
     verify_skipping_older_events(test_old_table, True, True)
 
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--hms_event_polling_interval_s=5"
+                  " --enable_sync_to_latest_event_on_ddls=true")
+  def test_skipping_batching_events(self, unique_database):
+    """Test to verify IMPALA-10949, improving batching logic for partition events.
+    Before batching the events, each event is checked if the event id is greater than
+    table's lastSyncEventId then the event can be batched else it can be skipped."""
+    test_batch_table = "test_batch_table"
+    self.client.execute(
+      "create table {}.{} like functional.alltypes"
+      .format(unique_database, test_batch_table))
+    self.client.execute(
+      "insert into {}.{} partition (year,month) select * from functional.alltypes"
+      .format(unique_database, test_batch_table))
+    # Generate batch ALTER_PARTITION events
+    self.run_stmt_in_hive(
+      "analyze table {}.{} compute statistics".format(unique_database, test_batch_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_metric = "batch-events-created"
+    batch_events_1 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    prev_skipped_events = EventProcessorUtils.get_int_metric("events-skipped")
+    self.run_stmt_in_hive(
+      "analyze table {}.{} compute statistics".format(unique_database, test_batch_table))
+    self.client.execute("refresh {0}.{1}".format(unique_database, test_batch_table))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_2 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    current_skipped_events = EventProcessorUtils.get_int_metric("events-skipped")
+    # Make sure no new batch events are created
+    assert batch_events_2 == batch_events_1
+    assert current_skipped_events - prev_skipped_events >= 24
+
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_commit_compaction_events(self, unique_database):
     """Test is to verify Impala-11626, commit compaction events triggered in HMS would