You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bi...@apache.org on 2021/10/07 01:39:11 UTC

[impala] 01/04: IMPALA-9857: Batching of consecutive partition events

This is an automated email from the ASF dual-hosted git repository.

bikram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d8d44f3f147ce0f98bdd9e0387ae080010f55965
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed Sep 8 14:24:49 2021 -0700

    IMPALA-9857: Batching of consecutive partition events
    
    This patch improves the performance of events processor
    by batching together consecutive ALTER_PARTITION or
    INSERT events. Currently, without this patch, if
    the events stream consists of a lot of consecutive
    ALTER_PARTITION events which cannot be skipped,
    events processor will refresh partition from each
    event one by one. Similarly, in case of INSERT events
    in a partition events processor refresh one partition
    at a time.
    
    By batching together such consecutive ALTER_PARTITION or
    INSERT events, events processor needs to take lock on the table
    only once per batch and can refresh all the partitions from
    the events using multiple threads. For transactional (acid)
    tables, this provides even significant performance gain
    since currently we refresh the whole table in case of
    ALTER_PARTITION or INSERT partition events. By batching them
    together, events processor will refresh the table once per
    batch.
    
    The batch of eligible ALTER_PARTITION and INSERT events will
    be processed as ALTER_PARTITIONS and INSERT_PARTITIONS event
    respectively.
    
    Performance tests:
    In order to simulate bunch of ALTER_PARTITION and INSERT
    events, a simple test was performed by running the following
    query from hive:
    insert into store_sales_copy partition(ss_sold_date_sk)
    select * from store_sales;
    
    This query generates 1824 ALTER_PARTITION and 1824 INSERT
    events and time taken to process all the events generated
    was measured before and after the patch for external and
    ACID table.
    
    Table Type              Before          After
    ======================================================
    External table          75 sec          25 sec
    ACID tables             313 sec         47 sec
    
    Additionally, the patch also fixes a minor bug in
    evaluateSelfEvent() method which should return false when
    serviceId does not match.
    
    Testing Done:
    1. Added new tests which cover the batching logic of events.
    2. Exhaustive tests.
    
    Change-Id: I5d27a68a64436d31731e9a219b1efd6fc842de73
    Reviewed-on: http://gerrit.cloudera.org:8080/17848
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Sourabh Goyal <so...@cloudera.com>
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      |  72 ++--
 .../org/apache/impala/catalog/HdfsPartition.java   |   7 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  | 101 ++++-
 .../impala/catalog/events/MetastoreEvents.java     | 477 +++++++++++++++++----
 .../catalog/events/MetastoreEventsProcessor.java   |   5 +-
 .../impala/catalog/events/SelfEventContext.java    |  38 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  63 +--
 .../events/MetastoreEventsProcessorTest.java       | 249 ++++++++++-
 tests/custom_cluster/test_events_custom_configs.py | 135 ++++--
 tests/metadata/test_event_processing.py            |   1 +
 tests/util/event_processor_utils.py                |   9 +-
 11 files changed, 941 insertions(+), 216 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 5129134..34eea1c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import static org.apache.impala.thrift.TCatalogObjectType.HDFS_PARTITION;
 import static org.apache.impala.thrift.TCatalogObjectType.TABLE;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -48,9 +49,9 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.AuthorizationDelta;
@@ -62,15 +63,12 @@ import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.ExternalEventsProcessor;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
-import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.catalog.events.SelfEventContext;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.monitor.CatalogTableMetrics;
-import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
 import org.apache.impala.catalog.metastore.HmsApiNameEnum;
 import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
-import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -936,31 +934,32 @@ public class CatalogServiceCatalog extends Catalog {
    * evaluate if this is a self-event or not
    * @return true if given event information evaluates to a self-event, false otherwise
    */
-  public boolean evaluateSelfEvent(boolean isInsertEvent, SelfEventContext ctx)
-      throws CatalogException {
+  public boolean evaluateSelfEvent(SelfEventContext ctx) throws CatalogException {
     Preconditions.checkState(isEventProcessingActive(),
         "Event processing should be enabled when calling this method");
+    boolean isInsertEvent = ctx.isInsertEventContext();
     long versionNumber =
-        isInsertEvent ? ctx.getIdFromEvent() : ctx.getVersionNumberFromEvent();
+        isInsertEvent ? ctx.getInsertEventId(0) : ctx.getVersionNumberFromEvent();
     String serviceIdFromEvent = ctx.getServiceIdFromEvent();
 
     if (!isInsertEvent) {
       // no version info or service id in the event
       if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
-        LOG.info("Not a self-event since the given version is {} and service id is {}",
+        LOG.debug("Not a self-event since the given version is {} and service id is {}",
             versionNumber, serviceIdFromEvent.isEmpty() ? "empty" : serviceIdFromEvent);
         return false;
       }
       // if the service id from event doesn't match with our service id this is not a
       // self-event
       if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
-        LOG.info("Not a self-event because service id of this catalog {} does not match "
+        LOG.debug("Not a self-event because service id of this catalog {} does not match "
                 + "with one in event {}.",
             getCatalogServiceId(), serviceIdFromEvent);
+        return false;
       }
     } else if (versionNumber == -1) {
       // if insert event, we only compare eventId
-      LOG.info("Not a self-event because eventId is {}", versionNumber);
+      LOG.debug("Not a self-event because eventId is {}", versionNumber);
       return false;
     }
     Db db = getDb(ctx.getDbName());
@@ -978,7 +977,7 @@ public class CatalogServiceCatalog extends Catalog {
       try {
         boolean removed = db.removeFromVersionsForInflightEvents(versionNumber);
         if (!removed) {
-          LOG.info("Could not find version {} in the in-flight event list of database "
+          LOG.debug("Could not find version {} in the in-flight event list of database "
               + "{}", versionNumber, db.getName());
         }
         return removed;
@@ -1005,27 +1004,30 @@ public class CatalogServiceCatalog extends Catalog {
         boolean removed =
             tbl.removeFromVersionsForInflightEvents(isInsertEvent, versionNumber);
         if (!removed) {
-          LOG.info("Could not find {} {} in in-flight event list of table {}",
+          LOG.debug("Could not find {} {} in in-flight event list of table {}",
               isInsertEvent ? "eventId" : "version", versionNumber, tbl.getFullName());
         }
         return removed;
       }
       if (tbl instanceof HdfsTable) {
         List<String> failingPartitions = new ArrayList<>();
-        for (List<TPartitionKeyValue> partitionKeyValue : partitionKeyValues) {
+        int len = partitionKeyValues.size();
+        for (int i=0; i<len; ++i) {
+          List<TPartitionKeyValue> partitionKeyValue = partitionKeyValues.get(i);
+          versionNumber = isInsertEvent ? ctx.getInsertEventId(i) : versionNumber;
           HdfsPartition hdfsPartition =
               ((HdfsTable) tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue);
           if (hdfsPartition == null
-              || !hdfsPartition.removeFromVersionsForInflightEvents(
-                     isInsertEvent, versionNumber)) {
+              || !hdfsPartition.removeFromVersionsForInflightEvents(isInsertEvent,
+                  versionNumber)) {
             // even if this is an error condition we should not bail out early since we
             // should clean up the self-event state on the rest of the partitions
             String partName = HdfsTable.constructPartitionName(partitionKeyValue);
             if (hdfsPartition == null) {
-              LOG.info("Partition {} not found during self-event "
-                + "evaluation for the table {}", partName, tbl.getFullName());
+              LOG.debug("Partition {} not found during self-event "
+                  + "evaluation for the table {}", partName, tbl.getFullName());
             } else {
-              LOG.info("Could not find {} in in-flight event list of the partition {} "
+              LOG.trace("Could not find {} in in-flight event list of the partition {} "
                   + "of table {}", versionNumber, partName, tbl.getFullName());
             }
             failingPartitions.add(partName);
@@ -1045,8 +1047,8 @@ public class CatalogServiceCatalog extends Catalog {
    * @param isInsertEvent if false add versionNumber for DDL Event, otherwise add eventId
    * for Insert Event.
    * @param tbl Catalog table
-   * @param versionNumber when isInsertEvent is true, it is eventId to add
-   * when isInsertEvent is false, it is version number to add
+   * @param versionNumber when isInsertEventContext is true, it is eventId to add
+   * when isInsertEventContext is false, it is version number to add
    */
   public void addVersionsForInflightEvents(
       boolean isInsertEvent, Table tbl, long versionNumber) {
@@ -2568,30 +2570,6 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
-   * Refresh partition if it exists.
-   *
-   * @return true if partition was reloaded, else false.
-   * @throws CatalogException if partition reload threw an error.
-   * @throws DatabaseNotFoundException if Db doesn't exist.
-   * @throws TableNotFoundException if table doesn't exist.
-   * @throws TableNotLoadedException if table is not loaded in Catalog.
-   */
-  public boolean reloadPartitionIfExists(String dbName, String tblName,
-      List<TPartitionKeyValue> tPartSpec, String reason) throws CatalogException {
-    Table table = getTable(dbName, tblName);
-    if (table == null) {
-      throw new TableNotFoundException(dbName + "." + tblName + " not found");
-    }
-    if (table instanceof IncompleteTable) {
-      throw new TableNotLoadedException(dbName + "." + tblName + " is not loaded");
-    }
-    Reference<Boolean> wasPartitionRefreshed = new Reference<>(false);
-    reloadPartition(table, tPartSpec, wasPartitionRefreshed,
-        CatalogObject.ThriftObjectType.NONE, reason);
-    return wasPartitionRefreshed.getRef();
-  }
-
-  /**
    * Refresh table if exists. Returns true if reloadTable() succeeds, false
    * otherwise.
    */
@@ -2986,7 +2964,11 @@ public class CatalogServiceCatalog extends Catalog {
         throw new CatalogException("Error loading metadata for partition: "
             + hdfsTable.getFullName() + " " + partitionName, e);
       }
-      hdfsTable.reloadPartition(msClient.getHiveClient(), hdfsPartition, hmsPartition);
+      Map<Partition, HdfsPartition> hmsPartToHdfsPart = new HashMap<>();
+      // note that hdfsPartition can be null here which is a valid input argument
+      // in such a case a new hdfsPartition is added and nothing is removed.
+      hmsPartToHdfsPart.put(hmsPartition, hdfsPartition);
+      hdfsTable.reloadPartitions(msClient.getHiveClient(), hmsPartToHdfsPart);
     }
     hdfsTable.setCatalogVersion(newCatalogVersion);
     wasPartitionReloaded.setRef(true);
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 81920be..25f8aea 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -875,7 +875,8 @@ public class HdfsPartition extends CatalogObjectImpl
    * Insert events. If false, remove version number from list of versions for in-flight
    * DDL events.
    * @param versionNumber when isInsertEvent is true, it's eventId to remove
-   *                      when isInsertEvent is false, it's version number to remove
+   *                      when isInsertEvent is false, it's version number to
+   *                      remove.
    * @return true if the versionNumber was removed, false if it didn't exist
    */
   public boolean removeFromVersionsForInflightEvents(
@@ -885,8 +886,8 @@ public class HdfsPartition extends CatalogObjectImpl
             + "partition " + getPartitionName() + " of table " + table_.getFullName());
     boolean ret = inFlightEvents_.remove(isInsertEvent, versionNumber);
     if (!ret) {
-      LOG.debug("Remove of in-flight version number failed for {}: {}", versionNumber,
-          inFlightEvents_.print());
+      LOG.trace("Failed to remove in-flight version number {}: in-flight events: {}",
+          versionNumber, inFlightEvents_.print());
     }
     return ret;
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 4424149..8959344 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -2674,25 +2676,90 @@ public class HdfsTable extends Table implements FeFsTable {
   }
 
   /**
-   * Reloads the metadata of partition 'oldPartition' by removing
-   * it from the table and reconstructing it from the HMS partition object
-   * 'hmsPartition'. If old partition is null then nothing is removed and
-   * and partition constructed from 'hmsPartition' is simply added.
+   * Generates a debug string useful for logging purposes. It returns a string consisting
+   * of names from the given list until the limit and then appends the count of the
+   * remaining items.
    */
-  public void reloadPartition(IMetaStoreClient client, HdfsPartition oldPartition,
-      Partition hmsPartition) throws CatalogException {
-    HdfsPartition.Builder partBuilder = createPartitionBuilder(
-        hmsPartition.getSd(), hmsPartition, new FsPermissionCache());
-    Preconditions.checkArgument(oldPartition == null
-        || HdfsPartition.comparePartitionKeyValues(
-            oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) == 0);
-    if (oldPartition != null) {
-      partBuilder.setFileDescriptors(oldPartition);
+  private static String generateDebugStr(List<String> items, int limit) {
+    String result = Joiner.on(',').join(Iterables.limit(items, limit));
+    if (items.size() > limit) {
+      result = String.format("%s... and %s others", result, items.size()-limit);
+    }
+    return result;
+  }
+
+  /**
+   * Reloads the HdfsPartitions which correspond to the given partNames. Returns the
+   * number of partitions which were reloaded.
+   */
+  public int reloadPartitionsFromNames(IMetaStoreClient client,
+      List<String> partNames, String reason) throws CatalogException {
+    Preconditions.checkState(partNames != null && !partNames.isEmpty());
+    LOG.info(String.format("Reloading partition metadata: %s %s (%s)",
+        getFullName(), generateDebugStr(partNames, 3), reason));
+    List<Partition> hmsPartitions;
+    Map<Partition, HdfsPartition> hmsPartToHdfsPart = new HashMap<>();
+    try {
+      hmsPartitions = client.getPartitionsByNames(getDb().getName(),
+          getName(), partNames);
+      for (Partition partition : hmsPartitions) {
+        List<LiteralExpr> partExprs = getTypeCompatiblePartValues(partition.getValues());
+        HdfsPartition hdfsPartition = getPartition(partExprs);
+        if (hdfsPartition != null) {
+          hmsPartToHdfsPart.put(partition, hdfsPartition);
+        }
+      }
+      reloadPartitions(client, hmsPartToHdfsPart);
+      return hmsPartToHdfsPart.size();
+    } catch (NoSuchObjectException e) {
+      // HMS throws a NoSuchObjectException if the table does not exist
+      // in HMS anymore. In case the partitions don't exist in HMS it does not include
+      // them in the result of getPartitionsByNames.
+      throw new TableLoadingException(
+          "Error when reloading partitions for table " + getFullName(), e);
+    } catch (TException | UnsupportedEncodingException e2) {
+      throw new CatalogException(
+          "Unexpected error while retrieving partitions for table " + getFullName(), e2);
+    }
+  }
+
+  /**
+   * Reloads the metadata of partitions given by a map of HMS Partitions to existing (old)
+   * HdfsPartitions.
+   * @param hmsPartsToHdfsParts The map of HMS partition object to the old HdfsPartition.
+   *                            Every key-value in this map represents the HdfsPartition
+   *                            which needs to be removed from the table and
+   *                            reconstructed from the HMS partition key. If the
+   *                            value for a given partition key is null then nothing is
+   *                            removed and a new HdfsPartition is simply added.
+   */
+  public void reloadPartitions(IMetaStoreClient client,
+      Map<Partition, HdfsPartition> hmsPartsToHdfsParts) throws CatalogException {
+    FsPermissionCache permissionCache = new FsPermissionCache();
+    Map<HdfsPartition.Builder, HdfsPartition> partBuilderToPartitions = new HashMap<>();
+    for (Map.Entry<Partition, HdfsPartition> entry : hmsPartsToHdfsParts.entrySet()) {
+      Partition hmsPartition = entry.getKey();
+      HdfsPartition oldPartition = entry.getValue();
+      HdfsPartition.Builder partBuilder = createPartitionBuilder(
+          hmsPartition.getSd(), hmsPartition, permissionCache);
+      Preconditions.checkArgument(oldPartition == null
+          || HdfsPartition.comparePartitionKeyValues(
+          oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) == 0);
+      if (oldPartition != null) {
+        partBuilder.setFileDescriptors(oldPartition);
+      }
+      partBuilderToPartitions.put(partBuilder, oldPartition);
+    }
+    // load file metadata in parallel
+    loadFileMetadataForPartitions(client,
+        partBuilderToPartitions.keySet(),/*isRefresh=*/true);
+    for (Map.Entry<HdfsPartition.Builder, HdfsPartition> entry :
+        partBuilderToPartitions.entrySet()) {
+      if (entry.getValue() != null) {
+        dropPartition(entry.getValue(), false);
+      }
+      addPartition(entry.getKey().build());
     }
-    loadFileMetadataForPartitions(client, ImmutableList.of(partBuilder),
-        /*isRefresh=*/true);
-    dropPartition(oldPartition, false);
-    addPartition(partBuilder.build());
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index f9e83f7..93d3af0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -102,8 +102,10 @@ public class MetastoreEvents {
     ALTER_DATABASE("ALTER_DATABASE"),
     ADD_PARTITION("ADD_PARTITION"),
     ALTER_PARTITION("ALTER_PARTITION"),
+    ALTER_PARTITIONS("ALTER_PARTITIONS"),
     DROP_PARTITION("DROP_PARTITION"),
     INSERT("INSERT"),
+    INSERT_PARTITIONS("INSERT_PARTITIONS"),
     OTHER("OTHER");
 
     private final String eventType_;
@@ -252,9 +254,52 @@ public class MetastoreEvents {
           + "filtered out: %d", sizeBefore, numFilteredEvents));
       metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
               .inc(numFilteredEvents);
-      LOG.info("Incremented skipped metric to " + metrics_
+      LOG.debug("Incremented skipped metric to " + metrics_
           .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
-      return metastoreEvents;
+      return createBatchEvents(metastoreEvents);
+    }
+
+    /**
+     * This method batches together any eligible consecutive elements from the given
+     * list of {@code MetastoreEvent}. The returned list may or may not contain batch
+     * events depending on whether it finds any events which could be batched together.
+     */
+    @VisibleForTesting
+    List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events) {
+      if (events.size() < 2) return events;
+      int i = 0, j = 1;
+      List<MetastoreEvent> batchedEventList = new ArrayList<>();
+      MetastoreEvent current = events.get(i);
+      // startEventId points to the current event's or the start of the batch
+      // in case current is a batch event.
+      long startEventId = current.getEventId();
+      while (j < events.size()) {
+        MetastoreEvent next = events.get(j);
+        // check if the current metastore event and the next can be batched together
+        if (!current.canBeBatched(next)) {
+          // events cannot be batched, add the current event under consideration to the
+          // list and update current to the next
+          if (current.getNumberOfEvents() > 1) {
+            current.infoLog("Created a batch event for {} events from {} to {}",
+                current.getNumberOfEvents(), startEventId, current.getEventId());
+            metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_BATCH_EVENTS).inc();
+          }
+          batchedEventList.add(current);
+          current = next;
+          startEventId = next.getEventId();
+        } else {
+          // next can be batched into current event
+          current = Preconditions.checkNotNull(current.addToBatchEvents(next));
+        }
+        j++;
+      }
+      if (current.getNumberOfEvents() > 1) {
+        current.infoLog("Created a batch event for {} events from {} to {}",
+            current.getNumberOfEvents(), startEventId, current.getEventId());
+        metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_BATCH_EVENTS).inc();
+      }
+      batchedEventList.add(current);
+      return batchedEventList;
     }
   }
 
@@ -293,10 +338,11 @@ public class MetastoreEvents {
     protected final String tblName_;
 
     // eventId of the event. Used instead of calling getter on event_ everytime
-    protected long eventId_;
+    private long eventId_;
 
-    // eventType from the NotificationEvent
-    protected final MetastoreEventType eventType_;
+    // eventType from the NotificationEvent or in case of batch events set using
+    // the setter for this field
+    private MetastoreEventType eventType_;
 
     // Actual notificationEvent object received from Metastore
     protected final NotificationEvent metastoreNotificationEvent_;
@@ -321,6 +367,17 @@ public class MetastoreEvents {
 
     public long getEventId() { return eventId_; }
 
+    public MetastoreEventType getEventType() { return eventType_; }
+
+    /**
+     * Certain events like {@link BatchPartitionEvent} batch a group of events
+     * to create a batch event type. This method is used to override the event type
+     * in such cases since the event type is not really derived from NotificationEvent.
+     */
+    public void setEventType(MetastoreEventType type) {
+      this.eventType_ = type;
+    }
+
     public String getCatalogName() { return catalogName_; }
 
     public String getDbName() { return dbName_; }
@@ -336,9 +393,9 @@ public class MetastoreEvents {
     public void processIfEnabled()
         throws CatalogException, MetastoreNotificationException {
       if (isEventProcessingDisabled()) {
-        LOG.info(debugString("Skipping this event because of flag evaluation"));
+        infoLog("Skipping this event because of flag evaluation");
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
         return;
       }
@@ -346,6 +403,45 @@ public class MetastoreEvents {
     }
 
     /**
+     * Checks if the given event can be batched into this event. Default behavior is
+     * to return false which can be overridden by a sub-class.
+     *
+     * @param event The event under consideration to be batched into this event.
+     * @return false if event cannot be batched into this event; otherwise true.
+     */
+    protected boolean canBeBatched(MetastoreEvent event) { return false; }
+
+    /**
+     * Adds the given event into the batch of events represented by this event. Default
+     * implementation is to return null. Sub-classes must override this method to
+     * implement batching.
+     *
+     * @param event The event which needs to be added to the batch.
+     * @return The batch event which represents all the events batched into this event
+     * until now including the given event.
+     */
+    protected MetastoreEvent addToBatchEvents(MetastoreEvent event) { return null; }
+
+    /**
+     * Returns the number of events represented by this event. For most events this is
+     * 1. In case of batch events this could be more than 1.
+     */
+    protected int getNumberOfEvents() { return 1; }
+
+    /**
+     * Certain events like ALTER_TABLE or ALTER_PARTITION implement logic to ignore
+     * some events because they are not interesting from catalogd's perspective.
+     * @return true if this event can be skipped.
+     */
+    protected boolean canBeSkipped() { return false; }
+
+    /**
+     * In case of batch events, this method can be used override the {@code eventType_}
+     * field which is used for logging purposes.
+     */
+    protected MetastoreEventType getBatchEventType() { return null; }
+
+    /**
      * Process the information available in the NotificationEvent to take appropriate
      * action on Catalog
      *
@@ -376,8 +472,8 @@ public class MetastoreEvents {
      */
     private Object[] getLogFormatArgs(Object[] args) {
       Object[] formatArgs = new Object[args.length + 2];
-      formatArgs[0] = eventId_;
-      formatArgs[1] = eventType_;
+      formatArgs[0] = getEventId();
+      formatArgs[1] = getEventType();
       int i = 2;
       for (Object arg : args) {
         formatArgs[i] = arg;
@@ -412,6 +508,17 @@ public class MetastoreEvents {
     }
 
     /**
+     * Similar to infoLog excepts logs at trace level
+     */
+    protected void traceLog(String logFormattedStr, Object... args) {
+      if (!LOG.isTraceEnabled()) return;
+      String formatString =
+          new StringBuilder(LOG_FORMAT_EVENT_ID_TYPE).append(logFormattedStr).toString();
+      Object[] formatArgs = getLogFormatArgs(args);
+      LOG.trace(formatString, formatArgs);
+    }
+
+    /**
      * Search for a inverse event (for example drop_table is a inverse event for
      * create_table) for this event from a given list of notificationEvents starting for
      * the startIndex. This is useful for skipping certain events from processing
@@ -450,16 +557,15 @@ public class MetastoreEvents {
      * serviceId and version. More details on complete flow of self-event handling
      * logic can be read in <code>MetastoreEventsProcessor</code> documentation.
      *
-     * @param isInsertEvent if true, check in flight events list of Insert event
-     * if false, check events list of DDL
      * @return True if this event is a self-generated event. If the returned value is
      * true, this method also clears the version number from the catalog database/table.
      * Returns false if the version numbers or service id don't match
      */
-    protected boolean isSelfEvent(boolean isInsertEvent) {
+    protected boolean isSelfEvent() {
       try {
-        if (catalog_.evaluateSelfEvent(isInsertEvent, getSelfEventContext())) {
-          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+        if (catalog_.evaluateSelfEvent(getSelfEventContext())) {
+          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+              .inc(getNumberOfEvents());
           infoLog("Incremented events skipped counter to {}",
               metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
                   .getCount());
@@ -471,8 +577,6 @@ public class MetastoreEvents {
       }
       return false;
     }
-
-    protected boolean isSelfEvent() { return isSelfEvent(false); }
   }
 
   public static String getStringProperty(
@@ -492,18 +596,15 @@ public class MetastoreEvents {
     // case of alter events
     protected org.apache.hadoop.hive.metastore.api.Table msTbl_;
 
+    // in case of partition batch events, this method can be overridden to return
+    // the partition object from the events which are batched together.
+    protected Partition getPartitionForBatching() { return null; }
+
     private MetastoreTableEvent(CatalogOpExecutor catalogOpExecutor,
         Metrics metrics, NotificationEvent event) {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkNotNull(dbName_, debugString("Database name cannot be null"));
+      Preconditions.checkNotNull(dbName_, "Database name cannot be null");
       tblName_ = Preconditions.checkNotNull(event.getTableName());
-      if (MetastoreEventType.OTHER.equals(eventType_)) {
-        debugLog("Creating event {} of type {} ({}) on table {}", eventId_, eventType_,
-            event.getEventType(), getFullyQualifiedTblName());
-      } else {
-        debugLog("Creating event {} of type {} on table {}", eventId_, eventType_,
-            getFullyQualifiedTblName());
-      }
     }
 
 
@@ -618,23 +719,21 @@ public class MetastoreEvents {
     }
 
     /**
-     * Refreshes a partition provided by given spec only if the table is loaded
-     * @param partition the Partition object which needs to be reloaded.
-     * @param reason Event type which caused the refresh, used for logging by catalog
-     * @return false if the table or database did not exist or was not loaded, else
-     * returns true.
-     * @throws CatalogException
+     * Reloads the partitions provided, only if the table is loaded and if the partitions
+     * exist in catalogd.
+     * @param partitions the list of Partition objects which need to be reloaded.
+     * @param reason The reason for reload operation which is used for logging by
+     *               catalogd.
      */
-    protected boolean reloadPartition(Partition partition, String reason)
+    protected void reloadPartitions(List<Partition> partitions, String reason)
         throws CatalogException {
       try {
-        boolean result = catalogOpExecutor_
-            .reloadPartitionIfExists(eventId_, dbName_, tblName_, partition, reason);
-        if (result) {
+        int numPartsRefreshed = catalogOpExecutor_.reloadPartitionsIfExist(getEventId(),
+            dbName_, tblName_, partitions, reason);
+        if (numPartsRefreshed > 0) {
           metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
-              .inc();
+              .inc(numPartsRefreshed);
         }
-        return result;
       } catch (TableNotLoadedException e) {
         debugLog("Ignoring the event since table {} is not loaded",
             getFullyQualifiedTblName());
@@ -642,7 +741,6 @@ public class MetastoreEvents {
         debugLog("Ignoring the event since table {} is not found",
             getFullyQualifiedTblName());
       }
-      return false;
     }
   }
 
@@ -654,8 +752,8 @@ public class MetastoreEvents {
         NotificationEvent event) {
       super(catalogOpExecutor, metrics, event);
       Preconditions.checkNotNull(dbName_, debugString("Database name cannot be null"));
-      debugLog("Creating event {} of type {} on database {}", eventId_,
-              eventType_, dbName_);
+      debugLog("Creating event {} of type {} on database {}", getEventId(),
+              getEventType(), dbName_);
     }
 
     /**
@@ -685,7 +783,7 @@ public class MetastoreEvents {
     private CreateTableEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(eventType_));
+      Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType()));
       Preconditions
           .checkNotNull(event.getMessage(), debugString("Event message is null"));
       CreateTableMessage createTableMessage =
@@ -719,12 +817,12 @@ public class MetastoreEvents {
       // a self-event (see description of self-event in the class documentation of
       // MetastoreEventsProcessor)
       try {
-        if (catalogOpExecutor_.addTableIfNotRemovedLater(eventId_, msTbl_)) {
+        if (catalogOpExecutor_.addTableIfNotRemovedLater(getEventId(), msTbl_)) {
           infoLog("Successfully added table {}", getFullyQualifiedTblName());
           metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc();
         } else {
           metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-          infoLog("Incremented skipped metric to " + metrics_
+          debugLog("Incremented skipped metric to " + metrics_
               .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
         }
       } catch (CatalogException e) {
@@ -745,7 +843,7 @@ public class MetastoreEvents {
           if (dbName_.equalsIgnoreCase(dropTableEvent.dbName_) && tblName_
               .equalsIgnoreCase(dropTableEvent.tblName_)) {
             infoLog("Found table {} is removed later in event {} type {}", tblName_,
-                dropTableEvent.eventId_, dropTableEvent.eventType_);
+                dropTableEvent.getEventId(), dropTableEvent.getEventType());
             return true;
           }
         } else if (event.eventType_.equals(MetastoreEventType.ALTER_TABLE)) {
@@ -762,7 +860,7 @@ public class MetastoreEvents {
               && dbName_.equalsIgnoreCase(alterTableEvent.msTbl_.getDbName())
               && tblName_.equalsIgnoreCase(alterTableEvent.msTbl_.getTableName())) {
             infoLog("Found table {} is renamed later in event {} type {}", tblName_,
-                alterTableEvent.eventId_, alterTableEvent.eventType_);
+                alterTableEvent.getEventId(), alterTableEvent.getEventType());
             return true;
           }
         }
@@ -791,7 +889,7 @@ public class MetastoreEvents {
     InsertEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.INSERT.equals(eventType_));
+      Preconditions.checkArgument(MetastoreEventType.INSERT.equals(getEventType()));
       InsertMessage insertMessage =
           MetastoreEventsProcessor.getMessageDeserializer()
               .getInsertMessage(event.getMessage());
@@ -805,23 +903,60 @@ public class MetastoreEvents {
     }
 
     @Override
+    protected MetastoreEventType getBatchEventType() {
+      return MetastoreEventType.INSERT_PARTITIONS;
+    }
+
+    @Override
+    protected Partition getPartitionForBatching() { return insertPartition_; }
+
+    @Override
+    public boolean canBeBatched(MetastoreEvent event) {
+      if (!(event instanceof InsertEvent)) return false;
+      InsertEvent insertEvent = (InsertEvent) event;
+      // batched events must have consecutive event ids
+      if (event.getEventId() != 1 + getEventId()) return false;
+      // make sure that the event is on the same table
+      if (!getFullyQualifiedTblName().equalsIgnoreCase(
+          insertEvent.getFullyQualifiedTblName())) {
+        return false;
+      }
+      // we currently only batch partition level insert events
+      if (this.insertPartition_ == null || insertEvent.insertPartition_ == null) {
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+      if (!(event instanceof InsertEvent)) return null;
+      BatchPartitionEvent<InsertEvent> batchEvent = new BatchPartitionEvent<>(
+          this);
+      Preconditions.checkState(batchEvent.canBeBatched(event));
+      batchEvent.addToBatchEvents(event);
+      return batchEvent;
+    }
+
+    @Override
     public SelfEventContext getSelfEventContext() {
       if (insertPartition_ != null) {
         // create selfEventContext for insert partition event
         List<TPartitionKeyValue> tPartSpec =
             getTPartitionSpecFromHmsPartition(msTbl_, insertPartition_);
         return new SelfEventContext(dbName_, tblName_, Arrays.asList(tPartSpec),
-            insertPartition_.getParameters(), eventId_);
+            insertPartition_.getParameters(), Arrays.asList(getEventId()));
       } else {
         // create selfEventContext for insert table event
         return new SelfEventContext(
-            dbName_, tblName_, null, msTbl_.getParameters(), eventId_);
+            dbName_, tblName_, null, msTbl_.getParameters(),
+            Arrays.asList(getEventId()));
       }
     }
 
     @Override
     public void process() throws MetastoreNotificationException {
-      if (isSelfEvent(true)) {
+      if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
       }
@@ -847,7 +982,7 @@ public class MetastoreEvents {
         // Ignore event if table or database is not in catalog. Throw exception if
         // refresh fails. If the partition does not exist in metastore the reload
         // method below removes it from the catalog
-        reloadPartition(insertPartition_, "INSERT");
+        reloadPartitions(Arrays.asList(insertPartition_), "INSERT event");
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                 + "partition on table {} partition {} failed. Event processing cannot "
@@ -862,9 +997,9 @@ public class MetastoreEvents {
      */
     private void processTableInserts() throws MetastoreNotificationException {
       // For non-partitioned tables, refresh the whole table.
-      Preconditions.checkArgument(insertPartition_ == null);
+      Preconditions.checkState(insertPartition_ == null);
       try {
-        reloadTableFromCatalog("INSERT", false);
+        reloadTableFromCatalog("INSERT event", false);
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(
             debugString("Refresh table {} failed. Event processing "
@@ -897,7 +1032,7 @@ public class MetastoreEvents {
     AlterTableEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(eventType_));
+      Preconditions.checkArgument(MetastoreEventType.ALTER_TABLE.equals(getEventType()));
       JSONAlterTableMessage alterTableMessage =
           (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getAlterTableMessage(event.getMessage());
@@ -937,7 +1072,7 @@ public class MetastoreEvents {
       Reference<Boolean> oldTblRemoved = new Reference<>();
       Reference<Boolean> newTblAdded = new Reference<>();
       catalogOpExecutor_
-          .renameTableFromEvent(eventId_, tableBefore_, tableAfter_, oldTblRemoved,
+          .renameTableFromEvent(getEventId(), tableBefore_, tableAfter_, oldTblRemoved,
               newTblAdded);
 
       if (oldTblRemoved.getRef()) {
@@ -948,7 +1083,7 @@ public class MetastoreEvents {
       }
       if (!oldTblRemoved.getRef() || !newTblAdded.getRef()) {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
       }
     }
@@ -1011,7 +1146,8 @@ public class MetastoreEvents {
       return false;
     }
 
-    private boolean canBeSkipped() {
+    @Override
+    protected boolean canBeSkipped() {
       // Certain alter events just modify some parameters such as
       // "transient_lastDdlTime" in Hive. For eg: the alter table event generated
       // along with insert events. Check if the alter table event is such a trivial
@@ -1070,7 +1206,7 @@ public class MetastoreEvents {
     private DropTableEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(eventType_));
+      Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType()));
       JSONDropTableMessage dropTableMessage =
           (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getDropTableMessage(event.getMessage());
@@ -1109,14 +1245,14 @@ public class MetastoreEvents {
       Reference<Boolean> tblRemovedLater = new Reference<>();
       boolean removedTable;
       removedTable = catalogOpExecutor_
-          .removeTableIfNotAddedLater(eventId_, msTbl_.getDbName(),
+          .removeTableIfNotAddedLater(getEventId(), msTbl_.getDbName(),
               msTbl_.getTableName(), tblRemovedLater);
       if (removedTable) {
         infoLog("Successfully removed table {}", getFullyQualifiedTblName());
         metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_REMOVED).inc();
       } else {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
       }
     }
@@ -1137,7 +1273,8 @@ public class MetastoreEvents {
     private CreateDatabaseEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.CREATE_DATABASE.equals(eventType_));
+      Preconditions.checkArgument(
+          MetastoreEventType.CREATE_DATABASE.equals(getEventType()));
       JSONCreateDatabaseMessage createDatabaseMessage =
           (JSONCreateDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getCreateDatabaseMessage(event.getMessage());
@@ -1168,16 +1305,16 @@ public class MetastoreEvents {
     @Override
     public void process() {
       boolean dbAdded = catalogOpExecutor_
-          .addDbIfNotRemovedLater(eventId_, createdDatabase_);
+          .addDbIfNotRemovedLater(getEventId(), createdDatabase_);
       if (!dbAdded) {
         debugLog(
             "Database {} was not added since it either exists or was "
                 + "removed since the event was generated", dbName_);
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
       } else {
-        debugLog("Successfully added database {}", dbName_);
+        infoLog("Successfully added database {}", dbName_);
         metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_DATABASES_ADDED).inc();
       }
     }
@@ -1196,7 +1333,8 @@ public class MetastoreEvents {
     private AlterDatabaseEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.ALTER_DATABASE.equals(eventType_));
+      Preconditions.checkArgument(
+          MetastoreEventType.ALTER_DATABASE.equals(getEventType()));
       JSONAlterDatabaseMessage alterDatabaseMessage =
           (JSONAlterDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getAlterDatabaseMessage(event.getMessage());
@@ -1252,7 +1390,8 @@ public class MetastoreEvents {
         CatalogOpExecutor catalogOpExecutor, Metrics metrics, NotificationEvent event)
         throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkArgument(MetastoreEventType.DROP_DATABASE.equals(eventType_));
+      Preconditions.checkArgument(
+          MetastoreEventType.DROP_DATABASE.equals(getEventType()));
       JSONDropDatabaseMessage dropDatabaseMessage =
           (JSONDropDatabaseMessage) MetastoreEventsProcessor.getMessageDeserializer()
               .getDropDatabaseMessage(event.getMessage());
@@ -1294,13 +1433,13 @@ public class MetastoreEvents {
     @Override
     public void process() {
       boolean dbRemoved = catalogOpExecutor_
-          .removeDbIfNotAddedLater(eventId_, droppedDatabase_.getName());
+          .removeDbIfNotAddedLater(getEventId(), droppedDatabase_.getName());
       if (dbRemoved) {
         infoLog("Removed Database {} ", dbName_);
         metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_DATABASES_REMOVED).inc();
       } else {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        infoLog("Incremented skipped metric to " + metrics_
+        debugLog("Incremented skipped metric to " + metrics_
             .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
       }
     }
@@ -1346,7 +1485,7 @@ public class MetastoreEvents {
     private AddPartitionEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkState(eventType_.equals(MetastoreEventType.ADD_PARTITION));
+      Preconditions.checkState(getEventType().equals(MetastoreEventType.ADD_PARTITION));
       if (event.getMessage() == null) {
         throw new IllegalStateException(debugString("Event message is null"));
       }
@@ -1405,7 +1544,7 @@ public class MetastoreEvents {
           // we throw MetastoreNotificationNeedsInvalidateException exception. We skip
           // refresh of the partitions if the table is not present in the catalog.
           int numPartsAdded = catalogOpExecutor_
-              .addPartitionsIfNotRemovedLater(eventId_, dbName_, tblName_,
+              .addPartitionsIfNotRemovedLater(getEventId(), dbName_, tblName_,
                   addedPartitions_, "ADD_PARTITION");
           if (numPartsAdded != 0) {
             infoLog("Successfully added {} partitions to table {}",
@@ -1414,7 +1553,7 @@ public class MetastoreEvents {
                 .inc(numPartsAdded);
           } else {
             metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-            infoLog("Incremented skipped metric to " + metrics_
+            debugLog("Incremented skipped metric to " + metrics_
                 .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
           }
         }
@@ -1436,6 +1575,10 @@ public class MetastoreEvents {
     private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore_;
     // the Partition object after alter operation, as parsed from the NotificationEvent
     private final org.apache.hadoop.hive.metastore.api.Partition partitionAfter_;
+    // the version number from the partition parameters of the event.
+    private final long versionNumberFromEvent_;
+    // the service id from the partition parameters of the event.
+    private final String serviceIdFromEvent_;
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -1443,7 +1586,7 @@ public class MetastoreEvents {
     private AlterPartitionEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkState(eventType_.equals(MetastoreEventType.ALTER_PARTITION));
+      Preconditions.checkState(getEventType().equals(MetastoreEventType.ALTER_PARTITION));
       Preconditions.checkNotNull(event.getMessage());
       AlterPartitionMessage alterPartitionMessage =
           MetastoreEventsProcessor.getMessageDeserializer()
@@ -1455,6 +1598,12 @@ public class MetastoreEvents {
         partitionAfter_ =
             Preconditions.checkNotNull(alterPartitionMessage.getPtnObjAfter());
         msTbl_ = alterPartitionMessage.getTableObj();
+        Map<String, String> parameters = partitionAfter_.getParameters();
+        versionNumberFromEvent_ = Long.parseLong(
+            MetastoreEvents.getStringProperty(parameters,
+                MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
+        serviceIdFromEvent_ = MetastoreEvents.getStringProperty(
+            parameters, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
       } catch (Exception e) {
         throw new MetastoreNotificationException(
             debugString("Unable to parse the alter partition message"), e);
@@ -1462,6 +1611,50 @@ public class MetastoreEvents {
     }
 
     @Override
+    protected MetastoreEventType getBatchEventType() {
+      return MetastoreEventType.ALTER_PARTITIONS;
+    }
+
+    @Override
+    protected Partition getPartitionForBatching() { return partitionAfter_; }
+
+    @Override
+    public boolean canBeBatched(MetastoreEvent event) {
+      if (!(event instanceof AlterPartitionEvent)) return false;
+      AlterPartitionEvent alterPartitionEvent = (AlterPartitionEvent) event;
+      if (event.getEventId() != 1 + getEventId()) return false;
+      // make sure that the event is on the same table
+      if (!getFullyQualifiedTblName().equalsIgnoreCase(
+          alterPartitionEvent.getFullyQualifiedTblName())) {
+        return false;
+      }
+
+      // in case of ALTER_PARTITION we only batch together the events
+      // which have same versionNumber and serviceId from the event. This simplifies
+      // the self-event evaluation for the batch since either the whole batch is
+      // self-events or not.
+      Map<String, String> parametersFromEvent =
+          alterPartitionEvent.partitionAfter_.getParameters();
+      long versionNumberOfEvent = Long.parseLong(
+          MetastoreEvents.getStringProperty(parametersFromEvent,
+              MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
+      String serviceIdOfEvent = MetastoreEvents.getStringProperty(parametersFromEvent,
+          MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
+      return versionNumberFromEvent_ == versionNumberOfEvent
+          && serviceIdFromEvent_.equals(serviceIdOfEvent);
+    }
+
+    @Override
+    public MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+      if (!(event instanceof AlterPartitionEvent)) return null;
+      BatchPartitionEvent<AlterPartitionEvent> batchEvent = new BatchPartitionEvent<>(
+          this);
+      Preconditions.checkState(batchEvent.canBeBatched(event));
+      batchEvent.addToBatchEvents(event);
+      return batchEvent;
+    }
+
+    @Override
     public void process() throws MetastoreNotificationException, CatalogException {
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
@@ -1485,7 +1678,7 @@ public class MetastoreEvents {
         List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
             partitionAfter_);
         try {
-          reloadPartition(partitionAfter_, "ALTER_PARTITION");
+          reloadPartitions(Arrays.asList(partitionAfter_), "ALTER_PARTITION event");
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                   + "partition on table {} partition {} failed. Event processing cannot "
@@ -1496,7 +1689,8 @@ public class MetastoreEvents {
       }
     }
 
-    private boolean canBeSkipped() {
+    @Override
+    protected boolean canBeSkipped() {
       // Certain alter events just modify some parameters such as
       // "transient_lastDdlTime" in Hive. For eg: the alter table event generated
       // along with insert events. Check if the alter table event is such a trivial
@@ -1518,6 +1712,139 @@ public class MetastoreEvents {
     }
   }
 
+  /**
+   * This event represents a batch of events of type T. The batch of events is
+   * initialized from a single initial event called baseEvent. More events can be added
+   * to the batch using {@code addToBatchEvents} method. Currently we only support
+   * ALTER_PARTITION and INSERT partition events for batching.
+   * @param <T> The type of event which is batched by this event.
+   */
+  public static class BatchPartitionEvent<T extends MetastoreTableEvent> extends
+      MetastoreTableEvent {
+    private final T baseEvent_;
+    private final List<T> batchedEvents_ = new ArrayList<>();
+
+    private BatchPartitionEvent(T baseEvent) {
+      super(baseEvent.catalogOpExecutor_, baseEvent.metrics_, baseEvent.event_);
+      this.msTbl_ = baseEvent.msTbl_;
+      this.baseEvent_ = baseEvent;
+      batchedEvents_.add(baseEvent);
+      // override the eventType_ to represent that this is a batch of events.
+      setEventType(baseEvent.getBatchEventType());
+    }
+
+    @Override
+    public MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+      Preconditions.checkState(canBeBatched(event));
+      batchedEvents_.add((T) event);
+      return this;
+    }
+
+    @Override
+    public int getNumberOfEvents() { return batchedEvents_.size(); }
+
+    /**
+     * Return the event id of this batch event. We return the last eventId
+     * from this batch which is important since it is used to determined the event
+     * id for fetching next set of events from metastore.
+     */
+    @Override
+    public long getEventId() {
+      Preconditions.checkState(!batchedEvents_.isEmpty());
+      return batchedEvents_.get(batchedEvents_.size()-1).getEventId();
+    }
+
+    /**
+     *
+     * @param event The event under consideration to be batched into this event. It can
+     *              be added to the batch if it can be batched into the last event of the
+     *              current batch.
+     * @return true if we can add the event to the current batch; else false.
+     */
+    @Override
+    public boolean canBeBatched(MetastoreEvent event) {
+      Preconditions.checkState(!batchedEvents_.isEmpty());
+      return batchedEvents_.get(batchedEvents_.size()-1).canBeBatched(event);
+    }
+
+    @VisibleForTesting
+    List<T> getBatchEvents() { return batchedEvents_; }
+
+    @Override
+    protected void process() throws MetastoreNotificationException, CatalogException {
+      if (isSelfEvent()) {
+        infoLog("Not processing the event as it is a self-event");
+        return;
+      }
+
+      // Ignore the event if this is a trivial event. See javadoc for
+      // isTrivialAlterPartitionEvent() for examples.
+      List<T> eventsToProcess = new ArrayList<>();
+      for (T event : batchedEvents_) {
+        if (!event.canBeSkipped()) {
+          eventsToProcess.add(event);
+        }
+      }
+      if (eventsToProcess.isEmpty()) {
+        LOG.info(
+            "Ignoring events from event id {} to {} since they modify parameters "
+            + " which can be ignored", getFirstEventId(), getLastEventId());
+        return;
+      }
+
+      // Reload the whole table if it's a transactional table.
+      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+        reloadTableFromCatalog(getEventType().toString(), true);
+      } else {
+        // Reload the partitions from the batch.
+        List<Partition> partitions = new ArrayList<>();
+        for (T event : eventsToProcess) {
+          partitions.add(event.getPartitionForBatching());
+        }
+        try {
+          reloadPartitions(partitions, getEventType().toString() + " event");
+        } catch (CatalogException e) {
+          throw new MetastoreNotificationNeedsInvalidateException(String.format(
+              "Refresh partitions on table %s failed when processing event ids %s-%s. "
+              + "Issue an invalidate command to reset the event processor state.",
+              getFullyQualifiedTblName(), getFirstEventId(), getLastEventId()), e);
+        }
+      }
+    }
+
+    /**
+     * Gets the event id of the first event in the batch.
+     */
+    private long getFirstEventId() {
+      return batchedEvents_.get(0).getEventId();
+    }
+
+    /**
+     * Gets the event id of the last event in the batch.
+     */
+    private long getLastEventId() {
+      return batchedEvents_.get(batchedEvents_.size()-1).getEventId();
+    }
+
+    @Override
+    protected SelfEventContext getSelfEventContext() {
+      List<List<TPartitionKeyValue>> partitionKeyValues = new ArrayList<>();
+      List<Long> eventIds = new ArrayList<>();
+      // We treat insert event as a special case since the self-event context for an
+      // insert event is generated differently using the eventIds.
+      boolean isInsertEvent = baseEvent_ instanceof InsertEvent;
+      for (T event : batchedEvents_) {
+        partitionKeyValues.add(
+            getTPartitionSpecFromHmsPartition(event.msTbl_,
+                event.getPartitionForBatching()));
+        eventIds.add(event.getEventId());
+      }
+      return new SelfEventContext(dbName_, tblName_, partitionKeyValues,
+          baseEvent_.getPartitionForBatching().getParameters(),
+          isInsertEvent ? eventIds : null);
+    }
+  }
+
   public static class DropPartitionEvent extends MetastoreTableEvent {
     private final List<Map<String, String>> droppedPartitions_;
     public static final String EVENT_TYPE = "DROP_PARTITION";
@@ -1528,7 +1855,7 @@ public class MetastoreEvents {
     private DropPartitionEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) throws MetastoreNotificationException {
       super(catalogOpExecutor, metrics, event);
-      Preconditions.checkState(eventType_.equals(MetastoreEventType.DROP_PARTITION));
+      Preconditions.checkState(getEventType().equals(MetastoreEventType.DROP_PARTITION));
       Preconditions.checkNotNull(event.getMessage());
       DropPartitionMessage dropPartitionMessage =
           MetastoreEventsProcessor.getMessageDeserializer()
@@ -1566,7 +1893,7 @@ public class MetastoreEvents {
           reloadTableFromCatalog("DROP_PARTITION", true);
         } else {
           int numPartsRemoved = catalogOpExecutor_
-              .removePartitionsIfNotAddedLater(eventId_, dbName_, tblName_,
+              .removePartitionsIfNotAddedLater(getEventId(), dbName_, tblName_,
                   droppedPartitions_, "DROP_PARTITION");
           if (numPartsRemoved > 0) {
             infoLog("{} partitions dropped from table {}", numPartsRemoved,
@@ -1575,7 +1902,7 @@ public class MetastoreEvents {
                 .inc(numPartsRemoved);
           } else {
             metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-            infoLog("Incremented skipped metric to " + metrics_
+            debugLog("Incremented skipped metric to " + metrics_
                 .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
           }
         }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 1823898..c602053 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -244,6 +244,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
   public static final String NUMBER_OF_PARTITIONS_REMOVED = "partitions-removed";
   // number of entries in the delete event log
   public static final String DELETE_EVENT_LOG_SIZE = "delete-event-log-size";
+  // number of batch events generated
+  public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created";
 
   /**
    * Wrapper around {@link MetastoreEventsProcessor#getNextMetastoreEvents} which passes
@@ -415,6 +417,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
     metrics_.addCounter(NUMBER_OF_PARTITIONS_REMOVED);
     metrics_
         .addGauge(DELETE_EVENT_LOG_SIZE, (Gauge<Integer>) deleteEventLog_::size);
+    metrics_.addCounter(NUMBER_OF_BATCH_EVENTS);
   }
 
   /**
@@ -746,7 +749,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
           lastProcessedEvent = event.metastoreNotificationEvent_;
           event.processIfEnabled();
           deleteEventLog_.garbageCollect(event.getEventId());
-          lastSyncedEventId_.set(event.eventId_);
+          lastSyncedEventId_.set(event.getEventId());
         }
       }
     } catch (CatalogException e) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
index 10dd4eb..56eedfe 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
@@ -32,21 +32,27 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 public class SelfEventContext {
   private final String dbName_;
   private final String tblName_;
-  private final long insertEventId_;
+  // eventids for the insert events. In case of an un-partitioned table insert
+  // this list contains only one event id for the insert. In case of partition level
+  // inserts, this contains events 1 to 1 with the partition key values
+  // partitionKeyValues_. For example insertEventIds_.get(0) is the insert event for
+  // partition with values partitionKeyValues_.get(0) and so on
+  private final List<Long> insertEventIds_;
+  // the partition key values for self-event evaluation at the partition level.
+  private final List<List<TPartitionKeyValue>> partitionKeyValues_;
   // version number from the event object parameters used for self-event detection
   private final long versionNumberFromEvent_;
   // service id from the event object parameters used for self-event detection
   private final String serviceidFromEvent_;
-  private final List<List<TPartitionKeyValue>> partitionKeyValues_;
 
   SelfEventContext(String dbName, String tblName,
       Map<String, String> parameters) {
-    this(dbName, tblName, null, parameters, -1);
+    this(dbName, tblName, null, parameters, null);
   }
 
   SelfEventContext(String dbName, String tblName,
       List<List<TPartitionKeyValue>> partitionKeyValues, Map<String, String> parameters) {
-    this(dbName, tblName, partitionKeyValues, parameters, -1);
+    this(dbName, tblName, partitionKeyValues, parameters, null);
   }
 
   /**
@@ -58,19 +64,27 @@ public class SelfEventContext {
    * @param partitionKeyValues Partition key-values in case of self-event
    * context is for partition.
    * @param parameters this could be database, table or partition parameters.
+   * @param insertEventIds In case this is self-event context for an insert event, this
+   *                       parameter provides the list of insert event ids which map to
+   *                       the partitions. In case of unpartitioned table must contain
+   *                       only one event id.
    */
   SelfEventContext(String dbName, @Nullable String tblName,
       @Nullable List<List<TPartitionKeyValue>> partitionKeyValues,
-      Map<String, String> parameters, long eventId) {
+      Map<String, String> parameters, List<Long> insertEventIds) {
     Preconditions.checkNotNull(parameters);
+    // if this is for an insert event on partitions then the size of insertEventIds
+    // must be equal to number of TPartitionKeyValues
+    Preconditions.checkArgument((partitionKeyValues == null ||
+        insertEventIds == null) || partitionKeyValues.size() == insertEventIds.size());
     this.dbName_ = Preconditions.checkNotNull(dbName);
     this.tblName_ = tblName;
     this.partitionKeyValues_ = partitionKeyValues;
-    insertEventId_ = eventId;
-    versionNumberFromEvent_ = Long.parseLong(
+    this.insertEventIds_ = insertEventIds;
+    this.versionNumberFromEvent_ = Long.parseLong(
         MetastoreEvents.getStringProperty(parameters,
             MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
-    serviceidFromEvent_ = MetastoreEvents.getStringProperty(
+    this.serviceidFromEvent_ = MetastoreEvents.getStringProperty(
         parameters, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
   }
 
@@ -82,7 +96,9 @@ public class SelfEventContext {
     return tblName_;
   }
 
-  public long getIdFromEvent() { return insertEventId_; }
+  public long getInsertEventId(int idx) {
+    return insertEventIds_.get(idx);
+  }
 
   public long getVersionNumberFromEvent() {
     return versionNumberFromEvent_;
@@ -94,4 +110,8 @@ public class SelfEventContext {
     return partitionKeyValues_ == null ?
      null : Collections.unmodifiableList(partitionKeyValues_);
   }
+
+  public boolean isInsertEventContext() {
+    return insertEventIds_ != null;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 2625b1d..5cc4bfd 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4074,17 +4074,21 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Reloads a partition if exists and has not been removed since the event was generated.
+   * Reloads the given partitions if the exists and have not been removed since the event
+   * was generated.
+   *
    * @param eventId EventId being processed.
    * @param dbName Database name for the partition
    * @param tblName Table name for the partition
-   * @param partition {@link Partition} object from the event.
+   * @param partsFromEvent List of {@link Partition} objects from the events to be
+   *                       reloaded.
    * @param reason Reason for reloading the partitions for logging purposes.
-   * @return True if the partition was reloaded; else false.
-   * @throws CatalogException
+   * @return the number of partitions which were reloaded. If the table does not exist,
+   * returns 0. Some partitions could be skipped if they don't exist anymore.
    */
-  public boolean reloadPartitionIfExists(long eventId, String dbName,
-      String tblName, Partition partition, String reason) throws CatalogException {
+  public int reloadPartitionsIfExist(long eventId, String dbName,
+      String tblName, List<Partition> partsFromEvent, String reason)
+      throws CatalogException {
     Table table = catalog_.getTable(dbName, tblName);
     if (table == null) {
       DeleteEventLog deleteEventLog = catalog_.getMetastoreEventProcessor()
@@ -4094,7 +4098,7 @@ public class CatalogOpExecutor {
         LOG.info(
             "Not reloading the partition of table {} since it was removed "
                 + "later in catalog", new TableName(dbName, tblName));
-        return false;
+        return 0;
       } else {
         throw new TableNotFoundException(
             "Table " + dbName + "." + tblName + " not found");
@@ -4103,7 +4107,7 @@ public class CatalogOpExecutor {
     if (table instanceof IncompleteTable) {
       LOG.info("Table {} is not loaded. Skipping drop partition event {}",
           table.getFullName(), eventId);
-      return false;
+      return 0;
     }
     if (!(table instanceof HdfsTable)) {
       throw new CatalogException("Partition event received on a non-hdfs table");
@@ -4113,33 +4117,32 @@ public class CatalogOpExecutor {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       HdfsTable hdfsTable = (HdfsTable) table;
-      List<LiteralExpr> partExprs = hdfsTable
-          .getTypeCompatiblePartValues(partition.getValues());
-      HdfsPartition hdfsPartition = hdfsTable.getPartition(partExprs);
-      if (hdfsPartition == null) {
-        LOG.info("Not reloading the partition {} since it does not exist anymore",
-            FileUtils.makePartName(hdfsTable.getClusteringColNames(),
-                partition.getValues()));
-        return false;
-      }
-      Reference<Boolean> wasPartitionRefreshed = new Reference<>();
-      catalog_.reloadHdfsPartition(hdfsTable, hdfsPartition.getPartitionName(),
-          wasPartitionRefreshed, ThriftObjectType.NONE, reason, newCatalogVersion,
-          hdfsPartition);
-      if (wasPartitionRefreshed.getRef()) {
-        LOG.info("EventId: {} Table {} partition {} has been refreshed", eventId,
-            hdfsTable.getFullName(),
-            FileUtils.makePartName(hdfsTable.getClusteringColNames(),
-                partition.getValues()));
-      }
-      return wasPartitionRefreshed.getRef();
-    } catch (InternalException | UnsupportedEncodingException e) {
+      // some partitions from the event or the table itself
+      // may not exist in HMS anymore. Hence, we collect the names here and re-fetch
+      // the partitions from HMS.
+      List<String> partNames = new ArrayList<>();
+      for (Partition part : partsFromEvent) {
+        partNames.add(FileUtils.makePartName(hdfsTable.getClusteringColNames(),
+            part.getValues(), null));
+      }
+      int numOfPartsReloaded;
+      try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+        numOfPartsReloaded = hdfsTable.reloadPartitionsFromNames(
+            metaStoreClient.getHiveClient(), partNames, reason);
+      }
+      hdfsTable.setCatalogVersion(newCatalogVersion);
+      return numOfPartsReloaded;
+    } catch (TableLoadingException e) {
+      LOG.info("Could not reload {} partitions of table {}", partsFromEvent.size(),
+          table.getFullName(), e);
+    } catch (InternalException e) {
       throw new CatalogException(
-          "Unable to add partition for table " + table.getFullName(), e);
+          "Could not acquire lock on the table " + table.getFullName(), e);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
       table.releaseWriteLock();
     }
+    return 0;
   }
 
   public ReentrantLock getMetastoreDdlLock() {
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 5dfdc1b..51ba024 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
@@ -33,6 +34,7 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +48,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.metastore.api.Catalog;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -84,7 +86,12 @@ import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
+import org.apache.impala.catalog.events.MetastoreEvents.AlterPartitionEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.BatchPartitionEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.AlterTableEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.InsertEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
@@ -151,14 +158,11 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Main test class to cover the functionality of MetastoreEventProcessor. In order to make
@@ -275,10 +279,10 @@ public class MetastoreEventsProcessorTest {
         Mockito.spy(eventsProcessor_);
     for (MetastoreEventProcessorConfig config: MetastoreEventProcessorConfig.values()) {
       String configKey = config.getValidator().getConfigKey();
-      Mockito.when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
+      when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
           "")).thenReturn("false");
       if (config.equals(MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME)) {
-        Mockito.when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
+        when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
             "")).thenReturn("test_custom_catalog");
       }
     }
@@ -1063,6 +1067,9 @@ public class MetastoreEventsProcessorTest {
       InsertEventRequestData insertEventRequestData = new InsertEventRequestData();
       insertEventRequestData.setFilesAdded(newFiles);
       insertEventRequestData.setReplace(isOverwrite);
+      if (partition != null) {
+        insertEventRequestData.setPartitionVal(partition.getValues());
+      }
       partitionInsertEventInfos
           .add(insertEventRequestData);
       MetastoreShim.fireInsertEventHelper(metaStoreClient.getHiveClient(),
@@ -2223,6 +2230,220 @@ public class MetastoreEventsProcessorTest {
         numberOfSelfEventsBefore + 9, selfEventsCountAfter);
   }
 
+  @Test
+  public void testEventBatching() throws Exception {
+    // create test table and generate a real ALTER_PARTITION and INSERT event on it
+    String testTblName = "testEventBatching";
+    createDatabaseFromImpala(TEST_DB_NAME, "test");
+    createTable(TEST_DB_NAME, testTblName, true);
+    List<List<String>> partVals = new ArrayList<>();
+    partVals.add(Collections.singletonList("1"));
+    partVals.add(Collections.singletonList("2"));
+    addPartitions(TEST_DB_NAME, testTblName, partVals);
+    eventsProcessor_.processEvents();
+    alterPartitionsParams(TEST_DB_NAME, testTblName, "testkey", "val", partVals);
+    // we fetch a real alter partition event so that we can generate mocks using its
+    // contents below
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    NotificationEvent alterPartEvent = null;
+    String alterPartitionEventType = "ALTER_PARTITION";
+    for (NotificationEvent event : events) {
+      if (event.getEventType().equalsIgnoreCase(alterPartitionEventType)) {
+        alterPartEvent = event;
+        break;
+      }
+    }
+    assertNotNull(alterPartEvent);
+    String alterPartMessage = alterPartEvent.getMessage();
+
+    // test insert event batching
+    org.apache.hadoop.hive.metastore.api.Table msTbl;
+    List<Partition> partitions;
+    PartitionsRequest req = new PartitionsRequest();
+    req.setDbName(TEST_DB_NAME);
+    req.setTblName(testTblName);
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      msTbl = metaStoreClient.getHiveClient().getTable(TEST_DB_NAME, testTblName);
+      partitions = metaStoreClient.getHiveClient().getPartitionsRequest(req)
+          .getPartitions();
+    }
+    assertNotNull(msTbl);
+    assertNotNull(partitions);
+    eventsProcessor_.processEvents();
+    // generate 2 insert event on each of the partition
+    for (Partition part : partitions) {
+      simulateInsertIntoTableFromFS(msTbl, 1, part, false);
+    }
+    List<NotificationEvent> insertEvents = eventsProcessor_.getNextMetastoreEvents();
+    NotificationEvent insertEvent = null;
+    for (NotificationEvent event : insertEvents) {
+      if (event.getEventType().equalsIgnoreCase("INSERT")) {
+        insertEvent = event;
+        break;
+      }
+    }
+    assertNotNull(insertEvent);
+    Map<String, String> eventTypeToMessage = new HashMap<>();
+    eventTypeToMessage.put("ALTER_PARTITION", alterPartMessage);
+    eventTypeToMessage.put("INSERT", insertEvent.getMessage());
+    runEventBatchingTest(testTblName, eventTypeToMessage);
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private void runEventBatchingTest(String testTblName,
+      Map<String, String> eventTypeToMessage) throws MetastoreNotificationException {
+    for (String eventType : eventTypeToMessage.keySet()) {
+      String eventMessage = eventTypeToMessage.get(eventType);
+      // we have 10 mock batchable events which should be batched into 1
+      List<MetastoreEvent> mockEvents = createMockEvents(100, 10,
+          eventType, TEST_DB_NAME, testTblName, eventMessage);
+      MetastoreEventFactory eventFactory = eventsProcessor_.getEventsFactory();
+      List<MetastoreEvent> batch = eventFactory.createBatchEvents(mockEvents);
+      assertEquals(1, batch.size());
+      assertTrue(batch.get(0) instanceof BatchPartitionEvent);
+      BatchPartitionEvent batchEvent = (BatchPartitionEvent) batch.get(0);
+      assertEquals(10, batchEvent.getBatchEvents().size());
+      // create a batch which consists of some other events
+      // only contiguous events should be batched
+      // 13-15 mock events which can be batched
+      mockEvents = createMockEvents(13, 3,
+          eventType, TEST_DB_NAME, testTblName, eventMessage);
+      // 17-18 can be batched
+      mockEvents.addAll(createMockEvents(17, 2,
+          eventType, TEST_DB_NAME, testTblName, eventMessage));
+      // event id 20 should not be batched
+      mockEvents.addAll(createMockEvents(20, 1,
+          eventType, TEST_DB_NAME, testTblName, eventMessage));
+      // events 22-24 should be batched
+      mockEvents.addAll(createMockEvents(22, 3,
+          eventType, TEST_DB_NAME, testTblName, eventMessage));
+
+      batch = eventFactory.createBatchEvents(mockEvents);
+      assertEquals(4, batch.size());
+      MetastoreEvent batch1 = batch.get(0);
+      assertEquals(3, ((BatchPartitionEvent) batch1).getBatchEvents().size());
+      MetastoreEvent batch2 = batch.get(1);
+      assertEquals(2, ((BatchPartitionEvent) batch2).getBatchEvents().size());
+      MetastoreEvent batch3 = batch.get(2);
+      if (eventType.equalsIgnoreCase("ALTER_PARTITION")) {
+        assertTrue(batch3 instanceof AlterPartitionEvent);
+      } else {
+        assertTrue(batch3 instanceof InsertEvent);
+      }
+      MetastoreEvent batch4 = batch.get(3);
+      assertEquals(3, ((BatchPartitionEvent) batch4).getBatchEvents().size());
+      // test to make sure that events which have different database name are not
+      // batched
+      mockEvents = createMockEvents(100, 1, eventType, TEST_DB_NAME,
+          testTblName, eventMessage);
+      mockEvents.addAll(createMockEvents(101, 1, eventType, "db1",
+          testTblName, eventMessage));
+
+      List<MetastoreEvent> batchEvents = eventFactory.createBatchEvents(mockEvents);
+      assertEquals(2, batchEvents.size());
+      for (MetastoreEvent event : batchEvents) {
+        if (eventType.equalsIgnoreCase("ALTER_PARTITION")) {
+          assertTrue(event instanceof AlterPartitionEvent);
+        } else {
+          assertTrue(event instanceof InsertEvent);
+        }
+      }
+
+      // test no batching when table name is different
+      mockEvents = createMockEvents(100, 1, eventType, TEST_DB_NAME,
+          testTblName, eventMessage);
+      mockEvents.addAll(createMockEvents(101, 1, eventType, TEST_DB_NAME,
+          "testtbl", eventMessage));
+      batchEvents = eventFactory.createBatchEvents(mockEvents);
+      assertEquals(2, batchEvents.size());
+      for (MetastoreEvent event : batchEvents) {
+        if (eventType.equalsIgnoreCase("ALTER_PARTITION")) {
+          assertTrue(event instanceof AlterPartitionEvent);
+        } else {
+          assertTrue(event instanceof InsertEvent);
+        }
+      }
+    }
+    // make sure 2 events of different event types are not batched together
+    long startEventId = 17;
+    // batch 1
+    List<MetastoreEvent> mockEvents = createMockEvents(startEventId, 3,
+        "ALTER_PARTITION", TEST_DB_NAME, testTblName,
+        eventTypeToMessage.get("ALTER_PARTITION"));
+    // batch 2
+    mockEvents.addAll(createMockEvents(startEventId + mockEvents.size(), 3, "INSERT",
+        TEST_DB_NAME, testTblName, eventTypeToMessage.get("INSERT")));
+    // batch 3 : single event not-batched
+    mockEvents.addAll(
+        createMockEvents(startEventId + mockEvents.size(), 1, "ALTER_PARTITION",
+        TEST_DB_NAME, testTblName, eventTypeToMessage.get("ALTER_PARTITION")));
+    // batch 4 : single event not-batched
+    mockEvents.addAll(createMockEvents(startEventId + mockEvents.size(), 1, "INSERT",
+        TEST_DB_NAME, testTblName, eventTypeToMessage.get("INSERT")));
+    // batch 5 : single event not-batched
+    mockEvents.addAll(
+        createMockEvents(startEventId + mockEvents.size(), 1, "ALTER_PARTITION",
+            TEST_DB_NAME, testTblName, eventTypeToMessage.get("ALTER_PARTITION")));
+    // batch 6
+    mockEvents.addAll(createMockEvents(startEventId + mockEvents.size(), 5, "INSERT",
+        TEST_DB_NAME, testTblName, eventTypeToMessage.get("INSERT")));
+    // batch 7
+    mockEvents.addAll(
+        createMockEvents(startEventId + mockEvents.size(), 5, "ALTER_PARTITION",
+            TEST_DB_NAME, testTblName, eventTypeToMessage.get("ALTER_PARTITION")));
+    List<MetastoreEvent> batchedEvents = eventsProcessor_.getEventsFactory()
+        .createBatchEvents(mockEvents);
+    assertEquals(7, batchedEvents.size());
+    // batch 1 should contain 3 AlterPartitionEvent
+    BatchPartitionEvent<AlterPartitionEvent> batch1 = (BatchPartitionEvent
+        <AlterPartitionEvent>) batchedEvents.get(0);
+    assertEquals(3, batch1.getNumberOfEvents());
+    // batch 2 should contain 3 InsertEvent
+    BatchPartitionEvent<InsertEvent> batch2 = (BatchPartitionEvent
+        <InsertEvent>) batchedEvents.get(1);
+    assertEquals(3, batch2.getNumberOfEvents());
+    // 3rd is a single non-batch event
+    assertTrue(batchedEvents.get(2) instanceof AlterPartitionEvent);
+    // 4th is a single non-batch insert event
+    assertTrue(batchedEvents.get(3) instanceof InsertEvent);
+    // 5th is a single non-batch alter partition event
+    assertTrue(batchedEvents.get(4) instanceof AlterPartitionEvent);
+    // 6th is batch insert event of size 5
+    BatchPartitionEvent<InsertEvent> batch6 = (BatchPartitionEvent
+        <InsertEvent>) batchedEvents.get(5);
+    assertEquals(5, batch6.getNumberOfEvents());
+    // 7th is batch of alter partitions of size 5
+    BatchPartitionEvent<AlterPartitionEvent> batch7 = (BatchPartitionEvent
+        <AlterPartitionEvent>) batchedEvents.get(6);
+    assertEquals(5, batch7.getNumberOfEvents());
+  }
+
+
+  /**
+   * Creates mock notification event from the given list of parameters. The message
+   * should be a legal parsable message from a real notification event.
+   */
+  private List<MetastoreEvent> createMockEvents(long startEventId, int numEvents,
+      String eventType, String dbName, String tblName, String message)
+      throws MetastoreNotificationException {
+    List<NotificationEvent> mockEvents = new ArrayList<>();
+    for (int i=0; i<numEvents; i++) {
+      NotificationEvent mock = Mockito.mock(NotificationEvent.class);
+      when(mock.getEventId()).thenReturn(startEventId);
+      when(mock.getEventType()).thenReturn(eventType);
+      when(mock.getDbName()).thenReturn(dbName);
+      when(mock.getTableName()).thenReturn(tblName);
+      when(mock.getMessage()).thenReturn(message);
+      mockEvents.add(mock);
+      startEventId++;
+    }
+    List<MetastoreEvent> metastoreEvents = new ArrayList<>();
+    for (NotificationEvent notificationEvent : mockEvents) {
+      metastoreEvents.add(eventsProcessor_.getEventsFactory().get(notificationEvent));
+    }
+    return metastoreEvents;
+  }
+
   private abstract class AlterTableExecutor {
     protected abstract void execute() throws Exception;
 
@@ -2391,7 +2612,7 @@ public class MetastoreEventsProcessorTest {
             + "self-event", hdfsPartition,
         catalog_.getHdfsPartition(TEST_DB_NAME, testTblName, partKeyVals));
 
-    // compute stats on the table and make sure that the table and its partittions are
+    // compute stats on the table and make sure that the table and its partitions are
     // not refreshed due to the events
     alterTableComputeStats(testTblName, Arrays.asList(Arrays.asList("1"),
         Arrays.asList("2")));
@@ -3148,6 +3369,20 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  private void alterPartitionsParams(String db, String tblName, String key,
+      String val, List<List<String>> partVals) throws Exception {
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      List<Partition> partitions = new ArrayList<>();
+      for (List<String> partVal : partVals) {
+        Partition partition = metaStoreClient.getHiveClient().getPartition(TEST_DB_NAME,
+            tblName, partVal);
+        partition.getParameters().put(key, val);
+        partitions.add(partition);
+      }
+      metaStoreClient.getHiveClient().alter_partitions(TEST_DB_NAME, tblName, partitions);
+    }
+  }
+
   /**
    * Alters trivial partition properties which must be ignored by the event processor
    */
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index e4c2089..c90275e 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -197,12 +197,9 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
 
     # get the metric before values
     EventProcessorUtils.wait_for_event_processing(self)
-    create_metric_val_before = EventProcessorUtils. \
-      get_event_processor_metric(create_metric_name, 0)
-    removed_metric_val_before = EventProcessorUtils. \
-      get_event_processor_metric(removed_metric_name, 0)
-    events_skipped_before = EventProcessorUtils. \
-      get_event_processor_metric('events-skipped', 0)
+    create_metric_val_before = EventProcessorUtils.get_int_metric(create_metric_name, 0)
+    removed_metric_val_before = EventProcessorUtils.get_int_metric(removed_metric_name, 0)
+    events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
     num_iters = 100
     for iter in xrange(num_iters):
       for q in queries:
@@ -212,22 +209,19 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
           print("Failed in {} iterations. Error {}".format(iter, str(e)))
           raise
     EventProcessorUtils.wait_for_event_processing(self)
-    create_metric_val_after = EventProcessorUtils. \
-      get_event_processor_metric(create_metric_name, 0)
-    removed_metric_val_after = EventProcessorUtils. \
-      get_event_processor_metric(removed_metric_name, 0)
-    events_skipped_after = EventProcessorUtils. \
-      get_event_processor_metric('events-skipped', 0)
-    num_delete_event_entries = EventProcessorUtils. \
-      get_event_processor_metric('delete-event-log-size', 0)
+    create_metric_val_after = EventProcessorUtils.get_int_metric(create_metric_name, 0)
+    removed_metric_val_after = EventProcessorUtils.get_int_metric(removed_metric_name, 0)
+    events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
+    num_delete_event_entries = EventProcessorUtils.\
+        get_int_metric('delete-event-log-size', 0)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
     # None of the queries above should actually trigger a add/remove object from events
-    assert int(create_metric_val_after) == int(create_metric_val_before)
-    assert int(removed_metric_val_after) == int(removed_metric_val_before)
+    assert create_metric_val_after == create_metric_val_before
+    assert removed_metric_val_after == removed_metric_val_before
     # each query set generates 2 events and both of them should be skipped
-    assert int(events_skipped_after) == num_iters * 2 + int(events_skipped_before)
+    assert events_skipped_after == num_iters * 2 + events_skipped_before
     # make sure that there are no more entries in the delete event log
-    assert int(num_delete_event_entries) == 0
+    assert num_delete_event_entries == 0
 
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
   def test_self_events(self, unique_database):
@@ -238,6 +232,99 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     self.__run_self_events_test(unique_database, True)
     self.__run_self_events_test(unique_database, False)
 
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
+  def test_event_batching(self, unique_database):
+    """Runs queries which generate multiple ALTER_PARTITION events which must be
+    batched by events processor. Runs as a custom cluster test to isolate the metric
+    values from other tests."""
+    testtbl = "test_event_batching"
+    test_acid_tbl = "test_event_batching_acid"
+    acid_props = self.__get_transactional_tblproperties(True)
+    # create test tables
+    self.client.execute(
+      "create table {}.{} like functional.alltypes".format(unique_database, testtbl))
+    self.client.execute(
+      "insert into {}.{} partition (year,month) select * from functional.alltypes".format(
+        unique_database, testtbl))
+    self.client.execute(
+      "create table {}.{} (id int) partitioned by (year int, month int) {}".format(
+        unique_database, test_acid_tbl, acid_props))
+    self.client.execute(
+      "insert into {}.{} partition (year, month) "
+      "select id, year, month from functional.alltypes".format(unique_database,
+        test_acid_tbl))
+    # run compute stats from impala; this should generate 24 ALTER_PARTITION events which
+    # should be batched together into 1 or more number of events.
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_metric = "batch-events-created"
+    batch_events_1 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    self.client.execute("compute stats {}.{}".format(unique_database, testtbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_2 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    assert batch_events_2 > batch_events_1
+    # run analyze stats event from hive which generates ALTER_PARTITION event on each
+    # partition of the table
+    self.run_stmt_in_hive(
+      "analyze table {}.{} compute statistics".format(unique_database, testtbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_3 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    assert batch_events_3 > batch_events_2
+    # in case of transactional table since we batch the events together, the number of
+    # tables refreshed must be far lower than number of events generated
+    num_table_refreshes_1 = EventProcessorUtils.get_int_metric(
+      "tables-refreshed")
+    self.client.execute("compute stats {}.{}".format(unique_database, test_acid_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_4 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    num_table_refreshes_2 = EventProcessorUtils.get_int_metric(
+      "tables-refreshed")
+    # we should generate atleast 1 batch event if not more due to the 24 consecutive
+    # ALTER_PARTITION events
+    assert batch_events_4 > batch_events_3
+    # table should not be refreshed since this is a self-event
+    assert num_table_refreshes_2 == num_table_refreshes_1
+    self.run_stmt_in_hive(
+      "analyze table {}.{} compute statistics".format(unique_database, test_acid_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_5 = EventProcessorUtils.get_int_metric(batch_events_metric)
+    assert batch_events_5 > batch_events_4
+    num_table_refreshes_2 = EventProcessorUtils.get_int_metric("tables-refreshed")
+    # the analyze table from hive generates 24 ALTER_PARTITION events which should be
+    # batched into 1-2 batches (depending on timing of the event poll thread).
+    assert num_table_refreshes_2 > num_table_refreshes_1
+    assert int(num_table_refreshes_2) - int(num_table_refreshes_1) < 24
+    EventProcessorUtils.wait_for_event_processing(self)
+    # test for batching of insert events
+    batch_events_insert = EventProcessorUtils.get_int_metric(batch_events_metric)
+    tables_refreshed_insert = EventProcessorUtils.get_int_metric("tables-refreshed")
+    partitions_refreshed_insert = EventProcessorUtils.get_int_metric(
+      "partitions-refreshed")
+    self.client.execute(
+      "insert into {}.{} partition (year,month) select * from functional.alltypes".format(
+        unique_database, testtbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_after_insert = EventProcessorUtils.get_int_metric(batch_events_metric)
+    tables_refreshed_after_insert = EventProcessorUtils.get_int_metric("tables-refreshed")
+    partitions_refreshed_after_insert = EventProcessorUtils.get_int_metric(
+      "partitions-refreshed")
+    # this is a self-event tables or partitions should not be refreshed
+    assert batch_events_after_insert > batch_events_insert
+    assert tables_refreshed_after_insert == tables_refreshed_insert
+    assert partitions_refreshed_after_insert == partitions_refreshed_insert
+    # run the insert from hive to make sure that batch event is refreshing all the
+    # partitions
+    self.run_stmt_in_hive(
+      "SET hive.exec.dynamic.partition.mode=nonstrict; insert into {}.{} partition"
+      " (year,month) select * from functional.alltypes".format(
+        unique_database, testtbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    batch_events_after_hive = EventProcessorUtils.get_int_metric(batch_events_metric)
+    partitions_refreshed_after_hive = EventProcessorUtils.get_int_metric(
+      "partitions-refreshed")
+    assert batch_events_after_hive > batch_events_insert
+    # 24 partitions inserted and hence we must refresh 24 partitions once.
+    assert int(partitions_refreshed_after_hive) == int(partitions_refreshed_insert) + 24
+
   def __run_self_events_test(self, db_name, use_impala):
     recover_tbl_name = ImpalaTestSuite.get_random_name("tbl_")
     # create a table similar to alltypes so that we can recover the partitions on it
@@ -412,8 +499,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     self.client.execute("refresh {0}.{1}".format(db_name, tbl_name))
     self_event_test_queries = [
       # ALTER_DATABASE cases
-      "alter database {0} set dbproperties ('comment'='self-event test database')".format(
-        db_name),
+      "alter database {0} set dbproperties ('comment'='self-event test "
+      "database')".format(db_name),
       "alter database {0} set owner user `test-user`".format(db_name),
       # ALTER_TABLE case
       "alter table {0}.{1} set tblproperties ('k'='v')".format(db_name, tbl_name),
@@ -455,12 +542,10 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     Gets the tables-refreshed, partitions-refreshed and events-skipped metric values
     from Metastore EventsProcessor
     """
-    tbls_refreshed_count = EventProcessorUtils.get_event_processor_metric(
-      'tables-refreshed', 0)
-    partitions_refreshed_count = EventProcessorUtils.get_event_processor_metric(
+    tbls_refreshed_count = EventProcessorUtils.get_int_metric('tables-refreshed', 0)
+    partitions_refreshed_count = EventProcessorUtils.get_int_metric(
       'partitions-refreshed', 0)
-    events_skipped_count = EventProcessorUtils.get_event_processor_metric(
-      'events-skipped', 0)
+    events_skipped_count = EventProcessorUtils.get_int_metric('events-skipped', 0)
     return int(tbls_refreshed_count), int(partitions_refreshed_count), \
       int(events_skipped_count)
 
diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py
index 7dd2cab..f3a9380 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -181,6 +181,7 @@ class TestEventProcessing(ImpalaTestSuite):
   def test_empty_partition_events(self, unique_database):
     self._run_test_empty_partition_events(unique_database, False)
 
+  @pytest.mark.xfail(run=False, reason="IMPALA-9057")
   def test_event_based_replication(self):
     self.__run_event_based_replication_tests()
 
diff --git a/tests/util/event_processor_utils.py b/tests/util/event_processor_utils.py
index 4a40508..81a44c4 100644
--- a/tests/util/event_processor_utils.py
+++ b/tests/util/event_processor_utils.py
@@ -87,12 +87,13 @@ class EventProcessorUtils(object):
      return dict(pairs)
 
   @staticmethod
-  def get_event_processor_metric(metric_key, default_val=None):
-    """Returns the event processor metric from the /events catalog debug page"""
+  def get_int_metric(metric_key, default_val=None):
+    """Returns the int value of event processor metric from the /events catalogd debug
+     page"""
     metrics = EventProcessorUtils.get_event_processor_metrics()
     if metric_key not in metrics:
-      return default_val
-    return metrics[metric_key]
+      return int(default_val)
+    return int(metrics[metric_key])
 
   @staticmethod
   def get_last_synced_event_id():