You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by vi...@apache.org on 2020/04/09 04:59:26 UTC

[impala] 02/02: IMPALA-8632: Add support for self-event detection for insert events

This is an automated email from the ASF dual-hosted git repository.

vihangk1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e0ed7d321c1dca85a0f1482842f8db6db517c909
Author: xiaomeng <xi...@cloudera.com>
AuthorDate: Thu Feb 20 16:32:48 2020 -0800

    IMPALA-8632: Add support for self-event detection for insert events
    
    In case of INSERT_EVENTS if Impala inserts into a table it causes a
    refresh to the underlying table/partition. This could be unnecessary
    when there is only one Impala cluster in the system.
    We can detect a self-event in such cases when the HMS API to fire a
    listener event returns the event id. This is used by EventProcessor
    to ignore the event when it is fetched later in the next polling cycle.
    
    Testing:
    Add testInsertFromImpala() in MetastoreEventsProcessorTest.java to test
    insert event self-event detection when insert into table and partition.
    
    Change-Id: I7873fbb2c159343690f93b9d120f6b425b983dcf
    Reviewed-on: http://gerrit.cloudera.org:8080/15648
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |   6 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../org/apache/impala/compat/MetastoreShim.java    |  69 ++++++++++++
 .../org/apache/impala/compat/MetastoreShim.java    |  72 ++++++++++++
 .../impala/catalog/CatalogServiceCatalog.java      |  64 ++++++-----
 fe/src/main/java/org/apache/impala/catalog/Db.java |   4 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |  25 +++--
 .../main/java/org/apache/impala/catalog/Table.java |  21 ++--
 .../impala/catalog/events/InFlightEvents.java      |  91 +++++++++++----
 .../impala/catalog/events/MetastoreEvents.java     |  34 ++++--
 .../impala/catalog/events/SelfEventContext.java    |  24 ++--
 .../org/apache/impala/service/BackendConfig.java   |   2 +
 .../apache/impala/service/CatalogOpExecutor.java   |  67 +++++------
 .../java/org/apache/impala/util/MetaStoreUtil.java |  57 +++-------
 .../events/MetastoreEventsProcessorTest.java       | 124 ++++++++++++++++++++-
 tests/custom_cluster/test_event_processing.py      |  11 +-
 17 files changed, 504 insertions(+), 171 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 79084ff..69e856e 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -266,6 +266,12 @@ DEFINE_int32(hms_event_polling_interval_s, 0,
     "feature and not recommended to be deployed on production systems until it is "
     "made generally available.");
 
+DEFINE_bool(enable_insert_events, true,
+    "Enables insert events in the events processor. When this configuration is set to "
+    "true Impala will generate INSERT event types which when received by other Impala "
+    "clusters can be used to automatically refresh the tables or partitions. Event "
+    "processing must be turned on for this flag to have any effect.");
+
 DEFINE_string(blacklisted_dbs, "sys,information_schema",
     "Comma separated list for blacklisted databases. Configure which databases to be "
     "skipped for loading (in startup and global INVALIDATE METADATA). Users can't access,"
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index f3e3755..90122d2 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -67,6 +67,7 @@ DECLARE_int64(exchg_node_buffer_size_bytes);
 DECLARE_int32(kudu_mutation_buffer_size);
 DECLARE_int32(kudu_error_buffer_size);
 DECLARE_int32(hms_event_polling_interval_s);
+DECLARE_bool(enable_insert_events);
 DECLARE_string(authorization_factory_class);
 DECLARE_bool(unlock_mt_dop);
 DECLARE_bool(mt_dop_auto_fallback);
@@ -155,6 +156,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
   cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size);
   cfg.__set_hms_event_polling_interval_s(FLAGS_hms_event_polling_interval_s);
+  cfg.__set_enable_insert_events(FLAGS_enable_insert_events);
   cfg.__set_impala_build_version(::GetDaemonBuildVersion());
   cfg.__set_authorization_factory_class(FLAGS_authorization_factory_class);
   cfg.__set_unlock_mt_dop(FLAGS_unlock_mt_dop);
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 90fad1a..c9b7d47 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -155,4 +155,6 @@ struct TBackendGflags {
   65: required bool use_customized_user_groups_mapper_for_ranger
 
   66: required bool enable_column_masking
+
+  67: required bool enable_insert_events
 }
diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index 87e8ecf..39cd4ff 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -23,8 +23,14 @@ import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
@@ -51,6 +57,9 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
@@ -62,6 +71,9 @@ import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
 import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
@@ -71,6 +83,8 @@ import org.apache.impala.service.MetadataOp;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.util.AcidUtils.TblTransaction;
+import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
+import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
 /**
@@ -78,6 +92,7 @@ import org.apache.thrift.TException;
  * between major versions of Hive. This implements the shimmed methods for Hive 2.
  */
 public class MetastoreShim {
+  private static final Logger LOG = Logger.getLogger(MetastoreShim.class);
 
   public static TblTransaction createTblTransaction(
      IMetaStoreClient client, Table tbl, long txnId) {
@@ -484,4 +499,58 @@ public class MetastoreShim {
       throws MetaException {
     return new Path(db.getLocationUri(), tbl.getTableName().toLowerCase()).toString();
   }
+
+  /**
+   * Fire insert events asynchronously. This creates a single thread to execute the
+   * fireInsertEvent method and shuts down the thread after it has finished.
+   * In case of any exception, we just log the failure of firing insert events.
+   */
+  public static List<Long> fireInsertEvents(MetaStoreClient msClient,
+      List<InsertEventInfo> insertEventInfos, String dbName, String tableName) {
+    ExecutorService fireInsertEventThread = Executors.newSingleThreadExecutor();
+    CompletableFuture.runAsync(() -> {
+      try {
+        fireInsertEventHelper(msClient.getHiveClient(), insertEventInfos, dbName, tableName);
+      } catch (Exception e) {
+        LOG.error("Failed to fire insert event. Some tables might not be"
+                + " refreshed on other impala clusters.", e);
+      } finally {
+        msClient.close();
+      }
+    }, Executors.newSingleThreadExecutor()).thenRun(() ->
+        fireInsertEventThread.shutdown());
+    return Collections.emptyList();
+  }
+
+  /**
+   *  Fires an insert event to HMS notification log. In Hive-2 for partitioned table,
+   *  each existing partition touched by the insert will fire a separate insert event.
+   * @param msClient Metastore client,
+   * @param insertEventInfos A list of insert event encapsulating the information needed
+   * to fire insert
+   * @param dbName
+   * @param tableName
+   */
+  @VisibleForTesting
+  public static void fireInsertEventHelper(IMetaStoreClient msClient,
+      List<InsertEventInfo> insertEventInfos, String dbName, String tableName)
+      throws TException {
+    Preconditions.checkNotNull(msClient);
+    Preconditions.checkNotNull(dbName);
+    Preconditions.checkNotNull(tableName);
+    for (InsertEventInfo info : insertEventInfos) {
+      Preconditions.checkNotNull(info.getNewFiles());
+      LOG.debug("Firing an insert event for " + tableName);
+      FireEventRequestData data = new FireEventRequestData();
+      InsertEventRequestData insertData = new InsertEventRequestData();
+      data.setInsertData(insertData);
+      FireEventRequest rqst = new FireEventRequest(true, data);
+      rqst.setDbName(dbName);
+      rqst.setTableName(tableName);
+      insertData.setFilesAdded(new ArrayList<>(info.getNewFiles()));
+      insertData.setReplace(info.isOverwrite());
+      if (info.getPartVals() != null) rqst.setPartitionVals(info.getPartVals());
+      msClient.fireListenerEvent(rqst);
+    }
+  }
 }
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 43f174c..5981b2c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -32,6 +32,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -52,6 +53,10 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -83,6 +88,9 @@ import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
 import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
@@ -94,6 +102,7 @@ import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AcidUtils.TblTransaction;
+import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
@@ -984,4 +993,67 @@ public class MetastoreShim {
     return wh.getDefaultTablePath(db, tbl.getTableName().toLowerCase(), isExternal)
         .toString();
   }
+
+  /**
+   * Fire insert events for table and partition.
+   * In case of any exception, we just log the failure of firing insert events.
+   */
+  public static List<Long> fireInsertEvents(MetaStoreClient msClient,
+      List<InsertEventInfo> insertEventInfos, String dbName, String tableName) {
+    try {
+      return fireInsertEventHelper(msClient.getHiveClient(), insertEventInfos, dbName, tableName);
+    } catch (Exception e) {
+      LOG.error("Failed to fire insert event. Some tables might not be"
+              + " refreshed on other impala clusters.", e);
+    } finally {
+      msClient.close();
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   *  Fires an insert event to HMS notification log. In Hive-3 for partitioned table,
+   *  all partition insert events will be fired by a bulk API.
+   *
+   * @param msClient Metastore client,
+   * @param insertEventInfos A list of insert event encapsulating the information needed
+   * to fire insert
+   * @param dbName
+   * @param tableName
+   * @return a list of eventIds for the insert events
+   */
+  @VisibleForTesting
+  public static List<Long> fireInsertEventHelper(IMetaStoreClient msClient,
+      List<InsertEventInfo> insertEventInfos, String dbName, String tableName)
+      throws TException {
+    Preconditions.checkNotNull(msClient);
+    Preconditions.checkNotNull(dbName);
+    Preconditions.checkNotNull(tableName);
+    LOG.debug(String.format(
+        "Firing %s insert event for %s", insertEventInfos.size(), tableName));
+    FireEventRequestData data = new FireEventRequestData();
+    FireEventRequest rqst = new FireEventRequest(true, data);
+    rqst.setDbName(dbName);
+    rqst.setTableName(tableName);
+    List<InsertEventRequestData> insertDatas = new ArrayList<>();
+    for (InsertEventInfo info : insertEventInfos) {
+      InsertEventRequestData insertData = new InsertEventRequestData();
+      Preconditions.checkNotNull(info.getNewFiles());
+      insertData.setFilesAdded(new ArrayList<>(info.getNewFiles()));
+      insertData.setReplace(info.isOverwrite());
+      if (info.getPartVals() != null) insertData.setPartitionVal(info.getPartVals());
+      insertDatas.add(insertData);
+    }
+    if (insertDatas.size() == 1) {
+      if (insertEventInfos.get(0).getPartVals() != null) {
+        rqst.setPartitionVals(insertEventInfos.get(0).getPartVals());
+      }
+      data.setInsertData(insertDatas.get(0));
+    } else {
+      data.setInsertDatas(insertDatas);
+    }
+    FireEventResponse response = msClient.fireListenerEvent(rqst);
+
+    return response.getEventIds();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index fb7ef4d..87465b6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -817,29 +817,36 @@ public class CatalogServiceCatalog extends Catalog {
    * Evaluates if the information from an event (serviceId and versionNumber) matches to
    * the catalog object. If there is match, the in-flight version for that object is
    * removed and method returns true. If it does not match, returns false
+
    * @param ctx self context which provides all the information needed to
    * evaluate if this is a self-event or not
    * @return true if given event information evaluates to a self-event, false otherwise
    */
-  public boolean evaluateSelfEvent(SelfEventContext ctx)
+  public boolean evaluateSelfEvent(boolean isInsertEvent, SelfEventContext ctx)
       throws CatalogException {
     Preconditions.checkState(isEventProcessingActive(),
         "Event processing should be enabled when calling this method");
-    long versionNumber = ctx.getVersionNumberFromEvent();
+    long versionNumber =
+        isInsertEvent ? ctx.getIdFromEvent() : ctx.getVersionNumberFromEvent();
     String serviceIdFromEvent = ctx.getServiceIdFromEvent();
-    LOG.debug("Input arguments for self-event evaluation: {} {}",versionNumber,
-        serviceIdFromEvent);
-    // no version info or service id in the event
-    if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
-      LOG.info("Not a self-event since the given version is {} and service id is {}",
-          versionNumber, serviceIdFromEvent);
-      return false;
-    }
-    // if the service id from event doesn't match with our service id this is not a
-    // self-event
-    if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
-      LOG.info("Not a self-event because service id of this catalog {} does not match "
-          + "with one in event {}.", getCatalogServiceId(), serviceIdFromEvent);
+
+    if (!isInsertEvent) {
+      // no version info or service id in the event
+      if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
+        LOG.info("Not a self-event since the given version is {} and service id is {}",
+            versionNumber, serviceIdFromEvent);
+        return false;
+      }
+      // if the service id from event doesn't match with our service id this is not a
+      // self-event
+      if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
+        LOG.info("Not a self-event because service id of this catalog {} does not match "
+                + "with one in event {}.",
+            getCatalogServiceId(), serviceIdFromEvent);
+      }
+    } else if (versionNumber == -1) {
+      // if insert event, we only compare eventId
+      LOG.info("Not a self-event because eventId is {}", versionNumber);
       return false;
     }
     Db db = getDb(ctx.getDbName());
@@ -881,10 +888,11 @@ public class CatalogServiceCatalog extends Catalog {
       List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues();
       // if the partitionKeyValues is null, we look for tbl's in-flight events
       if (partitionKeyValues == null) {
-        boolean removed = tbl.removeFromVersionsForInflightEvents(versionNumber);
+        boolean removed =
+            tbl.removeFromVersionsForInflightEvents(isInsertEvent, versionNumber);
         if (!removed) {
-          LOG.info("Could not find version {} in in-flight event list of table {}",
-              versionNumber, tbl.getFullName());
+          LOG.info("Could not find {} {} in in-flight event list of table {}",
+              isInsertEvent ? "eventId" : "version", versionNumber, tbl.getFullName());
         }
         return removed;
       }
@@ -893,8 +901,9 @@ public class CatalogServiceCatalog extends Catalog {
         for (List<TPartitionKeyValue> partitionKeyValue : partitionKeyValues) {
           HdfsPartition hdfsPartition =
               ((HdfsTable) tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue);
-          if (hdfsPartition == null || !hdfsPartition
-              .removeFromVersionsForInflightEvents(versionNumber)) {
+          if (hdfsPartition == null
+              || !hdfsPartition.removeFromVersionsForInflightEvents(
+                     isInsertEvent, versionNumber)) {
             // even if this is an error condition we should not bail out early since we
             // should clean up the self-event state on the rest of the partitions
             String partName = HdfsTable.constructPartitionName(partitionKeyValue);
@@ -919,15 +928,18 @@ public class CatalogServiceCatalog extends Catalog {
   /**
    * Adds a given version number from the catalog table's list of versions for in-flight
    * events. Applicable only when external event processing is enabled.
-   *
+   * @param isInsertEvent if false add versionNumber for DDL Event, otherwise add eventId
+   * for Insert Event.
    * @param tbl Catalog table
-   * @param versionNumber version number to be added
+   * @param versionNumber when isInsertEvent is true, it is eventId to add
+   * when isInsertEvent is false, it is version number to add
    */
-  public void addVersionsForInflightEvents(Table tbl, long versionNumber) {
+  public void addVersionsForInflightEvents(
+      boolean isInsertEvent, Table tbl, long versionNumber) {
     if (!isEventProcessingActive()) return;
-    tbl.addToVersionsForInflightEvents(versionNumber);
-    LOG.info("Added catalog version {} in table's {} in-flight events",
-        versionNumber, tbl.getFullName());
+    tbl.addToVersionsForInflightEvents(isInsertEvent, versionNumber);
+    LOG.info("Added {} {} in table's {} in-flight events",
+        isInsertEvent ? "eventId" : "catalog version", versionNumber, tbl.getFullName());
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 330227c..8a465eb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -511,7 +511,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
     Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
         "removeFromVersionsForInflightEvents called without getting the db lock for "
             + getName() + " database.");
-    return inFlightEvents_.remove(versionNumber);
+    return inFlightEvents_.remove(false, versionNumber);
   }
 
   /**
@@ -525,7 +525,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
     Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
         "addToVersionsForInFlightEvents called without getting the db lock for "
             + getName() + " database.");
-    if (!inFlightEvents_.add(versionNumber)) {
+    if (!inFlightEvents_.add(false, versionNumber)) {
       LOG.warn(String.format("Could not add version %s to the list of in-flight "
           + "events. This could cause unnecessary database %s invalidation when the "
           + "event is processed", versionNumber, getName()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 2069756..e7a5bb8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -852,25 +852,33 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
 
   /**
    * Removes a given version from the in-flight events
-   * @param versionNumber version number to remove
+   * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
+   * Insert events. If false, remove version number from list of versions for in-flight
+   * DDL events.
+   * @param versionNumber when isInsertEvent is true, it's eventId to remove
+   *                      when isInsertEvent is false, it's version number to remove
    * @return true if the versionNumber was removed, false if it didn't exist
    */
-  public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+  public boolean removeFromVersionsForInflightEvents(
+      boolean isInsertEvent, long versionNumber) {
     Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
         "removeFromVersionsForInflightEvents called without holding the table lock on "
             + "partition " + getPartitionName() + " of table " + table_.getFullName());
-    return inFlightEvents_.remove(versionNumber);
+    return inFlightEvents_.remove(isInsertEvent, versionNumber);
   }
 
   /**
    * Adds a version number to the in-flight events of this partition
-   * @param versionNumber version number to add
+   * @param isInsertEvent if true, add eventId to list of eventIds for in-flight Insert
+   * events if false, add version number to list of versions for in-flight DDL events
+   * @param versionNumber when isInsertEvent is true, it's eventId to add
+   *                      when isInsertEvent is false, it's version number to add
    */
-  public void addToVersionsForInflightEvents(long versionNumber) {
+  public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) {
     Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
         "addToVersionsForInflightEvents called without holding the table lock on "
             + "partition " + getPartitionName() + " of table " + table_.getFullName());
-    if (!inFlightEvents_.add(versionNumber)) {
+    if (!inFlightEvents_.add(isInsertEvent, versionNumber)) {
       LOG.warn(String.format("Could not add %s version to the partition %s of table %s. "
           + "This could cause unnecessary refresh of the partition when the event is"
           + "received by the Events processor.", versionNumber, getPartitionName(),
@@ -886,13 +894,14 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
    */
   private void addInflightVersionsFromParameters() {
     Preconditions.checkNotNull(hmsParameters_);
-    Preconditions.checkState(inFlightEvents_.size() == 0);
+    Preconditions.checkState(inFlightEvents_.size(false) == 0);
     // we should not check for table lock being held here since there are certain code
     // paths which call this method without holding the table lock (eg. getOrLoadTable())
     if (!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
       return;
     }
-    inFlightEvents_.add(Long.parseLong(
+    inFlightEvents_.add(false,
+        Long.parseLong(
             hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 0c6ac31..8f0f5ad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -63,7 +63,6 @@ import org.apache.log4j.Logger;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 /**
@@ -801,14 +800,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
 
   /**
    * Removes a given version from the collection of version numbers for in-flight events
-   * @param versionNumber version number to remove from the collection
+   * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
+   * Insert events. If false, remove versionNumber from list of versions for in-flight DDL
+   * events
+   * @param versionNumber when isInsertEvent is true, it's eventId to remove
+   * when isInsertEvent is false, it's version number to remove
    * @return true if version was successfully removed, false if didn't exist
    */
-  public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+  public boolean removeFromVersionsForInflightEvents(
+      boolean isInsertEvent, long versionNumber) {
     Preconditions.checkState(tableLock_.isHeldByCurrentThread(),
         "removeFromVersionsForInFlightEvents called without taking the table lock on "
             + getFullName());
-    return inFlightEvents.remove(versionNumber);
+    return inFlightEvents.remove(isInsertEvent, versionNumber);
   }
 
   /**
@@ -816,16 +820,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    * collection is already at the max size defined by
    * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and
    * does not add it
-   * @param versionNumber version number to add
+   * @param isInsertEvent if true, add eventId to list of eventIds for in-flight Insert
+   * events. If false, add versionNumber to list of versions for in-flight DDL events
+   * @param versionNumber when isInsertEvent is true, it's eventId to add
+   * when isInsertEvent is false, it's version number to add
    * @return True if version number was added, false if the collection is at its max
    * capacity
    */
-  public void addToVersionsForInflightEvents(long versionNumber) {
+  public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) {
     // we generally don't take locks on Incomplete tables since they are atomically
     // replaced during load
     Preconditions.checkState(
         this instanceof IncompleteTable || tableLock_.isHeldByCurrentThread());
-    if (!inFlightEvents.add(versionNumber)) {
+    if (!inFlightEvents.add(isInsertEvent, versionNumber)) {
       LOG.warn(String.format("Could not add %s version to the table %s. This could "
           + "cause unnecessary refresh of the table when the event is received by the "
               + "Events processor.", versionNumber, getFullName()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
index 1aed6d1..11d38f6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
@@ -34,41 +34,66 @@ public class InFlightEvents {
   // maximum number of catalog versions to store for in-flight events for this table
   private static final int DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS = 10;
 
+  // maximum number of eventIds to store for in-flight events for this table
+  private static final int DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS = 100;
+
   private static final Logger LOG = LoggerFactory.getLogger(InFlightEvents.class);
-  // FIFO list of versions for all the in-flight metastore events in this table
+  // FIFO list of versions for all the in-flight metastore DDL events in this table
   // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_EVENTS size. Anything which
   // is attempted to be added to this list when its at maximum capacity is ignored
   private final LinkedList<Long> versionsForInflightEvents_ = new LinkedList<>();
 
+  // FIFO list of eventIds for all the in-flight metastore Insert events in this table
+  // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS size. Anything
+  // which is attempted to be added to this list when its at maximum capacity is ignored
+  private final LinkedList<Long> idsForInflightDmlEvents_ = new LinkedList<>();
+
   // maximum number of versions to store
-  private final int capacity_;
+  private final int capacity_for_versions_;
+
+  // maximum number of eventIds to store
+  private final int capacity_for_eventIds_;
 
   public InFlightEvents() {
-    this.capacity_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS;
+    this.capacity_for_versions_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS;
+    this.capacity_for_eventIds_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS;
   }
 
   public InFlightEvents(int capacity) {
     Preconditions.checkState(capacity > 0);
-    this.capacity_ = capacity;
+    this.capacity_for_versions_ = capacity;
+    this.capacity_for_eventIds_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS;
   }
-
   /**
    * Gets the current list of versions for in-flight events for this table
+   * @param isInsertEvent if true, return list of eventIds for in-flight Insert events
+   * if false, return list of versions for in-flight DDL events
    */
-  public List<Long> getAll() {
-    return ImmutableList.copyOf(versionsForInflightEvents_);
+  public List<Long> getAll(boolean isInsertEvent) {
+    if (isInsertEvent) {
+      return ImmutableList.copyOf(idsForInflightDmlEvents_);
+    } else {
+      return ImmutableList.copyOf(versionsForInflightEvents_);
+    }
   }
 
   /**
    * Removes a given version from the collection of version numbers for in-flight
    * events.
-   *
-   * @param versionNumber version number to remove from the collection
+   * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
+   * Insert events. If false, remove version number from list of versions for in-flight
+   * DDL events.
+   * @param versionNumber when isInsertEvent is true, it's eventId to remove
+   * when isInsertEvent is false, it's version number to remove
    * @return true if the version was found and successfully removed, false
    * otherwise
    */
-  public boolean remove(long versionNumber) {
-    return versionsForInflightEvents_.remove(versionNumber);
+  public boolean remove(boolean isInsertEvent, long versionNumber) {
+    if (isInsertEvent) {
+      return idsForInflightDmlEvents_.remove(versionNumber);
+    } else {
+      return versionsForInflightEvents_.remove(versionNumber);
+    }
   }
 
   /**
@@ -76,23 +101,45 @@ public class InFlightEvents {
    * collection is already at the max size defined by
    * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and
    * does not add it
-   *
-   * @param versionNumber version number to add
+   * @param isInsertEvent If true, add eventId to list of eventIds for in-flight Insert
+   * events. If false, add versionNumber to list of versions for in-flight DDL events.
+   * @param versionNumber when isInsertEvent is true, it's eventId to add
+   * when isInsertEvent is false, it's version number to add
    * @return True if version number was added, false if the collection is at its max
    * capacity
    */
-  public boolean add(long versionNumber) {
-    if (versionsForInflightEvents_.size() == capacity_) {
-      LOG.warn(String.format("Number of versions to be stored is at "
-              + " its max capacity %d. Ignoring add request for version number %d.",
-          DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
-      return false;
+  public boolean add(boolean isInsertEvent, long versionNumber) {
+    if (isInsertEvent) {
+      if (idsForInflightDmlEvents_.size() == capacity_for_eventIds_) {
+        LOG.warn(String.format("Number of Insert events to be stored is at "
+                + " its max capacity %d. Ignoring add request for eventId %d.",
+            DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
+        return false;
+      }
+      idsForInflightDmlEvents_.add(versionNumber);
+    } else {
+      if (versionsForInflightEvents_.size() == capacity_for_versions_) {
+        LOG.warn(String.format("Number of DDL events to be stored is at "
+                + "its max capacity %d. Ignoring add request for version %d.",
+            DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
+        return false;
+      }
+      versionsForInflightEvents_.add(versionNumber);
     }
-    versionsForInflightEvents_.add(versionNumber);
     return true;
   }
 
-  public int size() {
-    return versionsForInflightEvents_.size();
+  /**
+   * Get the size of in-flight DDL or DML events list
+   * @param isInsertEvent if true, return size of Insert events list
+   *              if false, return size of DDL events list
+   * @return size of events list
+   */
+  public int size(boolean isInsertEvent) {
+    if (isInsertEvent) {
+      return idsForInflightDmlEvents_.size();
+    } else {
+      return versionsForInflightEvents_.size();
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 2be56e9..e64578b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -269,7 +269,7 @@ public class MetastoreEvents {
     protected final String tblName_;
 
     // eventId of the event. Used instead of calling getter on event_ everytime
-    protected final long eventId_;
+    protected long eventId_;
 
     // eventType from the NotificationEvent
     protected final MetastoreEventType eventType_;
@@ -419,13 +419,15 @@ public class MetastoreEvents {
      * serviceId and version. More details on complete flow of self-event handling
      * logic can be read in <code>MetastoreEventsProcessor</code> documentation.
      *
+     * @param isInsertEvent if true, check in flight events list of Insert event
+     * if false, check events list of DDL
      * @return True if this event is a self-generated event. If the returned value is
      * true, this method also clears the version number from the catalog database/table.
      * Returns false if the version numbers or service id don't match
      */
-    protected boolean isSelfEvent() {
+    protected boolean isSelfEvent(boolean isInsertEvent) {
       try {
-        if (catalog_.evaluateSelfEvent(getSelfEventContext())) {
+        if (catalog_.evaluateSelfEvent(isInsertEvent, getSelfEventContext())) {
           metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc();
           return true;
         }
@@ -434,6 +436,8 @@ public class MetastoreEvents {
       }
       return false;
     }
+
+    protected boolean isSelfEvent() { return isSelfEvent(false); }
   }
 
   public static String getStringProperty(
@@ -766,19 +770,25 @@ public class MetastoreEvents {
 
     @Override
     public SelfEventContext getSelfEventContext() {
-      throw new UnsupportedOperationException("Self-event evaluation is not implemented"
-          + " for insert event type");
+      if (insertPartition_ != null) {
+        // create selfEventContext for insert partition event
+        List<TPartitionKeyValue> tPartSpec =
+            getTPartitionSpecFromHmsPartition(msTbl_, insertPartition_);
+        return new SelfEventContext(dbName_, tblName_, Arrays.asList(tPartSpec),
+            insertPartition_.getParameters(), eventId_);
+      } else {
+        // create selfEventContext for insert table event
+        return new SelfEventContext(
+            dbName_, tblName_, null, msTbl_.getParameters(), eventId_);
+      }
     }
 
-    /**
-     * Currently we do not check for self-events in Inserts. Existing self-events logic
-     * cannot be used for insert events since firing insert event does not allow us to
-     * modify table parameters in HMS. Hence, we cannot get CatalogServiceIdentifiers in
-     * Insert Events.
-     * TODO: Handle self-events for insert case.
-     */
     @Override
     public void process() throws MetastoreNotificationException {
+      if (isSelfEvent(true)) {
+        infoLog("Not processing the event as it is a self-event");
+        return;
+      }
       // Reload the whole table if it's a transactional table.
       if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
         insertPartition_ = null;
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
index 83aa69b..10dd4eb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
@@ -32,15 +32,21 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 public class SelfEventContext {
   private final String dbName_;
   private final String tblName_;
+  private final long insertEventId_;
   // version number from the event object parameters used for self-event detection
   private final long versionNumberFromEvent_;
   // service id from the event object parameters used for self-event detection
-  private final String serviceIdFromEvent_;
+  private final String serviceidFromEvent_;
   private final List<List<TPartitionKeyValue>> partitionKeyValues_;
 
   SelfEventContext(String dbName, String tblName,
       Map<String, String> parameters) {
-    this(dbName, tblName, null, parameters);
+    this(dbName, tblName, null, parameters, -1);
+  }
+
+  SelfEventContext(String dbName, String tblName,
+      List<List<TPartitionKeyValue>> partitionKeyValues, Map<String, String> parameters) {
+    this(dbName, tblName, partitionKeyValues, parameters, -1);
   }
 
   /**
@@ -55,17 +61,17 @@ public class SelfEventContext {
    */
   SelfEventContext(String dbName, @Nullable String tblName,
       @Nullable List<List<TPartitionKeyValue>> partitionKeyValues,
-      Map<String, String> parameters) {
+      Map<String, String> parameters, long eventId) {
     Preconditions.checkNotNull(parameters);
     this.dbName_ = Preconditions.checkNotNull(dbName);
     this.tblName_ = tblName;
     this.partitionKeyValues_ = partitionKeyValues;
+    insertEventId_ = eventId;
     versionNumberFromEvent_ = Long.parseLong(
         MetastoreEvents.getStringProperty(parameters,
             MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
-    serviceIdFromEvent_ =
-        MetastoreEvents.getStringProperty(parameters,
-            MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
+    serviceidFromEvent_ = MetastoreEvents.getStringProperty(
+        parameters, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
   }
 
   public String getDbName() {
@@ -76,13 +82,13 @@ public class SelfEventContext {
     return tblName_;
   }
 
+  public long getIdFromEvent() { return insertEventId_; }
+
   public long getVersionNumberFromEvent() {
     return versionNumberFromEvent_;
   }
 
-  public String getServiceIdFromEvent() {
-    return serviceIdFromEvent_;
-  }
+  public String getServiceIdFromEvent() { return serviceidFromEvent_; }
 
   public List<List<TPartitionKeyValue>> getPartitionKeyValues() {
     return partitionKeyValues_ == null ?
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index c3957b9..c62ac6f 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -126,6 +126,8 @@ public class BackendConfig {
     return backendCfg_.hms_event_polling_interval_s;
   }
 
+  public boolean isInsertEventsEnabled() { return backendCfg_.enable_insert_events; }
+
   public boolean isOrcScannerEnabled() {
     return backendCfg_.enable_orc_scanner;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 63a7b81..3ac2a05 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -711,7 +711,7 @@ public class CatalogOpExecutor {
             // pre-existing there is no alter table event generated. Hence we should
             // only add the versions for in-flight events when we are sure that the
             // partition was really added.
-            catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
+            catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
             addTableToCatalogUpdate(refreshedTable, response.result);
           }
           reloadMetadata = false;
@@ -851,7 +851,7 @@ public class CatalogOpExecutor {
             reloadTableSchema, null, "ALTER TABLE " + params.getAlter_type().name());
         // now that HMS alter operation has succeeded, add this version to list of
         // inflight events in catalog table if event processing is enabled
-        catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
+        catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
         addTableToCatalogUpdate(tbl, response.result);
       }
     } finally {
@@ -3098,7 +3098,7 @@ public class CatalogOpExecutor {
           "'%s' and the new table name '%s' may fix the problem." , tableName.toString(),
           newTableName.toString()));
     }
-    catalog_.addVersionsForInflightEvents(result.second, newCatalogVersion);
+    catalog_.addVersionsForInflightEvents(false, result.second, newCatalogVersion);
     // TODO(todd): if client is a 'v2' impalad, only send back invalidation
     response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
     response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
@@ -3678,7 +3678,7 @@ public class CatalogOpExecutor {
     // catalog service identifiers
     if (catalog_.getCatalogServiceId().equals(serviceId)) {
       Preconditions.checkNotNull(version);
-      hdfsPartition.addToVersionsForInflightEvents(Long.parseLong(version));
+      hdfsPartition.addToVersionsForInflightEvents(false, Long.parseLong(version));
     }
   }
 
@@ -4430,7 +4430,7 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Populates insert event data and calls fireInsertEventAysnc() if external event
+   * Populates insert event data and calls fireInsertEvents() if external event
    * processing is enabled. This is no-op if event processing is disabled or there are
    * no existing partitions affected by this insert.
    *
@@ -4440,11 +4440,16 @@ public class CatalogOpExecutor {
    */
   private void createInsertEvents(Table table,
       List<FeFsPartition> affectedExistingPartitions, boolean isInsertOverwrite) {
-    if (!catalog_.isEventProcessingActive() ||
-        affectedExistingPartitions.size() == 0) return;
+    boolean isInsertEventsEnabled = BackendConfig.INSTANCE.isInsertEventsEnabled();
+    if (!catalog_.isEventProcessingActive() || !isInsertEventsEnabled
+        || affectedExistingPartitions.size() == 0) {
+      return;
+    }
 
     // List of all insert events that we call HMS fireInsertEvent() on.
     List<InsertEventInfo> insertEventInfos = new ArrayList<>();
+    // List of all partitions that we insert into
+    List<HdfsPartition> partitions = new ArrayList<>();
 
     // Map of partition names to file names of all existing partitions touched by the
     // insert.
@@ -4471,58 +4476,54 @@ public class CatalogOpExecutor {
       // Find the delta of the files added by the insert if it is not an overwrite
       // operation. HMS fireListenerEvent() expects an empty list if no new files are
       // added or if the operation is an insert overwrite.
+      HdfsPartition hdfsPartition = (HdfsPartition) part;
       Set<String> deltaFiles = new HashSet<>();
       List<String> partVals = null;
       if (!isInsertOverwrite) {
-        String partitionName = part.getPartitionName() + "/";
+        String partitionName = hdfsPartition.getPartitionName() + "/";
         Set<String> filesPostInsert =
             partitionFilesMapPostInsert.get(partitionName);
         if (table.getNumClusteringCols() > 0) {
           Set<String> filesBeforeInsert =
               partitionFilesMapBeforeInsert.get(partitionName);
           deltaFiles = Sets.difference(filesBeforeInsert, filesPostInsert);
-          partVals = part.getPartitionValuesAsStrings(true);
+          partVals = hdfsPartition.getPartitionValuesAsStrings(true);
         } else {
           Map.Entry<String, Set<String>> entry =
               partitionFilesMapBeforeInsert.entrySet().iterator().next();
           deltaFiles = Sets.difference(entry.getValue(), filesPostInsert);
         }
         LOG.info("{} new files detected for table {} partition {}.",
-            filesPostInsert.size(), table.getTableName(), part.getPartitionName());
+            filesPostInsert.size(), table.getTableName(),
+            hdfsPartition.getPartitionName());
       }
       if (deltaFiles != null || isInsertOverwrite) {
         // Collect all the insert events.
-        insertEventInfos.add(new InsertEventInfo(table.getDb().getName(),
-            table.getName(), partVals, deltaFiles, isInsertOverwrite));
+        insertEventInfos.add(
+            new InsertEventInfo(partVals, deltaFiles, isInsertOverwrite));
+        if (partVals != null) {
+          // insert into partition
+          partitions.add(hdfsPartition);
+        }
       } else {
         LOG.info("No new files were created, and is not a replace. Skipping "
             + "generating INSERT event.");
       }
     }
 
-    // Firing insert events by making calls to HMS APIs can be slow for tables with
-    // large number of partitions. Hence, we fire the insert events asynchronously.
-    fireInsertEventsAsync(insertEventInfos);
-  }
-
-  /**
-   * Helper method to fire insert events asynchronously. This creates a single thread
-   * to execute the fireInsertEvent method and shuts down the thread after it has
-   * finished. In case of any exception, we just log the failure of firing insert events.
-   */
-  private void fireInsertEventsAsync(List<InsertEventInfo> insertEventInfos) {
-    ExecutorService fireInsertEventThread = Executors.newSingleThreadExecutor();
-    CompletableFuture.runAsync(() -> {
-      for (InsertEventInfo info : insertEventInfos) {
-        try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
-          MetaStoreUtil.fireInsertEvent(metaStoreClient.getHiveClient(), info);
-        } catch (Exception e) {
-          LOG.error("Failed to fire insert event. Some tables might not be"
-              + " refreshed on other impala clusters.", e);
+    MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient();
+    List<Long> eventIds = MetastoreShim.fireInsertEvents(metaStoreClient,
+        insertEventInfos, table.getDb().getName(), table.getName());
+    if (!eventIds.isEmpty()) {
+      if (partitions.size() == 0) { // insert into table
+        catalog_.addVersionsForInflightEvents(true, table, eventIds.get(0));
+      } else { // insert into partition
+        for (int par_idx = 0; par_idx < partitions.size(); par_idx++) {
+          partitions.get(par_idx).addToVersionsForInflightEvents(
+              true, eventIds.get(par_idx));
         }
       }
-    }, Executors.newSingleThreadExecutor()).thenRun(() ->
-        fireInsertEventThread.shutdown());
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index 7bc463f..81b62f6 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -30,9 +30,6 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.FireEventRequest;
-import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
-import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
@@ -317,13 +314,18 @@ public class MetaStoreUtil {
   }
 
   /**
+   * Check if the hms table is a bucketed table or not
+   */
+  public static boolean isBucketedTable(Table msTbl) {
+    Preconditions.checkNotNull(msTbl);
+    return msTbl.getSd().getNumBuckets() > 0;
+  }
+
+  /**
    * A helper class that encapsulates all the information needed to fire and insert event
    * with HMS.
    */
   public static class InsertEventInfo {
-    private String dbName;
-    private String tableName;
-
     // List of partition values corresponding to the partition keys in
     // a partitioned table. This is null for non-partitioned table.
     private List<String> partVals;
@@ -337,48 +339,15 @@ public class MetaStoreUtil {
     // false otherwise.
     private boolean isOverwrite;
 
-    public InsertEventInfo(String dbName, String tableName, List<String> partVals,
-        Collection<String> newFiles, boolean isOverwrite) {
-      this.dbName = dbName;
-      this.tableName = tableName;
+    public InsertEventInfo(
+        List<String> partVals, Collection<String> newFiles, boolean isOverwrite) {
       this.partVals = partVals;
       this.newFiles = newFiles;
       this.isOverwrite = isOverwrite;
     }
-  }
 
-  /**
-   *  Fires an insert event to HMS notification log. For partitioned table, each
-   *  existing partition touched by the insert will fire a separate insert event.
-   *
-   * @param msClient Metastore client,
-   * @param info A singe insert event encapsulating the information needed to fire insert
-   * event with HMS.
-   */
-  public static void fireInsertEvent(IMetaStoreClient msClient,
-      InsertEventInfo info) throws TException {
-    Preconditions.checkNotNull(msClient);
-    Preconditions.checkNotNull(info.dbName);
-    Preconditions.checkNotNull(info.tableName);
-    Preconditions.checkNotNull(info.newFiles);
-    LOG.debug("Firing an insert event for {}", info.tableName);
-    FireEventRequestData data = new FireEventRequestData();
-    InsertEventRequestData insertData = new InsertEventRequestData();
-    data.setInsertData(insertData);
-    FireEventRequest rqst = new FireEventRequest(true, data);
-    rqst.setDbName(info.dbName);
-    rqst.setTableName(info.tableName);
-    insertData.setFilesAdded(new ArrayList<>(info.newFiles));
-    insertData.setReplace(info.isOverwrite);
-    if (info.partVals != null) rqst.setPartitionVals(info.partVals);
-    msClient.fireListenerEvent(rqst);
-  }
-
-  /**
-   * Check if the hms table is a bucketed table or not
-   */
-  public static boolean isBucketedTable(Table msTbl) {
-    Preconditions.checkNotNull(msTbl);
-    return msTbl.getSd().getNumBuckets() > 0;
+    public List<String> getPartVals() { return this.partVals; }
+    public Collection<String> getNewFiles() { return this.newFiles; }
+    public boolean isOverwrite() { return this.isOverwrite; }
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 41995a8..ba4a0b5 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -90,6 +91,7 @@ import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TAlterDbSetOwnerParams;
 import org.apache.impala.thrift.TAlterDbType;
@@ -107,6 +109,7 @@ import org.apache.impala.thrift.TAlterTableSetRowFormatParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
 import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
+import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TCreateDbParams;
@@ -134,12 +137,14 @@ import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.thrift.TTypeNode;
 import org.apache.impala.thrift.TTypeNodeType;
 import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -712,6 +717,67 @@ public class MetastoreEventsProcessorTest {
   }
 
   /**
+   * Test insert from impala. Insert into table and partition from impala
+   * should be treated as self-event.
+   */
+  @Test
+  public void testInsertFromImpala() throws ImpalaException {
+    Assume.assumeTrue("Skipping this test because it only works with Hive-3 or greater",
+        TestUtils.getHiveMajorVersion() >= 3);
+    // Test insert into multiple partitions
+    createDatabaseFromImpala(TEST_DB_NAME, null);
+    String tableToInsertPart = "tbl_with_mul_part";
+    createTableFromImpala(TEST_DB_NAME, tableToInsertPart, true);
+    String tableToInsertMulPart = "tbl_to_insert_mul_part";
+    createTableFromImpala(TEST_DB_NAME, tableToInsertMulPart, true);
+    // add first partition
+    TPartitionDef partitionDef = new TPartitionDef();
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1"));
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "100"));
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
+    // add second partition
+    partitionDef = new TPartitionDef();
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1"));
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "200"));
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
+    eventsProcessor_.processEvents();
+    // count self event from here, numberOfSelfEventsBefore=4 as we have 4 ADD PARTITION
+    // events
+    long numberOfSelfEventsBefore =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
+            .getCount();
+    // insert into partition
+    insertFromImpala(tableToInsertPart, true, "1", "100");
+    insertFromImpala(tableToInsertPart, true, "1", "200");
+    // insert into multiple partition
+    Set<String> created_partitions = new HashSet<String>();
+    String partition1 = "p1=1/p2=100/";
+    String partition2 = "p1=1/p2=200/";
+    created_partitions.add(partition1);
+    created_partitions.add(partition2);
+    insertMulPartFromImpala(tableToInsertMulPart, tableToInsertPart, created_partitions);
+    eventsProcessor_.processEvents();
+
+    // Test insert into table
+    String tableToInsert = "tbl_to_insert";
+    createTableFromImpala(TEST_DB_NAME, tableToInsert, false);
+    insertFromImpala(tableToInsert, false, "", "");
+    eventsProcessor_.processEvents();
+
+    long selfEventsCountAfter =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
+            .getCount();
+    // 2 single insert partition events, 1 multi insert partitions which includes 2 single
+    // insert events 1 single insert table event
+    assertEquals("Unexpected number of self-events generated",
+        numberOfSelfEventsBefore + 5, selfEventsCountAfter);
+  }
+
+  /**
    * Test generates a sequence of create_table, insert and drop_table in the event stream
    * to make sure when the insert event is processed on a removed table, it doesn't cause
    * any issues with the event processing.
@@ -835,9 +901,10 @@ public class MetastoreEventsProcessorTest {
     List <String> newFiles = addFilesToDirectory(parentPath, "testFile.",
         totalNumberOfFilesToAdd, isOverwrite);
     try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
-      MetaStoreUtil.fireInsertEvent(metaStoreClient.getHiveClient(),
-          new InsertEventInfo(msTbl.getDbName(), msTbl.getTableName(), null,
-          newFiles, isOverwrite));
+      List<InsertEventInfo> insertEventInfos = new ArrayList<>();
+      insertEventInfos.add(new InsertEventInfo(null, newFiles, isOverwrite));
+      MetastoreShim.fireInsertEventHelper(metaStoreClient.getHiveClient(),
+          insertEventInfos, msTbl.getDbName(), msTbl.getTableName());
     }
   }
 
@@ -2711,6 +2778,57 @@ public class MetastoreEventsProcessorTest {
     catalogOpExecutor_.execDdlRequest(req);
   }
 
+  /**
+   * Insert multiple partitions into table from Impala
+   */
+  private void insertMulPartFromImpala(String tblName1, String tblName2,
+      Set<String> created_partitions) throws ImpalaException {
+    String insert_mul_part = String.format(
+        "insert into table %s partition(p1, p2) select * from %s", tblName1, tblName2);
+    TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
+        TEST_DB_NAME, tblName1, insert_mul_part, created_partitions);
+    catalogOpExecutor_.updateCatalog(testInsertRequest);
+  }
+
+  /**
+   * Insert into table or partition from Impala
+   * @param tblName
+   * @param isPartitioned
+   * @return
+   */
+  private void insertFromImpala(String tblName, boolean isPartitioned, String p1val,
+      String p2val) throws ImpalaException {
+    String partition = String.format("partition (p1=%s, p2='%s')", p1val, p2val);
+    String test_insert_tbl = String.format("insert into table %s %s values ('a','aa') ",
+        tblName, isPartitioned ? partition : "");
+    Set<String> created_partitions = new HashSet<String>();
+    String created_part_str =
+        isPartitioned ? String.format("p1=%s/p2=%s/", p1val, p2val) : "";
+    created_partitions.add(created_part_str);
+    TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
+        TEST_DB_NAME, tblName, test_insert_tbl, created_partitions);
+    catalogOpExecutor_.updateCatalog(testInsertRequest);
+  }
+
+  /**
+   * Create DML request to Catalog
+   * @param dBName
+   * @param tableName
+   * @param redacted_sql_stmt
+   * @param created_partitions
+   * @return
+   */
+  private TUpdateCatalogRequest createTestTUpdateCatalogRequest(String dBName,
+      String tableName, String redacted_sql_stmt, Set<String> created_partitions) {
+    TUpdateCatalogRequest tUpdateCatalogRequest = new TUpdateCatalogRequest();
+    tUpdateCatalogRequest.setDb_name(dBName);
+    tUpdateCatalogRequest.setTarget_table(tableName);
+    tUpdateCatalogRequest.setCreated_partitions((created_partitions));
+    tUpdateCatalogRequest.setHeader(new TCatalogServiceRequestHeader());
+    tUpdateCatalogRequest.getHeader().setRedacted_sql_stmt(redacted_sql_stmt);
+    return tUpdateCatalogRequest;
+  }
+
   private TColumn getScalarColumn(String colName, TPrimitiveType type) {
     TTypeNode tTypeNode = new TTypeNode(TTypeNodeType.SCALAR);
     tTypeNode.setScalar_type(new TScalarType(type));
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index f39f55f..cdd1a48 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -21,6 +21,7 @@ import pytest
 
 from tests.common.skip import SkipIfHive2
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import HIVE_MAJOR_VERSION
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.util.hive_utils import HiveDbWrapper
 from tests.util.event_processor_utils import EventProcessorUtils
@@ -268,11 +269,6 @@ class TestEventProcessing(CustomClusterTestSuite):
           # add partition
           "alter table {0}.{1} add if not exists partition (year=1111, month=1)".format(
             db_name, tbl2),
-          # insert into a existing partition; generates ALTER_PARTITION
-          # TODO add support for insert_events (IMPALA-8632)
-          # "insert into table {0}.{1} partition (year, month) "
-          # "select * from functional.alltypessmall where year=2009 and month=1".format(
-          #  db_name, tbl2),
           # compute stats will generates ALTER_PARTITION
           "compute stats {0}.{1}".format(db_name, tbl2),
           "alter table {0}.{1} recover partitions".format(db_name, recover_tbl_name)],
@@ -295,6 +291,11 @@ class TestEventProcessing(CustomClusterTestSuite):
           "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format(
             db_name, tbl_name)]
     }
+    if HIVE_MAJOR_VERSION >= 3:
+      # insert into a existing partition; generates INSERT events
+      self_event_test_queries[True].append("insert into table {0}.{1} partition "
+          "(year, month) select * from functional.alltypessmall where year=2009 "
+          "and month=1".format(db_name, tbl2))
     return self_event_test_queries
 
   def __get_hive_test_queries(self, db_name, recover_tbl_name):