You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2024/01/29 23:44:14 UTC

(impala) 02/02: IMPALA-12463: Batch non-consecutive table events in the event processor

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eaa35b02507a834edd0d219343fd4bd075f21762
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Thu Sep 28 15:47:55 2023 -0700

    IMPALA-12463: Batch non-consecutive table events in the event processor
    
    The current batching of events requires events to be
    consecutive. When there are multiple tables being modified,
    events can be interleaved such that each batch is very
    small. If the batching criteria can be relaxed, the
    non-consecutive events could be batched and processed more
    efficiently.
    
    This implements batching for non-consecutive events by
    keeping state on each table individually. Different tables
    can continue to accumulate batchable events independently
    unless they hit a condition that cuts the batch. A batch for
    one table can now span intervening events on unrelated tables,
    but the existing rules for batching events on an individual
    table still apply. For example, for a particular table, any
    non-INSERT event between two INSERT events on that table still
    cuts the batch.
    
    In addition, there are certain cross-table events that need to
    cut batches across multiple tables:
     1. Drop database / alter database cuts any batches on tables
        in the affected database.
     2. Alter table rename cuts any batches on the source or destination
        table.
    
    The resulting events are emitted in monotonically increasing
    Event ID order by collecting them in a sorted map keyed on
    Event ID. All non-batchable events are emitted in their
    original order. A batch is positioned by the Event ID of the
    last event it contains, so batchable events can move later in
    the sequence, but they can never move earlier.
    
    This is based on the original design by Wenzhe Zhou.
    
    Testing:
     - MetastoreEventsProcessorTest has new tests for interleaved
       events on two tables as well as tests for events that
       cut batches across tables (alter table, drop database,
       alter database).
     - A core job showed no other test failures.
    
    Change-Id: I849c0306bc46080ee4059854f42d9c217a89b905
    Reviewed-on: http://gerrit.cloudera.org:8080/20533
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/events/MetastoreEvents.java     | 213 +++++++++++++++----
 .../events/MetastoreEventsProcessorTest.java       | 226 ++++++++++++++++++---
 2 files changed, 376 insertions(+), 63 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 887404d36..0f073ffce 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -299,45 +300,185 @@ public class MetastoreEvents {
     }
 
     /**
-     * This method batches together any eligible consecutive elements from the given
-     * list of {@code MetastoreEvent}. The returned list may or may not contain batch
+     * This method flushes all in-progress batches for tables from the specified
+     * database from the pendingTableEventsMap to the sortedFinalBatches.
+     */
+    void flushBatchesForDb(
+        Map<String, Map<String, MetastoreEvent>> pendingTableEventsMap,
+        TreeMap<Long, MetastoreEvent> sortedFinalBatches, String dbName) {
+      String lowerDbName = dbName.toLowerCase();
+      Map<String, MetastoreEvent> dbMap = pendingTableEventsMap.get(lowerDbName);
+      if (dbMap != null) {
+        // Flush out any pending events in the database map and delete it
+        for (MetastoreEvent event : dbMap.values()) {
+          sortedFinalBatches.put(event.getEventId(), event);
+        }
+        pendingTableEventsMap.remove(lowerDbName);
+      }
+    }
+
+    /**
+     * This method flushes any in-progress batch for the specified table
+     * from the pendingTableEventsMap to the sortedFinalBatches.
+     */
+    void flushBatchForTable(
+        Map<String, Map<String, MetastoreEvent>> pendingTableEventsMap,
+        TreeMap<Long, MetastoreEvent> sortedFinalBatches, Table table) {
+      // Produce the lower-cased fully qualified table name
+      String dbName = table.getDbName().toLowerCase();
+      String tableName = table.getTableName().toLowerCase();
+      Map<String, MetastoreEvent> dbMap = pendingTableEventsMap.get(dbName);
+      if (dbMap != null) {
+        MetastoreEvent tableEvent = dbMap.get(tableName);
+        if (tableEvent != null) {
+          sortedFinalBatches.put(tableEvent.getEventId(), tableEvent);
+          dbMap.remove(tableName);
+          // If this was the last table, delete the DB map
+          if (dbMap.isEmpty()) {
+            pendingTableEventsMap.remove(dbName);
+          }
+        }
+      }
+    }
+
+    /**
+     * Event batching is done on a per-table basis to allow more batching in
+     * interleaved circumstances. Single-table events still follow the same rules
+     * for batching, but certain events cross table boundaries and should cut
+     * batches across multiple tables. This method detects cross-table events and
+     * cuts the appropriate batches. Currently, it handles drop database, alter
+     * database, and alter table rename. It is a no-op for events that are not
+     * cross-table.
+     */
+    void cutBatchesForCrossTableEvents(MetastoreEvent event,
+        Map<String, Map<String, MetastoreEvent>> pendingTableEventsMap,
+        TreeMap<Long, MetastoreEvent> sortedFinalBatches) {
+      // drop database - cuts any existing batches for tables in that database
+      // alter database - cuts any existing batches for tables in the database
+      // alter table rename - cuts any existing batches for the source or destination
+      //   table
+      if (event instanceof DropDatabaseEvent || event instanceof AlterDatabaseEvent) {
+        // Any batched events for tables from this database need to be flushed
+        // before emitting the AlterDatabaseEvent or DropDatabaseEvent.
+        flushBatchesForDb(pendingTableEventsMap, sortedFinalBatches, event.getDbName());
+      } else if (event instanceof AlterTableEvent) {
+        AlterTableEvent alterTable = (AlterTableEvent) event;
+        if (alterTable.isRename()) {
+          // Flush any batched events for the source table.
+          Table beforeTable = alterTable.getBeforeTable();
+          flushBatchForTable(pendingTableEventsMap, sortedFinalBatches, beforeTable);
+
+          // Flush any batched events for the destination table. Given that the
+          // destination table must not exist when doing this rename, valid sequences
+          // are already handled implicitly by the existing batch-breaking logic
+          // (combined with the sorting of the final batches). This does the flushing
+          // explicitly in case there are any edge cases not handled by the existing
+          // mechanisms.
+          Table afterTable = alterTable.getAfterTable();
+          flushBatchForTable(pendingTableEventsMap, sortedFinalBatches, afterTable);
+        }
+      }
+    }
+
+    /**
+     * This method batches together any eligible events from the given list of
+     * {@code MetastoreEvent}. The returned list may or may not contain batch
      * events depending on whether it finds any events which could be batched together.
+     * Events on a table do not need to be contiguous to be batched, but there must
+     * not be an intervening event that cuts the batch.
      */
     @VisibleForTesting
     List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events, Metrics metrics) {
       if (events.size() < 2) return events;
-      int i = 0, j = 1;
-      List<MetastoreEvent> batchedEventList = new ArrayList<>();
-      MetastoreEvent current = events.get(i);
-      // startEventId points to the current event's or the start of the batch
-      // in case current is a batch event.
-      long startEventId = current.getEventId();
-      while (j < events.size()) {
-        MetastoreEvent next = events.get(j);
-        // check if the current metastore event and the next can be batched together
-        if (!current.canBeBatched(next)) {
-          // events cannot be batched, add the current event under consideration to the
-          // list and update current to the next
-          if (current.getNumberOfEvents() > 1) {
-            current.infoLog("Created a batch event for {} events from {} to {}",
-                current.getNumberOfEvents(), startEventId, current.getEventId());
-            metrics.getCounter(MetastoreEventsProcessor.NUMBER_OF_BATCH_EVENTS).inc();
+      // We can batch certain events on the same table as long as there is no
+      // intervening event that cuts the batch. To allow this non-contiguous batching,
+      // we keep state for each table separately. This is a two-level structure with
+      // the first layer keyed on the database name and the second layer keyed on the
+      // table name. This makes it possible to flush all entries for a database
+      // efficiently. Both database name and table name are lowercased to make this case
+      // insensitive.
+      //
+      // Entries in this hash map are still pending and can accept more events into
+      // a batch when eligible. Each time an event is added to a batch, it changes the
+      // ending Event ID, so this holds the pending batches until the batch is finalized.
+      // When the batch is finalized (either by an event that cuts the batch or by
+      // running out of events), it is moved to the sortedFinalBatches.
+      Map<String, Map<String, MetastoreEvent>> pendingTableEventsMap = new HashMap<>();
+
+      // The output events need to be monotonically increasing in their Event IDs,
+      // so we insert the resulting batches into a TreeMap and use that to produce
+      // the output list. Examples:
+      // 1. Basic ordering
+      // Suppose there are inserts on 4 different tables (Event ID in parens):
+      // A(1), B(2), C(3), D(4)
+      // The sorting will emit those in the same order they arrived. This also applies
+      // to any contiguous batching.
+      // 2. Interleaved events
+      // Suppose there are interleaved events that can be batched for different tables:
+      // A(1), B(2), A(3), B(4)
+      // The sorting will emit the batches in order of ascending ending Event ID, i.e.
+      // A(1-3), B(2-4)
+      // Since the ending Event ID of a batch changes each time an extra event is added,
+      // this structure should only contain finalized batches that can't change.
+      TreeMap<Long, MetastoreEvent> sortedFinalBatches = new TreeMap<>();
+
+      for (MetastoreEvent next : events) {
+        // Events that impact multiple tables need special handling to cut event batches
+        // for all impacted tables. This logic is in addition to the regular batch
+        // cutting logic that happens on a single-table basis.
+        cutBatchesForCrossTableEvents(next, pendingTableEventsMap, sortedFinalBatches);
+
+        if (!(next instanceof MetastoreTableEvent)) {
+          // No batching for non-table events
+          sortedFinalBatches.put(next.getEventId(), next);
+          continue;
+        }
+        String dbName = next.getDbName().toLowerCase();
+        String tableName = next.getTableName().toLowerCase();
+        // First, lookup the dbMap or create it if it doesn't exist
+        Map<String, MetastoreEvent> dbMap =
+            pendingTableEventsMap.computeIfAbsent(dbName, k -> new HashMap<>());
+        // Second, find the table entry in the dbMap
+        MetastoreEvent current = dbMap.get(tableName);
+        if (current != null) {
+          // Check if the next metastore event for the table can be batched into the
+          // current event for the table.
+          if (!current.canBeBatched(next)) {
+            // Events cannot be batched. Flush the current event in the table map to the
+            // output and put the next element into the table map.
+            sortedFinalBatches.put(current.getEventId(), current);
+            dbMap.put(tableName, next);
+          } else {
+            // next can be batched into current event
+            dbMap.put(tableName,
+                      Preconditions.checkNotNull(current.addToBatchEvents(next)));
           }
-          batchedEventList.add(current);
-          current = next;
-          startEventId = next.getEventId();
         } else {
-          // next can be batched into current event
-          current = Preconditions.checkNotNull(current.addToBatchEvents(next));
+          // New entry for this table
+          dbMap.put(tableName, next);
+        }
+      }
+      // Flush out any pending events
+      for (Map<String, MetastoreEvent> dbMap : pendingTableEventsMap.values()) {
+        for (MetastoreEvent event : dbMap.values()) {
+          sortedFinalBatches.put(event.getEventId(), event);
         }
-        j++;
       }
-      if (current.getNumberOfEvents() > 1) {
-        current.infoLog("Created a batch event for {} events from {} to {}",
-            current.getNumberOfEvents(), startEventId, current.getEventId());
-        metrics.getCounter(MetastoreEventsProcessor.NUMBER_OF_BATCH_EVENTS).inc();
+
+      // We defer logging about the batches created until the end so that we can output
+      // them in the sorted order used for the actual output list.
+      List<MetastoreEvent> batchedEventList =
+          new ArrayList<>(sortedFinalBatches.values());
+      for (MetastoreEvent event : batchedEventList) {
+        if (event.getNumberOfEvents() > 1) {
+          Preconditions.checkState(event instanceof BatchPartitionEvent);
+          BatchPartitionEvent batchEvent = (BatchPartitionEvent) event;
+          batchEvent.infoLog("Created a batch event for {} events between {} and {}",
+              batchEvent.getNumberOfEvents(), batchEvent.getFirstEventId(),
+              batchEvent.getLastEventId());
+          metrics.getCounter(MetastoreEventsProcessor.NUMBER_OF_BATCH_EVENTS).inc();
+        }
       }
-      batchedEventList.add(current);
       return batchedEventList;
     }
   }
@@ -1329,8 +1470,6 @@ public class MetastoreEvents {
       if (!(event instanceof InsertEvent)) return false;
       if (isOlderThanLastSyncEventId(event)) return false;
       InsertEvent insertEvent = (InsertEvent) event;
-      // batched events must have consecutive event ids
-      if (event.getEventId() != 1 + getEventId()) return false;
       // make sure that the event is on the same table
       if (!getFullyQualifiedTblName().equalsIgnoreCase(
           insertEvent.getFullyQualifiedTblName())) {
@@ -2246,7 +2385,6 @@ public class MetastoreEvents {
       if (!(event instanceof AlterPartitionEvent)) return false;
       if (isOlderThanLastSyncEventId(event)) return false;
       AlterPartitionEvent alterPartitionEvent = (AlterPartitionEvent) event;
-      if (event.getEventId() != 1 + getEventId()) return false;
       // make sure that the event is on the same table
       if (!getFullyQualifiedTblName().equalsIgnoreCase(
           alterPartitionEvent.getFullyQualifiedTblName())) {
@@ -2464,8 +2602,9 @@ public class MetastoreEvents {
       }
       if (eventsToProcess.isEmpty() && partitionEventsToForceReload.isEmpty()) {
         LOG.info(
-            "Ignoring events from event id {} to {} since they modify parameters "
-            + " which can be ignored", getFirstEventId(), getLastEventId());
+            "Ignoring {} events between event id {} and {} since they modify parameters"
+            + " which can be ignored", getNumberOfEvents(), getFirstEventId(),
+            getLastEventId());
         return;
       }
 
@@ -2499,9 +2638,11 @@ public class MetastoreEvents {
           }
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(String.format(
-              "Refresh partitions on table %s failed when processing event ids %s-%s. "
+              "Refresh partitions on table %s failed when processing a batch of %s "
+              + "events between event ids %s and %s. "
               + "Issue an invalidate command to reset the event processor state.",
-              getFullyQualifiedTblName(), getFirstEventId(), getLastEventId()), e);
+              getFullyQualifiedTblName(), getNumberOfEvents(), getFirstEventId(),
+              getLastEventId()), e);
         }
       }
     }
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index bbdc22700..ddae5a95e 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -88,6 +88,8 @@ import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
+import org.apache.impala.catalog.events.MetastoreEvents.AlterDatabaseEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.DropDatabaseEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.AlterPartitionEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.BatchPartitionEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.AlterTableEvent;
@@ -2440,6 +2442,46 @@ public class MetastoreEventsProcessorTest {
     partVals.add(Collections.singletonList("2"));
     addPartitions(TEST_DB_NAME, testTblName, partVals);
     eventsProcessor_.processEvents();
+
+    // Collect some other types of events to test scenarios where an event cuts batching
+    Map<String, String> miscEventTypesToMessage = new HashMap<>();
+    createDatabaseFromImpala("database_to_be_dropped", "whatever");
+    eventsProcessor_.processEvents();
+
+    // Get ALTER_DATABASE event
+    String currentLocation =
+        catalog_.getDb("database_to_be_dropped").getMetaStoreDb().getLocationUri();
+    String newLocation = currentLocation + File.separatorChar + "newTestLocation";
+    Database alteredDb =
+        catalog_.getDb("database_to_be_dropped").getMetaStoreDb().deepCopy();
+    alteredDb.setLocationUri(newLocation);
+    alterDatabase(alteredDb);
+    List<NotificationEvent> alterDbEvents = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(1, alterDbEvents.size());
+    assertEquals("ALTER_DATABASE", alterDbEvents.get(0).getEventType());
+    miscEventTypesToMessage.put("ALTER_DATABASE", alterDbEvents.get(0).getMessage());
+    eventsProcessor_.processEvents();
+
+    // Get DROP_DATABASE event
+    dropDatabaseCascade("database_to_be_dropped");
+    List<NotificationEvent> dropDbEvents = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(1, dropDbEvents.size());
+    assertEquals("DROP_DATABASE", dropDbEvents.get(0).getEventType());
+    miscEventTypesToMessage.put("DROP_DATABASE", dropDbEvents.get(0).getMessage());
+    eventsProcessor_.processEvents();
+
+    // Get ALTER_TABLE rename event
+    createTable(TEST_DB_NAME, "table_before_rename", false);
+    eventsProcessor_.processEvents();
+    alterTableRename("table_before_rename", "table_after_rename", null);
+    List<NotificationEvent> alterTblEvents = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(1, alterTblEvents.size());
+    assertEquals("ALTER_TABLE", alterTblEvents.get(0).getEventType());
+    miscEventTypesToMessage.put("ALTER_TABLE", alterTblEvents.get(0).getMessage());
+    eventsProcessor_.processEvents();
+    dropTable("table_after_rename");
+    eventsProcessor_.processEvents();
+
     alterPartitionsParams(TEST_DB_NAME, testTblName, "testkey", "val", partVals);
     // we fetch a real alter partition event so that we can generate mocks using its
     // contents below
@@ -2482,12 +2524,13 @@ public class MetastoreEventsProcessorTest {
     Map<String, String> eventTypeToMessage = new HashMap<>();
     eventTypeToMessage.put("ALTER_PARTITION", alterPartMessage);
     eventTypeToMessage.put("INSERT", insertEvent.getMessage());
-    runEventBatchingTest(testTblName, eventTypeToMessage);
+    runEventBatchingTest(testTblName, eventTypeToMessage, miscEventTypesToMessage);
   }
 
   @SuppressWarnings({"rawtypes", "unchecked"})
   private void runEventBatchingTest(String testTblName,
-      Map<String, String> eventTypeToMessage) throws DatabaseNotFoundException,
+      Map<String, String> eventTypeToMessage,
+      Map<String, String> miscEventTypesToMessage) throws DatabaseNotFoundException,
       MetastoreNotificationException {
     Table tbl = catalog_.getTable(TEST_DB_NAME, testTblName);
     long lastSyncedEventId = tbl.getLastSyncedEventId();
@@ -2504,35 +2547,66 @@ public class MetastoreEventsProcessorTest {
       assertTrue(batch.get(0) instanceof BatchPartitionEvent);
       BatchPartitionEvent batchEvent = (BatchPartitionEvent) batch.get(0);
       assertEquals(10, batchEvent.getBatchEvents().size());
-      // create a batch which consists of some other events
-      // only contiguous events should be batched
-      // 13-15 mock events which can be batched
-      mockEvents = createMockEvents(lastSyncedEventId + 113, 3,
-          eventType, TEST_DB_NAME, testTblName, eventMessage);
-      // 17-18 can be batched
+
+      // We support batching some non-contiguous events on the same table
+      // as long as the events are independent. Test successful batching
+      // of interleaved events from two tables.
+      // Events on Table 1: 13-15,17-18,20 => 13-20 (batch size = 6)
+      // Events on Table 2: 10-12,16,19,21-22 => 10-22 (batch size = 7)
+      // Table 2
+      mockEvents = createMockEvents(lastSyncedEventId + 110, 3,
+          eventType, TEST_DB_NAME, "table2", eventMessage);
+      // Table 1
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 113, 3,
+          eventType, TEST_DB_NAME, "table1", eventMessage));
+      // Table 2
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 116, 1,
+          eventType, TEST_DB_NAME, "table2", eventMessage));
+      // Table 1
       mockEvents.addAll(createMockEvents(lastSyncedEventId + 117, 2,
-          eventType, TEST_DB_NAME, testTblName, eventMessage));
-      // event id 20 should not be batched
+          eventType, TEST_DB_NAME, "table1", eventMessage));
+      // Table 2
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 119, 1,
+          eventType, TEST_DB_NAME, "table2", eventMessage));
+      // Table 1
       mockEvents.addAll(createMockEvents(lastSyncedEventId + 120, 1,
-          eventType, TEST_DB_NAME, testTblName, eventMessage));
-      // events 22-24 should be batched
-      mockEvents.addAll(createMockEvents(lastSyncedEventId + 122, 3,
-          eventType, TEST_DB_NAME, testTblName, eventMessage));
-
+          eventType, TEST_DB_NAME, "table1", eventMessage));
+      // Table 2
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 121, 2,
+          eventType, TEST_DB_NAME, "table2", eventMessage));
       batch = eventFactory.createBatchEvents(mockEvents, eventsProcessor_.getMetrics());
-      assertEquals(4, batch.size());
-      MetastoreEvent batch1 = batch.get(0);
-      assertEquals(3, ((BatchPartitionEvent) batch1).getBatchEvents().size());
-      MetastoreEvent batch2 = batch.get(1);
-      assertEquals(2, ((BatchPartitionEvent) batch2).getBatchEvents().size());
-      MetastoreEvent batch3 = batch.get(2);
-      if (eventType.equalsIgnoreCase("ALTER_PARTITION")) {
-        assertTrue(batch3 instanceof AlterPartitionEvent);
-      } else {
-        assertTrue(batch3 instanceof InsertEvent);
+      assertEquals(2, batch.size());
+      // Batch 1 must be the batch with the lower Event ID (i.e. table 1)
+      BatchPartitionEvent batch1 = (BatchPartitionEvent) batch.get(0);
+      assertEquals(6, batch1.getNumberOfEvents());
+      assertEquals(lastSyncedEventId + 120, batch1.getEventId());
+      assertEquals("table1", batch1.getTableName());
+      // Batch 2 must be the batch with the higher Event ID (i.e. table 2)
+      BatchPartitionEvent batch2 = (BatchPartitionEvent) batch.get(1);
+      assertEquals(7, batch2.getNumberOfEvents());
+      assertEquals(lastSyncedEventId + 122, batch2.getEventId());
+      assertEquals("table2", batch2.getTableName());
+
+      // test to make sure that events on different tables are still monotonic
+      mockEvents = createMockEvents(lastSyncedEventId + 110, 1,
+          eventType, TEST_DB_NAME, "table1", eventMessage);
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 111, 1,
+          eventType, TEST_DB_NAME, "table2", eventMessage));
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 112, 1,
+          eventType, TEST_DB_NAME, "table3", eventMessage));
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 113, 1,
+          eventType, TEST_DB_NAME, "table4", eventMessage));
+      mockEvents.addAll(createMockEvents(lastSyncedEventId + 114, 1,
+          eventType, TEST_DB_NAME, "table5", eventMessage));
+      batch = eventFactory.createBatchEvents(mockEvents, eventsProcessor_.getMetrics());
+      assertEquals(5, batch.size());
+      for (int i = 0; i < batch.size(); i++) {
+        MetastoreEvent monotonicEvent = batch.get(i);
+        assertEquals(1, monotonicEvent.getNumberOfEvents());
+        assertEquals(lastSyncedEventId + 110+i, monotonicEvent.getEventId());
+        assertEquals("table"+(i+1), monotonicEvent.getTableName());
       }
-      MetastoreEvent batch4 = batch.get(3);
-      assertEquals(3, ((BatchPartitionEvent) batch4).getBatchEvents().size());
+
       // test to make sure that events which have different database name are not
       // batched
       mockEvents = createMockEvents(lastSyncedEventId + 100, 1, eventType, TEST_DB_NAME,
@@ -2566,6 +2640,104 @@ public class MetastoreEventsProcessorTest {
           assertTrue(event instanceof InsertEvent);
         }
       }
+
+      // Test that alter database cuts batches for tables in that database
+      // This test case may not be strictly necessary. Alter database should not
+      // impact batchable events. Out of an abundance of caution, we test it
+      // anyways.
+      mockEvents = createMockEvents(lastSyncedEventId + 100, 1, eventType,
+          "database_to_be_dropped", testTblName, eventMessage);
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 101, 1, eventType, "other_database",
+              testTblName, eventMessage));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 102, 1, "ALTER_DATABASE",
+              "database_to_be_dropped", null,
+              miscEventTypesToMessage.get("ALTER_DATABASE")));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 103, 1, eventType,
+              "database_to_be_dropped", testTblName, eventMessage));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 104, 1, eventType, "other_database",
+              testTblName, eventMessage));
+      batchEvents = eventFactory.createBatchEvents(mockEvents,
+          eventsProcessor_.getMetrics());
+      assertEquals(4, batchEvents.size());
+      assertTrue(batchEvents.get(1) instanceof AlterDatabaseEvent);
+      assertEquals("database_to_be_dropped", batchEvents.get(0).getDbName());
+      assertEquals("database_to_be_dropped", batchEvents.get(2).getDbName());
+      assertEquals("other_database", batchEvents.get(3).getDbName());
+
+      // Test that drop database cuts batches for tables in that database
+      // This is a synthetic sequence that should not occur in the wild.
+      mockEvents = createMockEvents(lastSyncedEventId + 100, 1, eventType,
+          "database_to_be_dropped", testTblName, eventMessage);
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 101, 1, eventType, "other_database",
+              testTblName, eventMessage));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 102, 1, "DROP_DATABASE",
+              "database_to_be_dropped", null,
+              miscEventTypesToMessage.get("DROP_DATABASE")));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 103, 1, eventType,
+              "database_to_be_dropped", testTblName, eventMessage));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 104, 1, eventType, "other_database",
+              testTblName, eventMessage));
+      batchEvents = eventFactory.createBatchEvents(mockEvents,
+          eventsProcessor_.getMetrics());
+      assertEquals(4, batchEvents.size());
+      assertTrue(batchEvents.get(1) instanceof DropDatabaseEvent);
+      assertEquals("database_to_be_dropped", batchEvents.get(0).getDbName());
+      assertEquals("database_to_be_dropped", batchEvents.get(2).getDbName());
+      assertEquals("other_database", batchEvents.get(3).getDbName());
+
+      // Test that alter table rename cuts batches for the two tables' names
+      // names involved (but not an unrelated table).
+      // This is a synthetic sequence that should not occur in the wild.
+      mockEvents = createMockEvents(lastSyncedEventId + 100, 2, eventType, TEST_DB_NAME,
+          "table_before_rename", eventMessage);
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 102, 2, eventType, TEST_DB_NAME,
+              "table_after_rename", eventMessage));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 104, 1, eventType, TEST_DB_NAME,
+              "other_table", eventMessage));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 105, 1, "ALTER_TABLE", TEST_DB_NAME,
+              "table_before_rename", miscEventTypesToMessage.get("ALTER_TABLE")));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 106, 2, eventType, TEST_DB_NAME,
+              "table_before_rename", eventMessage));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 108, 2, eventType, TEST_DB_NAME,
+              "table_after_rename", eventMessage));
+      mockEvents.addAll(
+          createMockEvents(lastSyncedEventId + 110, 1, eventType, TEST_DB_NAME,
+              "other_table", eventMessage));
+      batchEvents = eventFactory.createBatchEvents(mockEvents,
+          eventsProcessor_.getMetrics());
+      // The table_before_rename and table_after_rename tables keep their
+      // events in order (with batching only for adjacent events).
+      // The other_table can batch across the alter table event.
+      assertEquals(6, batchEvents.size());
+      BatchPartitionEvent altTblBatch0 = (BatchPartitionEvent) batchEvents.get(0);
+      assertEquals(2, altTblBatch0.getNumberOfEvents());
+      assertEquals("table_before_rename", altTblBatch0.getTableName());
+      BatchPartitionEvent altTblBatch1 = (BatchPartitionEvent) batchEvents.get(1);
+      assertEquals(2, altTblBatch1.getNumberOfEvents());
+      assertEquals("table_after_rename", altTblBatch1.getTableName());
+      assertTrue(batchEvents.get(2) instanceof AlterTableEvent);
+      BatchPartitionEvent altTblBatch3 = (BatchPartitionEvent) batchEvents.get(3);
+      assertEquals(2, altTblBatch3.getNumberOfEvents());
+      assertEquals("table_before_rename", altTblBatch3.getTableName());
+      BatchPartitionEvent altTblBatch4 = (BatchPartitionEvent) batchEvents.get(4);
+      assertEquals(2, altTblBatch4.getNumberOfEvents());
+      assertEquals("table_after_rename", altTblBatch4.getTableName());
+      BatchPartitionEvent altTblBatch5 = (BatchPartitionEvent) batchEvents.get(5);
+      assertEquals(2, altTblBatch5.getNumberOfEvents());
+      assertEquals("other_table", altTblBatch5.getTableName());
     }
     // make sure 2 events of different event types are not batched together
     long startEventId = lastSyncedEventId + 117;