Posted to commits@impala.apache.org by vi...@apache.org on 2020/04/09 04:59:24 UTC

[impala] branch master updated (49cdd78 -> e0ed7d3)

This is an automated email from the ASF dual-hosted git repository.

vihangk1 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 49cdd78  IMPALA-8857: Fix flaky Kudu tests with external inserts
     new 0f85cbd  IMPALA-9602: Fix case-sensitivity for local catalog
     new e0ed7d3  IMPALA-8632: Add support for self-event detection for insert events

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/global-flags.cc                      |   6 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../org/apache/impala/compat/MetastoreShim.java    |  69 ++++++++++++
 .../org/apache/impala/compat/MetastoreShim.java    |  72 ++++++++++++
 .../impala/catalog/CatalogServiceCatalog.java      |  64 ++++++-----
 fe/src/main/java/org/apache/impala/catalog/Db.java |   4 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |  25 +++--
 .../main/java/org/apache/impala/catalog/Table.java |  21 ++--
 .../impala/catalog/events/InFlightEvents.java      |  91 +++++++++++----
 .../impala/catalog/events/MetastoreEvents.java     |  34 ++++--
 .../impala/catalog/events/SelfEventContext.java    |  24 ++--
 .../impala/catalog/local/CatalogdMetaProvider.java |  14 +--
 .../org/apache/impala/service/BackendConfig.java   |   2 +
 .../apache/impala/service/CatalogOpExecutor.java   |  67 +++++------
 .../java/org/apache/impala/util/MetaStoreUtil.java |  57 +++-------
 .../events/MetastoreEventsProcessorTest.java       | 124 ++++++++++++++++++++-
 .../catalog/local/CatalogdMetaProviderTest.java    |  59 ++++++++++
 tests/custom_cluster/test_event_processing.py      |  11 +-
 19 files changed, 569 insertions(+), 179 deletions(-)


[impala] 02/02: IMPALA-8632: Add support for self-event detection for insert events

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vihangk1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e0ed7d321c1dca85a0f1482842f8db6db517c909
Author: xiaomeng <xi...@cloudera.com>
AuthorDate: Thu Feb 20 16:32:48 2020 -0800

    IMPALA-8632: Add support for self-event detection for insert events
    
    When insert events are enabled, an insert from Impala into a table or
    partition generates an INSERT event, and processing that event triggers
    a refresh of the underlying table/partition. This refresh can be
    unnecessary when there is only one Impala cluster in the system.
    We can detect such self-events because the HMS API used to fire a
    listener event returns the event id; the catalog records that id, and
    the EventProcessor uses it to ignore the event when it is fetched in a
    later polling cycle.

    Testing:
    Added testInsertFromImpala() to MetastoreEventsProcessorTest.java to
    verify that inserts into tables and partitions from Impala are detected
    as self-events.
    
    Change-Id: I7873fbb2c159343690f93b9d120f6b425b983dcf
    Reviewed-on: http://gerrit.cloudera.org:8080/15648
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |   6 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../org/apache/impala/compat/MetastoreShim.java    |  69 ++++++++++++
 .../org/apache/impala/compat/MetastoreShim.java    |  72 ++++++++++++
 .../impala/catalog/CatalogServiceCatalog.java      |  64 ++++++-----
 fe/src/main/java/org/apache/impala/catalog/Db.java |   4 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |  25 +++--
 .../main/java/org/apache/impala/catalog/Table.java |  21 ++--
 .../impala/catalog/events/InFlightEvents.java      |  91 +++++++++++----
 .../impala/catalog/events/MetastoreEvents.java     |  34 ++++--
 .../impala/catalog/events/SelfEventContext.java    |  24 ++--
 .../org/apache/impala/service/BackendConfig.java   |   2 +
 .../apache/impala/service/CatalogOpExecutor.java   |  67 +++++------
 .../java/org/apache/impala/util/MetaStoreUtil.java |  57 +++-------
 .../events/MetastoreEventsProcessorTest.java       | 124 ++++++++++++++++++++-
 tests/custom_cluster/test_event_processing.py      |  11 +-
 17 files changed, 504 insertions(+), 171 deletions(-)

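[Editor's note] Before the per-file changes, here is a minimal standalone sketch of
the mechanism the commit message describes: the catalog fires the insert event
through HMS, records the returned event id in a bounded in-flight list, and the
event-processing loop skips any fetched INSERT event whose id is found in that
list. All names below are hypothetical illustrations, not Impala classes; the 100
entry cap mirrors the per-table/partition limit introduced in this patch.

    import java.util.LinkedList;

    // Editor's sketch of self-event detection for insert events.
    public class SelfEventSketch {
      private static final int MAX_IN_FLIGHT_INSERT_EVENTS = 100;
      private final LinkedList<Long> inFlightInsertEventIds = new LinkedList<>();

      // Called right after the HMS "fire listener event" API returns the new event id.
      public synchronized void recordSelfGeneratedEvent(long eventId) {
        if (inFlightInsertEventIds.size() >= MAX_IN_FLIGHT_INSERT_EVENTS) return;
        inFlightInsertEventIds.add(eventId);
      }

      // Called by the polling loop for every fetched INSERT notification.
      public synchronized boolean isSelfEvent(long eventId) {
        // Remove-on-match, so each recorded id is consumed exactly once.
        return inFlightInsertEventIds.remove(Long.valueOf(eventId));
      }

      public static void main(String[] args) {
        SelfEventSketch sketch = new SelfEventSketch();
        sketch.recordSelfGeneratedEvent(42L);        // id returned by HMS at insert time
        System.out.println(sketch.isSelfEvent(42L)); // true  -> skip the redundant refresh
        System.out.println(sketch.isSelfEvent(43L)); // false -> refresh table/partition
      }
    }
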
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 79084ff..69e856e 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -266,6 +266,12 @@ DEFINE_int32(hms_event_polling_interval_s, 0,
     "feature and not recommended to be deployed on production systems until it is "
     "made generally available.");
 
+DEFINE_bool(enable_insert_events, true,
+    "Enables insert events in the events processor. When this configuration is set to "
+    "true Impala will generate INSERT event types which when received by other Impala "
+    "clusters can be used to automatically refresh the tables or partitions. Event "
+    "processing must be turned on for this flag to have any effect.");
+
 DEFINE_string(blacklisted_dbs, "sys,information_schema",
     "Comma separated list for blacklisted databases. Configure which databases to be "
     "skipped for loading (in startup and global INVALIDATE METADATA). Users can't access,"
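
[Editor's note] As the flag description says, enable_insert_events defaults to true
and only has an effect when event processing is on. A hypothetical catalogd
invocation (illustrative values, not a recommendation) that turns on event polling
but disables insert-event generation would be:

    catalogd --hms_event_polling_interval_s=2 --enable_insert_events=false
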
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index f3e3755..90122d2 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -67,6 +67,7 @@ DECLARE_int64(exchg_node_buffer_size_bytes);
 DECLARE_int32(kudu_mutation_buffer_size);
 DECLARE_int32(kudu_error_buffer_size);
 DECLARE_int32(hms_event_polling_interval_s);
+DECLARE_bool(enable_insert_events);
 DECLARE_string(authorization_factory_class);
 DECLARE_bool(unlock_mt_dop);
 DECLARE_bool(mt_dop_auto_fallback);
@@ -155,6 +156,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
   cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size);
   cfg.__set_hms_event_polling_interval_s(FLAGS_hms_event_polling_interval_s);
+  cfg.__set_enable_insert_events(FLAGS_enable_insert_events);
   cfg.__set_impala_build_version(::GetDaemonBuildVersion());
   cfg.__set_authorization_factory_class(FLAGS_authorization_factory_class);
   cfg.__set_unlock_mt_dop(FLAGS_unlock_mt_dop);
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 90fad1a..c9b7d47 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -155,4 +155,6 @@ struct TBackendGflags {
   65: required bool use_customized_user_groups_mapper_for_ranger
 
   66: required bool enable_column_masking
+
+  67: required bool enable_insert_events
 }
diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index 87e8ecf..39cd4ff 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -23,8 +23,14 @@ import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
@@ -51,6 +57,9 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
@@ -62,6 +71,9 @@ import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
 import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
@@ -71,6 +83,8 @@ import org.apache.impala.service.MetadataOp;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.util.AcidUtils.TblTransaction;
+import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
+import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
 /**
@@ -78,6 +92,7 @@ import org.apache.thrift.TException;
  * between major versions of Hive. This implements the shimmed methods for Hive 2.
  */
 public class MetastoreShim {
+  private static final Logger LOG = Logger.getLogger(MetastoreShim.class);
 
   public static TblTransaction createTblTransaction(
      IMetaStoreClient client, Table tbl, long txnId) {
@@ -484,4 +499,58 @@ public class MetastoreShim {
       throws MetaException {
     return new Path(db.getLocationUri(), tbl.getTableName().toLowerCase()).toString();
   }
+
+  /**
+   * Fire insert events asynchronously. This creates a single thread to execute the
+   * fireInsertEvent method and shuts down the thread after it has finished.
+   * In case of any exception, we just log the failure of firing insert events.
+   */
+  public static List<Long> fireInsertEvents(MetaStoreClient msClient,
+      List<InsertEventInfo> insertEventInfos, String dbName, String tableName) {
+    ExecutorService fireInsertEventThread = Executors.newSingleThreadExecutor();
+    CompletableFuture.runAsync(() -> {
+      try {
+        fireInsertEventHelper(msClient.getHiveClient(), insertEventInfos, dbName, tableName);
+      } catch (Exception e) {
+        LOG.error("Failed to fire insert event. Some tables might not be"
+                + " refreshed on other impala clusters.", e);
+      } finally {
+        msClient.close();
+      }
+    }, Executors.newSingleThreadExecutor()).thenRun(() ->
+        fireInsertEventThread.shutdown());
+    return Collections.emptyList();
+  }
+
+  /**
+   *  Fires an insert event to HMS notification log. In Hive-2 for partitioned table,
+   *  each existing partition touched by the insert will fire a separate insert event.
+   * @param msClient Metastore client,
+   * @param insertEventInfos A list of insert event encapsulating the information needed
+   * to fire insert
+   * @param dbName
+   * @param tableName
+   */
+  @VisibleForTesting
+  public static void fireInsertEventHelper(IMetaStoreClient msClient,
+      List<InsertEventInfo> insertEventInfos, String dbName, String tableName)
+      throws TException {
+    Preconditions.checkNotNull(msClient);
+    Preconditions.checkNotNull(dbName);
+    Preconditions.checkNotNull(tableName);
+    for (InsertEventInfo info : insertEventInfos) {
+      Preconditions.checkNotNull(info.getNewFiles());
+      LOG.debug("Firing an insert event for " + tableName);
+      FireEventRequestData data = new FireEventRequestData();
+      InsertEventRequestData insertData = new InsertEventRequestData();
+      data.setInsertData(insertData);
+      FireEventRequest rqst = new FireEventRequest(true, data);
+      rqst.setDbName(dbName);
+      rqst.setTableName(tableName);
+      insertData.setFilesAdded(new ArrayList<>(info.getNewFiles()));
+      insertData.setReplace(info.isOverwrite());
+      if (info.getPartVals() != null) rqst.setPartitionVals(info.getPartVals());
+      msClient.fireListenerEvent(rqst);
+    }
+  }
 }
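
[Editor's note] This Hive-2 implementation fires the events on a background thread
and always returns an empty list, so no event ids are available to record and
self-event detection for inserts effectively requires the Hive-3 path below. A
standalone sketch of the fire-and-forget pattern used above (the task body stands
in for fireInsertEventHelper; nothing here is Impala-specific):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class FireAndForgetSketch {
      public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture.runAsync(() -> {
          try {
            // Stand-in for fireInsertEventHelper(...): call the HMS API here.
            System.out.println("firing insert event");
          } catch (Exception e) {
            // Log and swallow: a failed notification must not fail the insert itself.
            e.printStackTrace();
          }
        }, executor).thenRun(executor::shutdown);
      }
    }
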
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 43f174c..5981b2c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -32,6 +32,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -52,6 +53,10 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -83,6 +88,9 @@ import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
 import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
@@ -94,6 +102,7 @@ import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AcidUtils.TblTransaction;
+import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
@@ -984,4 +993,67 @@ public class MetastoreShim {
     return wh.getDefaultTablePath(db, tbl.getTableName().toLowerCase(), isExternal)
         .toString();
   }
+
+  /**
+   * Fire insert events for table and partition.
+   * In case of any exception, we just log the failure of firing insert events.
+   */
+  public static List<Long> fireInsertEvents(MetaStoreClient msClient,
+      List<InsertEventInfo> insertEventInfos, String dbName, String tableName) {
+    try {
+      return fireInsertEventHelper(msClient.getHiveClient(), insertEventInfos, dbName, tableName);
+    } catch (Exception e) {
+      LOG.error("Failed to fire insert event. Some tables might not be"
+              + " refreshed on other impala clusters.", e);
+    } finally {
+      msClient.close();
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   *  Fires an insert event to HMS notification log. In Hive-3 for partitioned table,
+   *  all partition insert events will be fired by a bulk API.
+   *
+   * @param msClient Metastore client,
+   * @param insertEventInfos A list of insert event encapsulating the information needed
+   * to fire insert
+   * @param dbName
+   * @param tableName
+   * @return a list of eventIds for the insert events
+   */
+  @VisibleForTesting
+  public static List<Long> fireInsertEventHelper(IMetaStoreClient msClient,
+      List<InsertEventInfo> insertEventInfos, String dbName, String tableName)
+      throws TException {
+    Preconditions.checkNotNull(msClient);
+    Preconditions.checkNotNull(dbName);
+    Preconditions.checkNotNull(tableName);
+    LOG.debug(String.format(
+        "Firing %s insert event for %s", insertEventInfos.size(), tableName));
+    FireEventRequestData data = new FireEventRequestData();
+    FireEventRequest rqst = new FireEventRequest(true, data);
+    rqst.setDbName(dbName);
+    rqst.setTableName(tableName);
+    List<InsertEventRequestData> insertDatas = new ArrayList<>();
+    for (InsertEventInfo info : insertEventInfos) {
+      InsertEventRequestData insertData = new InsertEventRequestData();
+      Preconditions.checkNotNull(info.getNewFiles());
+      insertData.setFilesAdded(new ArrayList<>(info.getNewFiles()));
+      insertData.setReplace(info.isOverwrite());
+      if (info.getPartVals() != null) insertData.setPartitionVal(info.getPartVals());
+      insertDatas.add(insertData);
+    }
+    if (insertDatas.size() == 1) {
+      if (insertEventInfos.get(0).getPartVals() != null) {
+        rqst.setPartitionVals(insertEventInfos.get(0).getPartVals());
+      }
+      data.setInsertData(insertDatas.get(0));
+    } else {
+      data.setInsertDatas(insertDatas);
+    }
+    FireEventResponse response = msClient.fireListenerEvent(rqst);
+
+    return response.getEventIds();
+  }
 }
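
[Editor's note] For context on how these shim methods are used, here is a condensed
fragment mirroring the CatalogOpExecutor changes later in this patch. It assumes the
Impala frontend context (catalog_, table, partVals, and newFiles in scope) and is
illustrative rather than standalone:

    List<InsertEventInfo> infos = new ArrayList<>();
    infos.add(new InsertEventInfo(partVals, newFiles, /*isOverwrite=*/false));
    List<Long> eventIds = MetastoreShim.fireInsertEvents(
        catalog_.getMetaStoreClient(), infos, table.getDb().getName(), table.getName());
    // On Hive-3 the generated event ids come back; record them so that the
    // EventProcessor treats the matching INSERT notifications as self-events.
    if (!eventIds.isEmpty()) {
      // Table-level insert shown here; for partitioned inserts the patch records
      // one event id per touched partition instead.
      catalog_.addVersionsForInflightEvents(/*isInsertEvent=*/true, table, eventIds.get(0));
    }
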
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index fb7ef4d..87465b6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -817,29 +817,36 @@ public class CatalogServiceCatalog extends Catalog {
    * Evaluates if the information from an event (serviceId and versionNumber) matches to
    * the catalog object. If there is match, the in-flight version for that object is
    * removed and method returns true. If it does not match, returns false
+
    * @param ctx self context which provides all the information needed to
    * evaluate if this is a self-event or not
    * @return true if given event information evaluates to a self-event, false otherwise
    */
-  public boolean evaluateSelfEvent(SelfEventContext ctx)
+  public boolean evaluateSelfEvent(boolean isInsertEvent, SelfEventContext ctx)
       throws CatalogException {
     Preconditions.checkState(isEventProcessingActive(),
         "Event processing should be enabled when calling this method");
-    long versionNumber = ctx.getVersionNumberFromEvent();
+    long versionNumber =
+        isInsertEvent ? ctx.getIdFromEvent() : ctx.getVersionNumberFromEvent();
     String serviceIdFromEvent = ctx.getServiceIdFromEvent();
-    LOG.debug("Input arguments for self-event evaluation: {} {}",versionNumber,
-        serviceIdFromEvent);
-    // no version info or service id in the event
-    if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
-      LOG.info("Not a self-event since the given version is {} and service id is {}",
-          versionNumber, serviceIdFromEvent);
-      return false;
-    }
-    // if the service id from event doesn't match with our service id this is not a
-    // self-event
-    if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
-      LOG.info("Not a self-event because service id of this catalog {} does not match "
-          + "with one in event {}.", getCatalogServiceId(), serviceIdFromEvent);
+
+    if (!isInsertEvent) {
+      // no version info or service id in the event
+      if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) {
+        LOG.info("Not a self-event since the given version is {} and service id is {}",
+            versionNumber, serviceIdFromEvent);
+        return false;
+      }
+      // if the service id from event doesn't match with our service id this is not a
+      // self-event
+      if (!getCatalogServiceId().equals(serviceIdFromEvent)) {
+        LOG.info("Not a self-event because service id of this catalog {} does not match "
+                + "with one in event {}.",
+            getCatalogServiceId(), serviceIdFromEvent);
+      }
+    } else if (versionNumber == -1) {
+      // if insert event, we only compare eventId
+      LOG.info("Not a self-event because eventId is {}", versionNumber);
       return false;
     }
     Db db = getDb(ctx.getDbName());
@@ -881,10 +888,11 @@ public class CatalogServiceCatalog extends Catalog {
       List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues();
       // if the partitionKeyValues is null, we look for tbl's in-flight events
       if (partitionKeyValues == null) {
-        boolean removed = tbl.removeFromVersionsForInflightEvents(versionNumber);
+        boolean removed =
+            tbl.removeFromVersionsForInflightEvents(isInsertEvent, versionNumber);
         if (!removed) {
-          LOG.info("Could not find version {} in in-flight event list of table {}",
-              versionNumber, tbl.getFullName());
+          LOG.info("Could not find {} {} in in-flight event list of table {}",
+              isInsertEvent ? "eventId" : "version", versionNumber, tbl.getFullName());
         }
         return removed;
       }
@@ -893,8 +901,9 @@ public class CatalogServiceCatalog extends Catalog {
         for (List<TPartitionKeyValue> partitionKeyValue : partitionKeyValues) {
           HdfsPartition hdfsPartition =
               ((HdfsTable) tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue);
-          if (hdfsPartition == null || !hdfsPartition
-              .removeFromVersionsForInflightEvents(versionNumber)) {
+          if (hdfsPartition == null
+              || !hdfsPartition.removeFromVersionsForInflightEvents(
+                     isInsertEvent, versionNumber)) {
             // even if this is an error condition we should not bail out early since we
             // should clean up the self-event state on the rest of the partitions
             String partName = HdfsTable.constructPartitionName(partitionKeyValue);
@@ -919,15 +928,18 @@ public class CatalogServiceCatalog extends Catalog {
   /**
    * Adds a given version number from the catalog table's list of versions for in-flight
    * events. Applicable only when external event processing is enabled.
-   *
+   * @param isInsertEvent if false add versionNumber for DDL Event, otherwise add eventId
+   * for Insert Event.
    * @param tbl Catalog table
-   * @param versionNumber version number to be added
+   * @param versionNumber when isInsertEvent is true, it is eventId to add
+   * when isInsertEvent is false, it is version number to add
    */
-  public void addVersionsForInflightEvents(Table tbl, long versionNumber) {
+  public void addVersionsForInflightEvents(
+      boolean isInsertEvent, Table tbl, long versionNumber) {
     if (!isEventProcessingActive()) return;
-    tbl.addToVersionsForInflightEvents(versionNumber);
-    LOG.info("Added catalog version {} in table's {} in-flight events",
-        versionNumber, tbl.getFullName());
+    tbl.addToVersionsForInflightEvents(isInsertEvent, versionNumber);
+    LOG.info("Added {} {} in table's {} in-flight events",
+        isInsertEvent ? "eventId" : "catalog version", versionNumber, tbl.getFullName());
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index 330227c..8a465eb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -511,7 +511,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
     Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
         "removeFromVersionsForInflightEvents called without getting the db lock for "
             + getName() + " database.");
-    return inFlightEvents_.remove(versionNumber);
+    return inFlightEvents_.remove(false, versionNumber);
   }
 
   /**
@@ -525,7 +525,7 @@ public class Db extends CatalogObjectImpl implements FeDb {
     Preconditions.checkState(dbLock_.isHeldByCurrentThread(),
         "addToVersionsForInFlightEvents called without getting the db lock for "
             + getName() + " database.");
-    if (!inFlightEvents_.add(versionNumber)) {
+    if (!inFlightEvents_.add(false, versionNumber)) {
       LOG.warn(String.format("Could not add version %s to the list of in-flight "
           + "events. This could cause unnecessary database %s invalidation when the "
           + "event is processed", versionNumber, getName()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 2069756..e7a5bb8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -852,25 +852,33 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
 
   /**
    * Removes a given version from the in-flight events
-   * @param versionNumber version number to remove
+   * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
+   * Insert events. If false, remove version number from list of versions for in-flight
+   * DDL events.
+   * @param versionNumber when isInsertEvent is true, it's eventId to remove
+   *                      when isInsertEvent is false, it's version number to remove
    * @return true if the versionNumber was removed, false if it didn't exist
    */
-  public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+  public boolean removeFromVersionsForInflightEvents(
+      boolean isInsertEvent, long versionNumber) {
     Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
         "removeFromVersionsForInflightEvents called without holding the table lock on "
             + "partition " + getPartitionName() + " of table " + table_.getFullName());
-    return inFlightEvents_.remove(versionNumber);
+    return inFlightEvents_.remove(isInsertEvent, versionNumber);
   }
 
   /**
    * Adds a version number to the in-flight events of this partition
-   * @param versionNumber version number to add
+   * @param isInsertEvent if true, add eventId to list of eventIds for in-flight Insert
+   * events if false, add version number to list of versions for in-flight DDL events
+   * @param versionNumber when isInsertEvent is true, it's eventId to add
+   *                      when isInsertEvent is false, it's version number to add
    */
-  public void addToVersionsForInflightEvents(long versionNumber) {
+  public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) {
     Preconditions.checkState(table_.getLock().isHeldByCurrentThread(),
         "addToVersionsForInflightEvents called without holding the table lock on "
             + "partition " + getPartitionName() + " of table " + table_.getFullName());
-    if (!inFlightEvents_.add(versionNumber)) {
+    if (!inFlightEvents_.add(isInsertEvent, versionNumber)) {
       LOG.warn(String.format("Could not add %s version to the partition %s of table %s. "
           + "This could cause unnecessary refresh of the partition when the event is"
           + "received by the Events processor.", versionNumber, getPartitionName(),
@@ -886,13 +894,14 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
    */
   private void addInflightVersionsFromParameters() {
     Preconditions.checkNotNull(hmsParameters_);
-    Preconditions.checkState(inFlightEvents_.size() == 0);
+    Preconditions.checkState(inFlightEvents_.size(false) == 0);
     // we should not check for table lock being held here since there are certain code
     // paths which call this method without holding the table lock (eg. getOrLoadTable())
     if (!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) {
       return;
     }
-    inFlightEvents_.add(Long.parseLong(
+    inFlightEvents_.add(false,
+        Long.parseLong(
             hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 0c6ac31..8f0f5ad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -63,7 +63,6 @@ import org.apache.log4j.Logger;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 /**
@@ -801,14 +800,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
 
   /**
    * Removes a given version from the collection of version numbers for in-flight events
-   * @param versionNumber version number to remove from the collection
+   * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
+   * Insert events. If false, remove versionNumber from list of versions for in-flight DDL
+   * events
+   * @param versionNumber when isInsertEvent is true, it's eventId to remove
+   * when isInsertEvent is false, it's version number to remove
    * @return true if version was successfully removed, false if didn't exist
    */
-  public boolean removeFromVersionsForInflightEvents(long versionNumber) {
+  public boolean removeFromVersionsForInflightEvents(
+      boolean isInsertEvent, long versionNumber) {
     Preconditions.checkState(tableLock_.isHeldByCurrentThread(),
         "removeFromVersionsForInFlightEvents called without taking the table lock on "
             + getFullName());
-    return inFlightEvents.remove(versionNumber);
+    return inFlightEvents.remove(isInsertEvent, versionNumber);
   }
 
   /**
@@ -816,16 +820,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    * collection is already at the max size defined by
    * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and
    * does not add it
-   * @param versionNumber version number to add
+   * @param isInsertEvent if true, add eventId to list of eventIds for in-flight Insert
+   * events. If false, add versionNumber to list of versions for in-flight DDL events
+   * @param versionNumber when isInsertEvent is true, it's eventId to add
+   * when isInsertEvent is false, it's version number to add
    * @return True if version number was added, false if the collection is at its max
    * capacity
    */
-  public void addToVersionsForInflightEvents(long versionNumber) {
+  public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) {
     // we generally don't take locks on Incomplete tables since they are atomically
     // replaced during load
     Preconditions.checkState(
         this instanceof IncompleteTable || tableLock_.isHeldByCurrentThread());
-    if (!inFlightEvents.add(versionNumber)) {
+    if (!inFlightEvents.add(isInsertEvent, versionNumber)) {
       LOG.warn(String.format("Could not add %s version to the table %s. This could "
           + "cause unnecessary refresh of the table when the event is received by the "
               + "Events processor.", versionNumber, getFullName()));
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
index 1aed6d1..11d38f6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java
@@ -34,41 +34,66 @@ public class InFlightEvents {
   // maximum number of catalog versions to store for in-flight events for this table
   private static final int DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS = 10;
 
+  // maximum number of eventIds to store for in-flight events for this table
+  private static final int DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS = 100;
+
   private static final Logger LOG = LoggerFactory.getLogger(InFlightEvents.class);
-  // FIFO list of versions for all the in-flight metastore events in this table
+  // FIFO list of versions for all the in-flight metastore DDL events in this table
   // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_EVENTS size. Anything which
   // is attempted to be added to this list when its at maximum capacity is ignored
   private final LinkedList<Long> versionsForInflightEvents_ = new LinkedList<>();
 
+  // FIFO list of eventIds for all the in-flight metastore Insert events in this table
+  // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS size. Anything
+  // which is attempted to be added to this list when its at maximum capacity is ignored
+  private final LinkedList<Long> idsForInflightDmlEvents_ = new LinkedList<>();
+
   // maximum number of versions to store
-  private final int capacity_;
+  private final int capacity_for_versions_;
+
+  // maximum number of eventIds to store
+  private final int capacity_for_eventIds_;
 
   public InFlightEvents() {
-    this.capacity_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS;
+    this.capacity_for_versions_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS;
+    this.capacity_for_eventIds_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS;
   }
 
   public InFlightEvents(int capacity) {
     Preconditions.checkState(capacity > 0);
-    this.capacity_ = capacity;
+    this.capacity_for_versions_ = capacity;
+    this.capacity_for_eventIds_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS;
   }
-
   /**
    * Gets the current list of versions for in-flight events for this table
+   * @param isInsertEvent if true, return list of eventIds for in-flight Insert events
+   * if false, return list of versions for in-flight DDL events
    */
-  public List<Long> getAll() {
-    return ImmutableList.copyOf(versionsForInflightEvents_);
+  public List<Long> getAll(boolean isInsertEvent) {
+    if (isInsertEvent) {
+      return ImmutableList.copyOf(idsForInflightDmlEvents_);
+    } else {
+      return ImmutableList.copyOf(versionsForInflightEvents_);
+    }
   }
 
   /**
    * Removes a given version from the collection of version numbers for in-flight
    * events.
-   *
-   * @param versionNumber version number to remove from the collection
+   * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight
+   * Insert events. If false, remove version number from list of versions for in-flight
+   * DDL events.
+   * @param versionNumber when isInsertEvent is true, it's eventId to remove
+   * when isInsertEvent is false, it's version number to remove
    * @return true if the version was found and successfully removed, false
    * otherwise
    */
-  public boolean remove(long versionNumber) {
-    return versionsForInflightEvents_.remove(versionNumber);
+  public boolean remove(boolean isInsertEvent, long versionNumber) {
+    if (isInsertEvent) {
+      return idsForInflightDmlEvents_.remove(versionNumber);
+    } else {
+      return versionsForInflightEvents_.remove(versionNumber);
+    }
   }
 
   /**
@@ -76,23 +101,45 @@ public class InFlightEvents {
    * collection is already at the max size defined by
    * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and
    * does not add it
-   *
-   * @param versionNumber version number to add
+   * @param isInsertEvent If true, add eventId to list of eventIds for in-flight Insert
+   * events. If false, add versionNumber to list of versions for in-flight DDL events.
+   * @param versionNumber when isInsertEvent is true, it's eventId to add
+   * when isInsertEvent is false, it's version number to add
    * @return True if version number was added, false if the collection is at its max
    * capacity
    */
-  public boolean add(long versionNumber) {
-    if (versionsForInflightEvents_.size() == capacity_) {
-      LOG.warn(String.format("Number of versions to be stored is at "
-              + " its max capacity %d. Ignoring add request for version number %d.",
-          DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
-      return false;
+  public boolean add(boolean isInsertEvent, long versionNumber) {
+    if (isInsertEvent) {
+      if (idsForInflightDmlEvents_.size() == capacity_for_eventIds_) {
+        LOG.warn(String.format("Number of Insert events to be stored is at "
+                + " its max capacity %d. Ignoring add request for eventId %d.",
+            DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
+        return false;
+      }
+      idsForInflightDmlEvents_.add(versionNumber);
+    } else {
+      if (versionsForInflightEvents_.size() == capacity_for_versions_) {
+        LOG.warn(String.format("Number of DDL events to be stored is at "
+                + "its max capacity %d. Ignoring add request for version %d.",
+            DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber));
+        return false;
+      }
+      versionsForInflightEvents_.add(versionNumber);
     }
-    versionsForInflightEvents_.add(versionNumber);
     return true;
   }
 
-  public int size() {
-    return versionsForInflightEvents_.size();
+  /**
+   * Get the size of in-flight DDL or DML events list
+   * @param isInsertEvent if true, return size of Insert events list
+   *              if false, return size of DDL events list
+   * @return size of events list
+   */
+  public int size(boolean isInsertEvent) {
+    if (isInsertEvent) {
+      return idsForInflightDmlEvents_.size();
+    } else {
+      return versionsForInflightEvents_.size();
+    }
   }
 }
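
[Editor's note] A short usage fragment for the widened InFlightEvents API. The values
are made up; in the catalog code the long is a catalog version for DDL events and an
HMS event id for insert events:

    InFlightEvents inFlight = new InFlightEvents();
    inFlight.add(false, 1057L);    // catalog version of a self-generated DDL change
    inFlight.add(true, 20443L);    // HMS event id of a self-generated INSERT event
    // Later, while processing fetched notifications:
    boolean ddlSelfEvent = inFlight.remove(false, 1057L);     // true -> skip invalidate
    boolean insertSelfEvent = inFlight.remove(true, 20443L);  // true -> skip refresh
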
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 2be56e9..e64578b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -269,7 +269,7 @@ public class MetastoreEvents {
     protected final String tblName_;
 
     // eventId of the event. Used instead of calling getter on event_ everytime
-    protected final long eventId_;
+    protected long eventId_;
 
     // eventType from the NotificationEvent
     protected final MetastoreEventType eventType_;
@@ -419,13 +419,15 @@ public class MetastoreEvents {
      * serviceId and version. More details on complete flow of self-event handling
      * logic can be read in <code>MetastoreEventsProcessor</code> documentation.
      *
+     * @param isInsertEvent if true, check in flight events list of Insert event
+     * if false, check events list of DDL
      * @return True if this event is a self-generated event. If the returned value is
      * true, this method also clears the version number from the catalog database/table.
      * Returns false if the version numbers or service id don't match
      */
-    protected boolean isSelfEvent() {
+    protected boolean isSelfEvent(boolean isInsertEvent) {
       try {
-        if (catalog_.evaluateSelfEvent(getSelfEventContext())) {
+        if (catalog_.evaluateSelfEvent(isInsertEvent, getSelfEventContext())) {
           metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc();
           return true;
         }
@@ -434,6 +436,8 @@ public class MetastoreEvents {
       }
       return false;
     }
+
+    protected boolean isSelfEvent() { return isSelfEvent(false); }
   }
 
   public static String getStringProperty(
@@ -766,19 +770,25 @@ public class MetastoreEvents {
 
     @Override
     public SelfEventContext getSelfEventContext() {
-      throw new UnsupportedOperationException("Self-event evaluation is not implemented"
-          + " for insert event type");
+      if (insertPartition_ != null) {
+        // create selfEventContext for insert partition event
+        List<TPartitionKeyValue> tPartSpec =
+            getTPartitionSpecFromHmsPartition(msTbl_, insertPartition_);
+        return new SelfEventContext(dbName_, tblName_, Arrays.asList(tPartSpec),
+            insertPartition_.getParameters(), eventId_);
+      } else {
+        // create selfEventContext for insert table event
+        return new SelfEventContext(
+            dbName_, tblName_, null, msTbl_.getParameters(), eventId_);
+      }
     }
 
-    /**
-     * Currently we do not check for self-events in Inserts. Existing self-events logic
-     * cannot be used for insert events since firing insert event does not allow us to
-     * modify table parameters in HMS. Hence, we cannot get CatalogServiceIdentifiers in
-     * Insert Events.
-     * TODO: Handle self-events for insert case.
-     */
     @Override
     public void process() throws MetastoreNotificationException {
+      if (isSelfEvent(true)) {
+        infoLog("Not processing the event as it is a self-event");
+        return;
+      }
       // Reload the whole table if it's a transactional table.
       if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
         insertPartition_ = null;
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
index 83aa69b..10dd4eb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java
@@ -32,15 +32,21 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 public class SelfEventContext {
   private final String dbName_;
   private final String tblName_;
+  private final long insertEventId_;
   // version number from the event object parameters used for self-event detection
   private final long versionNumberFromEvent_;
   // service id from the event object parameters used for self-event detection
-  private final String serviceIdFromEvent_;
+  private final String serviceidFromEvent_;
   private final List<List<TPartitionKeyValue>> partitionKeyValues_;
 
   SelfEventContext(String dbName, String tblName,
       Map<String, String> parameters) {
-    this(dbName, tblName, null, parameters);
+    this(dbName, tblName, null, parameters, -1);
+  }
+
+  SelfEventContext(String dbName, String tblName,
+      List<List<TPartitionKeyValue>> partitionKeyValues, Map<String, String> parameters) {
+    this(dbName, tblName, partitionKeyValues, parameters, -1);
   }
 
   /**
@@ -55,17 +61,17 @@ public class SelfEventContext {
    */
   SelfEventContext(String dbName, @Nullable String tblName,
       @Nullable List<List<TPartitionKeyValue>> partitionKeyValues,
-      Map<String, String> parameters) {
+      Map<String, String> parameters, long eventId) {
     Preconditions.checkNotNull(parameters);
     this.dbName_ = Preconditions.checkNotNull(dbName);
     this.tblName_ = tblName;
     this.partitionKeyValues_ = partitionKeyValues;
+    insertEventId_ = eventId;
     versionNumberFromEvent_ = Long.parseLong(
         MetastoreEvents.getStringProperty(parameters,
             MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1"));
-    serviceIdFromEvent_ =
-        MetastoreEvents.getStringProperty(parameters,
-            MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
+    serviceidFromEvent_ = MetastoreEvents.getStringProperty(
+        parameters, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), "");
   }
 
   public String getDbName() {
@@ -76,13 +82,13 @@ public class SelfEventContext {
     return tblName_;
   }
 
+  public long getIdFromEvent() { return insertEventId_; }
+
   public long getVersionNumberFromEvent() {
     return versionNumberFromEvent_;
   }
 
-  public String getServiceIdFromEvent() {
-    return serviceIdFromEvent_;
-  }
+  public String getServiceIdFromEvent() { return serviceidFromEvent_; }
 
   public List<List<TPartitionKeyValue>> getPartitionKeyValues() {
     return partitionKeyValues_ == null ?
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index c3957b9..c62ac6f 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -126,6 +126,8 @@ public class BackendConfig {
     return backendCfg_.hms_event_polling_interval_s;
   }
 
+  public boolean isInsertEventsEnabled() { return backendCfg_.enable_insert_events; }
+
   public boolean isOrcScannerEnabled() {
     return backendCfg_.enable_orc_scanner;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 63a7b81..3ac2a05 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -711,7 +711,7 @@ public class CatalogOpExecutor {
             // pre-existing there is no alter table event generated. Hence we should
             // only add the versions for in-flight events when we are sure that the
             // partition was really added.
-            catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
+            catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
             addTableToCatalogUpdate(refreshedTable, response.result);
           }
           reloadMetadata = false;
@@ -851,7 +851,7 @@ public class CatalogOpExecutor {
             reloadTableSchema, null, "ALTER TABLE " + params.getAlter_type().name());
         // now that HMS alter operation has succeeded, add this version to list of
         // inflight events in catalog table if event processing is enabled
-        catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion);
+        catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
         addTableToCatalogUpdate(tbl, response.result);
       }
     } finally {
@@ -3098,7 +3098,7 @@ public class CatalogOpExecutor {
           "'%s' and the new table name '%s' may fix the problem." , tableName.toString(),
           newTableName.toString()));
     }
-    catalog_.addVersionsForInflightEvents(result.second, newCatalogVersion);
+    catalog_.addVersionsForInflightEvents(false, result.second, newCatalogVersion);
     // TODO(todd): if client is a 'v2' impalad, only send back invalidation
     response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject());
     response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject());
@@ -3678,7 +3678,7 @@ public class CatalogOpExecutor {
     // catalog service identifiers
     if (catalog_.getCatalogServiceId().equals(serviceId)) {
       Preconditions.checkNotNull(version);
-      hdfsPartition.addToVersionsForInflightEvents(Long.parseLong(version));
+      hdfsPartition.addToVersionsForInflightEvents(false, Long.parseLong(version));
     }
   }
 
@@ -4430,7 +4430,7 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Populates insert event data and calls fireInsertEventAysnc() if external event
+   * Populates insert event data and calls fireInsertEvents() if external event
    * processing is enabled. This is no-op if event processing is disabled or there are
    * no existing partitions affected by this insert.
    *
@@ -4440,11 +4440,16 @@ public class CatalogOpExecutor {
    */
   private void createInsertEvents(Table table,
       List<FeFsPartition> affectedExistingPartitions, boolean isInsertOverwrite) {
-    if (!catalog_.isEventProcessingActive() ||
-        affectedExistingPartitions.size() == 0) return;
+    boolean isInsertEventsEnabled = BackendConfig.INSTANCE.isInsertEventsEnabled();
+    if (!catalog_.isEventProcessingActive() || !isInsertEventsEnabled
+        || affectedExistingPartitions.size() == 0) {
+      return;
+    }
 
     // List of all insert events that we call HMS fireInsertEvent() on.
     List<InsertEventInfo> insertEventInfos = new ArrayList<>();
+    // List of all partitions that we insert into
+    List<HdfsPartition> partitions = new ArrayList<>();
 
     // Map of partition names to file names of all existing partitions touched by the
     // insert.
@@ -4471,58 +4476,54 @@ public class CatalogOpExecutor {
       // Find the delta of the files added by the insert if it is not an overwrite
       // operation. HMS fireListenerEvent() expects an empty list if no new files are
       // added or if the operation is an insert overwrite.
+      HdfsPartition hdfsPartition = (HdfsPartition) part;
       Set<String> deltaFiles = new HashSet<>();
       List<String> partVals = null;
       if (!isInsertOverwrite) {
-        String partitionName = part.getPartitionName() + "/";
+        String partitionName = hdfsPartition.getPartitionName() + "/";
         Set<String> filesPostInsert =
             partitionFilesMapPostInsert.get(partitionName);
         if (table.getNumClusteringCols() > 0) {
           Set<String> filesBeforeInsert =
               partitionFilesMapBeforeInsert.get(partitionName);
           deltaFiles = Sets.difference(filesBeforeInsert, filesPostInsert);
-          partVals = part.getPartitionValuesAsStrings(true);
+          partVals = hdfsPartition.getPartitionValuesAsStrings(true);
         } else {
           Map.Entry<String, Set<String>> entry =
               partitionFilesMapBeforeInsert.entrySet().iterator().next();
           deltaFiles = Sets.difference(entry.getValue(), filesPostInsert);
         }
         LOG.info("{} new files detected for table {} partition {}.",
-            filesPostInsert.size(), table.getTableName(), part.getPartitionName());
+            filesPostInsert.size(), table.getTableName(),
+            hdfsPartition.getPartitionName());
       }
       if (deltaFiles != null || isInsertOverwrite) {
         // Collect all the insert events.
-        insertEventInfos.add(new InsertEventInfo(table.getDb().getName(),
-            table.getName(), partVals, deltaFiles, isInsertOverwrite));
+        insertEventInfos.add(
+            new InsertEventInfo(partVals, deltaFiles, isInsertOverwrite));
+        if (partVals != null) {
+          // insert into partition
+          partitions.add(hdfsPartition);
+        }
       } else {
         LOG.info("No new files were created, and is not a replace. Skipping "
             + "generating INSERT event.");
       }
     }
 
-    // Firing insert events by making calls to HMS APIs can be slow for tables with
-    // large number of partitions. Hence, we fire the insert events asynchronously.
-    fireInsertEventsAsync(insertEventInfos);
-  }
-
-  /**
-   * Helper method to fire insert events asynchronously. This creates a single thread
-   * to execute the fireInsertEvent method and shuts down the thread after it has
-   * finished. In case of any exception, we just log the failure of firing insert events.
-   */
-  private void fireInsertEventsAsync(List<InsertEventInfo> insertEventInfos) {
-    ExecutorService fireInsertEventThread = Executors.newSingleThreadExecutor();
-    CompletableFuture.runAsync(() -> {
-      for (InsertEventInfo info : insertEventInfos) {
-        try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
-          MetaStoreUtil.fireInsertEvent(metaStoreClient.getHiveClient(), info);
-        } catch (Exception e) {
-          LOG.error("Failed to fire insert event. Some tables might not be"
-              + " refreshed on other impala clusters.", e);
+    MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient();
+    List<Long> eventIds = MetastoreShim.fireInsertEvents(metaStoreClient,
+        insertEventInfos, table.getDb().getName(), table.getName());
+    if (!eventIds.isEmpty()) {
+      if (partitions.size() == 0) { // insert into table
+        catalog_.addVersionsForInflightEvents(true, table, eventIds.get(0));
+      } else { // insert into partition
+        for (int par_idx = 0; par_idx < partitions.size(); par_idx++) {
+          partitions.get(par_idx).addToVersionsForInflightEvents(
+              true, eventIds.get(par_idx));
         }
       }
-    }, Executors.newSingleThreadExecutor()).thenRun(() ->
-        fireInsertEventThread.shutdown());
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index 7bc463f..81b62f6 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -30,9 +30,6 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.FireEventRequest;
-import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
-import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
@@ -317,13 +314,18 @@ public class MetaStoreUtil {
   }
 
   /**
+   * Check if the hms table is a bucketed table or not
+   */
+  public static boolean isBucketedTable(Table msTbl) {
+    Preconditions.checkNotNull(msTbl);
+    return msTbl.getSd().getNumBuckets() > 0;
+  }
+
+  /**
    * A helper class that encapsulates all the information needed to fire and insert event
    * with HMS.
    */
   public static class InsertEventInfo {
-    private String dbName;
-    private String tableName;
-
     // List of partition values corresponding to the partition keys in
     // a partitioned table. This is null for non-partitioned table.
     private List<String> partVals;
@@ -337,48 +339,15 @@ public class MetaStoreUtil {
     // false otherwise.
     private boolean isOverwrite;
 
-    public InsertEventInfo(String dbName, String tableName, List<String> partVals,
-        Collection<String> newFiles, boolean isOverwrite) {
-      this.dbName = dbName;
-      this.tableName = tableName;
+    public InsertEventInfo(
+        List<String> partVals, Collection<String> newFiles, boolean isOverwrite) {
       this.partVals = partVals;
       this.newFiles = newFiles;
       this.isOverwrite = isOverwrite;
     }
-  }
 
-  /**
-   *  Fires an insert event to HMS notification log. For partitioned table, each
-   *  existing partition touched by the insert will fire a separate insert event.
-   *
-   * @param msClient Metastore client,
-   * @param info A singe insert event encapsulating the information needed to fire insert
-   * event with HMS.
-   */
-  public static void fireInsertEvent(IMetaStoreClient msClient,
-      InsertEventInfo info) throws TException {
-    Preconditions.checkNotNull(msClient);
-    Preconditions.checkNotNull(info.dbName);
-    Preconditions.checkNotNull(info.tableName);
-    Preconditions.checkNotNull(info.newFiles);
-    LOG.debug("Firing an insert event for {}", info.tableName);
-    FireEventRequestData data = new FireEventRequestData();
-    InsertEventRequestData insertData = new InsertEventRequestData();
-    data.setInsertData(insertData);
-    FireEventRequest rqst = new FireEventRequest(true, data);
-    rqst.setDbName(info.dbName);
-    rqst.setTableName(info.tableName);
-    insertData.setFilesAdded(new ArrayList<>(info.newFiles));
-    insertData.setReplace(info.isOverwrite);
-    if (info.partVals != null) rqst.setPartitionVals(info.partVals);
-    msClient.fireListenerEvent(rqst);
-  }
-
-  /**
-   * Check if the hms table is a bucketed table or not
-   */
-  public static boolean isBucketedTable(Table msTbl) {
-    Preconditions.checkNotNull(msTbl);
-    return msTbl.getSd().getNumBuckets() > 0;
+    public List<String> getPartVals() { return this.partVals; }
+    public Collection<String> getNewFiles() { return this.newFiles; }
+    public boolean isOverwrite() { return this.isOverwrite; }
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 41995a8..ba4a0b5 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -90,6 +91,7 @@ import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TAlterDbSetOwnerParams;
 import org.apache.impala.thrift.TAlterDbType;
@@ -107,6 +109,7 @@ import org.apache.impala.thrift.TAlterTableSetRowFormatParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
 import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
+import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TCreateDbParams;
@@ -134,12 +137,14 @@ import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.thrift.TTypeNode;
 import org.apache.impala.thrift.TTypeNodeType;
 import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -712,6 +717,67 @@ public class MetastoreEventsProcessorTest {
   }
 
   /**
+   * Tests inserts issued from Impala. An insert into a table or a partition from
+   * Impala should be detected as a self-event.
+   */
+  @Test
+  public void testInsertFromImpala() throws ImpalaException {
+    Assume.assumeTrue("Skipping this test because it only works with Hive-3 or greater",
+        TestUtils.getHiveMajorVersion() >= 3);
+    // Test insert into multiple partitions
+    createDatabaseFromImpala(TEST_DB_NAME, null);
+    String tableToInsertPart = "tbl_with_mul_part";
+    createTableFromImpala(TEST_DB_NAME, tableToInsertPart, true);
+    String tableToInsertMulPart = "tbl_to_insert_mul_part";
+    createTableFromImpala(TEST_DB_NAME, tableToInsertMulPart, true);
+    // add first partition
+    TPartitionDef partitionDef = new TPartitionDef();
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1"));
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "100"));
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
+    // add second partition
+    partitionDef = new TPartitionDef();
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1"));
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "200"));
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
+    eventsProcessor_.processEvents();
+    // Count self-events from here; numberOfSelfEventsBefore is 4 since the 4 ADD
+    // PARTITION events above are self-events.
+    long numberOfSelfEventsBefore =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
+            .getCount();
+    // insert into partition
+    insertFromImpala(tableToInsertPart, true, "1", "100");
+    insertFromImpala(tableToInsertPart, true, "1", "200");
+    // insert into multiple partition
+    Set<String> created_partitions = new HashSet<String>();
+    String partition1 = "p1=1/p2=100/";
+    String partition2 = "p1=1/p2=200/";
+    created_partitions.add(partition1);
+    created_partitions.add(partition2);
+    insertMulPartFromImpala(tableToInsertMulPart, tableToInsertPart, created_partitions);
+    eventsProcessor_.processEvents();
+
+    // Test insert into table
+    String tableToInsert = "tbl_to_insert";
+    createTableFromImpala(TEST_DB_NAME, tableToInsert, false);
+    insertFromImpala(tableToInsert, false, "", "");
+    eventsProcessor_.processEvents();
+
+    long selfEventsCountAfter =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
+            .getCount();
+    // 5 self-events in total: 2 single-partition inserts, 1 multi-partition insert
+    // (which generates 2 insert events), and 1 insert into an unpartitioned table.
+    assertEquals("Unexpected number of self-events generated",
+        numberOfSelfEventsBefore + 5, selfEventsCountAfter);
+  }
+
+  /**
    * Test generates a sequence of create_table, insert and drop_table in the event stream
    * to make sure when the insert event is processed on a removed table, it doesn't cause
    * any issues with the event processing.
@@ -835,9 +901,10 @@ public class MetastoreEventsProcessorTest {
     List <String> newFiles = addFilesToDirectory(parentPath, "testFile.",
         totalNumberOfFilesToAdd, isOverwrite);
     try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
-      MetaStoreUtil.fireInsertEvent(metaStoreClient.getHiveClient(),
-          new InsertEventInfo(msTbl.getDbName(), msTbl.getTableName(), null,
-          newFiles, isOverwrite));
+      List<InsertEventInfo> insertEventInfos = new ArrayList<>();
+      insertEventInfos.add(new InsertEventInfo(null, newFiles, isOverwrite));
+      MetastoreShim.fireInsertEventHelper(metaStoreClient.getHiveClient(),
+          insertEventInfos, msTbl.getDbName(), msTbl.getTableName());
     }
   }
 
@@ -2711,6 +2778,57 @@ public class MetastoreEventsProcessorTest {
     catalogOpExecutor_.execDdlRequest(req);
   }
 
+  /**
+   * Inserts into multiple partitions of tblName1 from Impala by selecting from tblName2.
+   */
+  private void insertMulPartFromImpala(String tblName1, String tblName2,
+      Set<String> created_partitions) throws ImpalaException {
+    String insert_mul_part = String.format(
+        "insert into table %s partition(p1, p2) select * from %s", tblName1, tblName2);
+    TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
+        TEST_DB_NAME, tblName1, insert_mul_part, created_partitions);
+    catalogOpExecutor_.updateCatalog(testInsertRequest);
+  }
+
+  /**
+   * Inserts into a table or a single partition from Impala.
+   * @param tblName name of the target table
+   * @param isPartitioned whether to insert into the partition (p1=p1val, p2=p2val)
+   *     instead of the whole table
+   */
+  private void insertFromImpala(String tblName, boolean isPartitioned, String p1val,
+      String p2val) throws ImpalaException {
+    String partition = String.format("partition (p1=%s, p2='%s')", p1val, p2val);
+    String test_insert_tbl = String.format("insert into table %s %s values ('a','aa') ",
+        tblName, isPartitioned ? partition : "");
+    Set<String> created_partitions = new HashSet<String>();
+    String created_part_str =
+        isPartitioned ? String.format("p1=%s/p2=%s/", p1val, p2val) : "";
+    created_partitions.add(created_part_str);
+    TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
+        TEST_DB_NAME, tblName, test_insert_tbl, created_partitions);
+    catalogOpExecutor_.updateCatalog(testInsertRequest);
+  }
+
+  /**
+   * Creates a DML (insert) catalog update request for the given table.
+   * @param dBName name of the target database
+   * @param tableName name of the target table
+   * @param redacted_sql_stmt redacted SQL statement stored in the request header
+   * @param created_partitions partition names created or updated by the insert
+   * @return a TUpdateCatalogRequest to pass to CatalogOpExecutor.updateCatalog()
+   */
+  private TUpdateCatalogRequest createTestTUpdateCatalogRequest(String dBName,
+      String tableName, String redacted_sql_stmt, Set<String> created_partitions) {
+    TUpdateCatalogRequest tUpdateCatalogRequest = new TUpdateCatalogRequest();
+    tUpdateCatalogRequest.setDb_name(dBName);
+    tUpdateCatalogRequest.setTarget_table(tableName);
+    tUpdateCatalogRequest.setCreated_partitions(created_partitions);
+    tUpdateCatalogRequest.setHeader(new TCatalogServiceRequestHeader());
+    tUpdateCatalogRequest.getHeader().setRedacted_sql_stmt(redacted_sql_stmt);
+    return tUpdateCatalogRequest;
+  }
+
   private TColumn getScalarColumn(String colName, TPrimitiveType type) {
     TTypeNode tTypeNode = new TTypeNode(TTypeNodeType.SCALAR);
     tTypeNode.setScalar_type(new TScalarType(type));
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index f39f55f..cdd1a48 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -21,6 +21,7 @@ import pytest
 
 from tests.common.skip import SkipIfHive2
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import HIVE_MAJOR_VERSION
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.util.hive_utils import HiveDbWrapper
 from tests.util.event_processor_utils import EventProcessorUtils
@@ -268,11 +269,6 @@ class TestEventProcessing(CustomClusterTestSuite):
           # add partition
           "alter table {0}.{1} add if not exists partition (year=1111, month=1)".format(
             db_name, tbl2),
-          # insert into a existing partition; generates ALTER_PARTITION
-          # TODO add support for insert_events (IMPALA-8632)
-          # "insert into table {0}.{1} partition (year, month) "
-          # "select * from functional.alltypessmall where year=2009 and month=1".format(
-          #  db_name, tbl2),
          # compute stats generates ALTER_PARTITION
           "compute stats {0}.{1}".format(db_name, tbl2),
           "alter table {0}.{1} recover partitions".format(db_name, recover_tbl_name)],
@@ -295,6 +291,11 @@ class TestEventProcessing(CustomClusterTestSuite):
           "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format(
             db_name, tbl_name)]
     }
+    if HIVE_MAJOR_VERSION >= 3:
+      # insert into an existing partition; generates INSERT events
+      self_event_test_queries[True].append("insert into table {0}.{1} partition "
+          "(year, month) select * from functional.alltypessmall where year=2009 "
+          "and month=1".format(db_name, tbl2))
     return self_event_test_queries
 
   def __get_hive_test_queries(self, db_name, recover_tbl_name):
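
    [Editor's note] For reference, a minimal sketch of firing insert events through the
    refactored API used above (InsertEventInfo plus MetastoreShim.fireInsertEventHelper).
    The database name, table name, partition values and file path are made-up
    placeholders, and the snippet assumes the Impala frontend classes are on the
    classpath; it illustrates the call pattern rather than reproducing code from the
    patch.

        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.List;
        import org.apache.hadoop.hive.metastore.IMetaStoreClient;
        import org.apache.impala.compat.MetastoreShim;
        import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;

        public class FireInsertEventSketch {
          // Fires one insert event per partition touched by the insert.
          static void fireEvents(IMetaStoreClient hmsClient) throws Exception {
            List<InsertEventInfo> infos = new ArrayList<>();
            // Partition values follow the order of the partition keys (p1=1, p2=100);
            // pass null instead for an unpartitioned table.
            List<String> partVals = Arrays.asList("1", "100");
            List<String> newFiles =
                Arrays.asList("hdfs://nn:8020/warehouse/my_tbl/p1=1/p2=100/data0.parq");
            infos.add(new InsertEventInfo(partVals, newFiles, /*isOverwrite=*/false));
            MetastoreShim.fireInsertEventHelper(hmsClient, infos, "my_db", "my_tbl");
          }
        }

    For a partitioned table the caller adds one InsertEventInfo per touched partition,
    which lines up with the per-partition self-events counted in testInsertFromImpala
    above.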


[impala] 01/02: IMPALA-9602: Fix case-sensitivity for local catalog

Posted by vi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vihangk1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0f85cbd51130bb4ec3e7db9bdeb10e4ffe3470ae
Author: Aman Sinha <am...@cloudera.com>
AuthorDate: Sun Apr 5 11:58:35 2020 -0700

    IMPALA-9602: Fix case-sensitivity for local catalog
    
    This patch makes the database and table names lower-case
    when doing lookups, insertions and invalidations of database
    objects or table objects in the local catalog cache. The
    remote catalog already does the right thing by lower-casing
    these names, so this patch makes the behavior consistent with
    what the remote catalog does.
    
    Testing:
     - Added unit tests for CatalogdMetaProvider by examining
       cache hits and misses when loading and invalidating
       database or tables with upper-case names.
     - Manually tested as follows:
       start Impala with local catalog enabled
        start-impala-cluster.py
         --catalogd_args="--catalog_topic_mode=minimal"
         --impalad_args="--use_local_catalog=true"
       Create database in lower-case: "CREATE DATABASE db1;"
       Run the following a few times (this errors without the patch):
       impala-shell.sh -q "DROP TABLE IF EXISTS DB1.ddl_test1 PURGE;
                      CREATE TABLE DB1.ddl_test1 (val string);"
    
    Change-Id: I3f368fa9b50e22ec5057d0bf66c3fd51064d4c26
    Reviewed-on: http://gerrit.cloudera.org:8080/15653
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/local/CatalogdMetaProvider.java | 14 +++--
 .../catalog/local/CatalogdMetaProviderTest.java    | 59 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 8 deletions(-)
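
[Editor's note] To illustrate the pattern the diff below applies (normalizing names with
toLowerCase() when building DbCacheKey/TableCacheKey), here is a small self-contained
sketch. It uses a hypothetical Key class and a plain HashMap rather than Impala's actual
cache classes; it only shows why canonicalizing case at key-construction time makes
mixed-case lookups hit the same cache entry.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    public class CaseInsensitiveKeySketch {
      // Hypothetical stand-in for a table cache key.
      static final class Key {
        final String db;
        final String tbl;
        Key(String db, String tbl) {
          // Canonicalize here so "TPCH"/"tpch" and "NATION"/"nation" collide.
          this.db = db.toLowerCase();
          this.tbl = tbl.toLowerCase();
        }
        @Override public boolean equals(Object o) {
          if (!(o instanceof Key)) return false;
          Key k = (Key) o;
          return db.equals(k.db) && tbl.equals(k.tbl);
        }
        @Override public int hashCode() { return Objects.hash(db, tbl); }
      }

      public static void main(String[] args) {
        Map<Key, String> cache = new HashMap<>();
        cache.put(new Key("tpch", "nation"), "table metadata");
        // Both lookups hit the entry cached under the lower-cased key.
        System.out.println(cache.get(new Key("TPCH", "nation")));  // table metadata
        System.out.println(cache.get(new Key("tpch", "NATION")));  // table metadata
      }
    }

Without the toLowerCase() in the constructor, the two mixed-case lookups above would
return null, which is the cache-miss behavior the unit tests in this patch guard
against.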

diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 618c2e9..2c8ece4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -622,7 +622,7 @@ public class CatalogdMetaProvider implements MetaProvider {
   public Database loadDb(final String dbName) throws TException {
     return loadWithCaching("database metadata for " + dbName,
         DB_METADATA_STATS_CATEGORY,
-        new DbCacheKey(dbName, DbCacheKey.DbInfoType.HMS_METADATA),
+        new DbCacheKey(dbName.toLowerCase(), DbCacheKey.DbInfoType.HMS_METADATA),
         new Callable<Database>() {
           @Override
           public Database call() throws Exception {
@@ -641,7 +641,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       throws MetaException, UnknownDBException, TException {
     return loadWithCaching("table names for database " + dbName,
         TABLE_NAMES_STATS_CATEGORY,
-        new DbCacheKey(dbName, DbCacheKey.DbInfoType.TABLE_NAMES),
+        new DbCacheKey(dbName.toLowerCase(), DbCacheKey.DbInfoType.TABLE_NAMES),
         new Callable<ImmutableList<String>>() {
           @Override
           public ImmutableList<String> call() throws Exception {
@@ -678,8 +678,8 @@ public class CatalogdMetaProvider implements MetaProvider {
   @Override
   public Pair<Table, TableMetaRef> loadTable(final String dbName, final String tableName)
       throws NoSuchObjectException, MetaException, TException {
-    // TODO(todd) need to lower case?
-    TableCacheKey cacheKey = new TableCacheKey(dbName, tableName);
+    TableCacheKey cacheKey = new TableCacheKey(dbName.toLowerCase(),
+        tableName.toLowerCase());
     TableMetaRefImpl ref = loadWithCaching(
         "table metadata for " + dbName + "." + tableName,
         TABLE_METADATA_CACHE_CATEGORY,
@@ -1280,9 +1280,8 @@ public class CatalogdMetaProvider implements MetaProvider {
    */
   private void invalidateCacheForDb(String dbName, Iterable<DbCacheKey.DbInfoType> types,
       List<String> invalidated) {
-    // TODO(todd) check whether we need to lower-case/canonicalize dbName?
     for (DbCacheKey.DbInfoType type: types) {
-      DbCacheKey key = new DbCacheKey(dbName, type);
+      DbCacheKey key = new DbCacheKey(dbName.toLowerCase(), type);
       if (cache_.asMap().remove(key) != null) {
         invalidated.add(type + " for DB " + dbName);
       }
@@ -1295,8 +1294,7 @@ public class CatalogdMetaProvider implements MetaProvider {
    */
   private void invalidateCacheForTable(String dbName, String tblName,
       List<String> invalidated) {
-    // TODO(todd) check whether we need to lower-case/canonicalize dbName and tblName?
-    TableCacheKey key = new TableCacheKey(dbName, tblName);
+    TableCacheKey key = new TableCacheKey(dbName.toLowerCase(), tblName.toLowerCase());
     if (cache_.asMap().remove(key) != null) {
       invalidated.add("table " + dbName + "." + tblName);
     }
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index cb6c5a3..17b84ae 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -44,6 +44,8 @@ import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDatabase;
+import org.apache.impala.thrift.TFunction;
+import org.apache.impala.thrift.TFunctionName;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TRuntimeProfileNode;
 import org.apache.impala.thrift.TTable;
@@ -333,4 +335,61 @@ public class CatalogdMetaProviderTest {
     }
   }
 
+  // Test loading and invalidation of databases and tables with upper-case
+  // names. The expected behavior is that the local catalog treats these
+  // names as case-insensitive.
+  @Test
+  public void testInvalidateObjectsCaseInsensitive() throws Exception {
+    provider_.loadDb("tpch");
+    provider_.loadTable("tpch", "nation");
+
+    testInvalidateDb("TPCH");
+    testInvalidateTable("TPCH", "nation");
+    testInvalidateTable("tpch", "NATION");
+  }
+
+  private void testInvalidateTable(String dbName, String tblName) throws Exception {
+    CacheStats stats = diffStats();
+
+    provider_.loadTable(dbName, tblName);
+
+    // should get a cache hit since dbName, tblName should be treated as case-insensitive
+    stats = diffStats();
+    assertEquals(1, stats.hitCount());
+    assertEquals(0, stats.missCount());
+
+    // Invalidate it.
+    TCatalogObject obj = new TCatalogObject(TCatalogObjectType.TABLE, 0);
+    obj.setTable(new TTable(dbName, tblName));
+    provider_.invalidateCacheForObject(obj);
+
+    // should get a cache miss if we reload it
+    provider_.loadTable(dbName, tblName);
+    stats = diffStats();
+    assertEquals(0, stats.hitCount());
+    assertEquals(1, stats.missCount());
+  }
+
+  private void testInvalidateDb(String dbName) throws Exception {
+    CacheStats stats = diffStats();
+
+    provider_.loadDb(dbName);
+
+    // should get a cache hit since dbName should be treated as case-insensitive
+    stats = diffStats();
+    assertEquals(1, stats.hitCount());
+    assertEquals(0, stats.missCount());
+
+    // Invalidate it.
+    TCatalogObject obj = new TCatalogObject(TCatalogObjectType.DATABASE, 0);
+    obj.setDb(new TDatabase(dbName));
+    provider_.invalidateCacheForObject(obj);
+
+    // should get a cache miss if we reload it
+    provider_.loadDb(dbName);
+    stats = diffStats();
+    assertEquals(0, stats.hitCount());
+    assertEquals(1, stats.missCount());
+  }
+
 }