Posted to commits@impala.apache.org by as...@apache.org on 2021/11/23 05:22:57 UTC

[impala] branch master updated (a690b74 -> ee03727)

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from a690b74  IMPALA-11020: Reattach STDOUT/STDERR independently.
     new 097b101  IMPALA-10923: Fine grained table refreshing at partition level events for transactional tables
     new c3f2ecc  IMPALA-11031: Listmap.getIndex() name is misleading
     new ee03727  IMPALA-11025: Transactional tables should use /test-warehouse/managed/databasename.db

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/catalog/catalog-server.cc                   |   5 +
 be/src/util/backend-gflag-util.cc                  |   3 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../java/org/apache/impala/catalog/Catalog.java    |  47 +++
 .../impala/catalog/CatalogServiceCatalog.java      |  94 +++++-
 .../org/apache/impala/catalog/HdfsPartition.java   |   6 +-
 .../catalog/HdfsPartitionLocationCompressor.java   |   2 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  | 104 ++++++-
 .../main/java/org/apache/impala/catalog/Table.java |   2 +-
 .../org/apache/impala/catalog/TableWriteId.java    |  71 +++++
 .../impala/catalog/events/MetastoreEvents.java     | 335 +++++++++++++++++++--
 .../hive/common/MutableValidReaderWriteIdList.java |  36 ++-
 .../hive/common/MutableValidWriteIdList.java       |  18 +-
 .../apache/impala/planner/DataSourceScanNode.java  |   2 +-
 .../org/apache/impala/planner/HBaseScanNode.java   |   3 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |   2 +-
 .../org/apache/impala/planner/KuduScanNode.java    |   2 +-
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../apache/impala/service/CatalogOpExecutor.java   | 181 ++++++++++-
 .../main/java/org/apache/impala/util/ListMap.java  |   4 +-
 .../impala/catalog/CatalogTableWriteIdTest.java    |  74 +++++
 .../org/apache/impala/catalog/CatalogTest.java     |   1 -
 .../impala/catalog/FileMetadataLoaderTest.java     |   1 +
 .../events/MetastoreEventsProcessorTest.java       | 257 ++++++++++++++--
 .../common/MutableValidReaderWriteIdListTest.java  |  66 +++-
 testdata/bin/generate-schema-statements.py         |   5 +-
 .../functional-query/queries/QueryTest/acid.test   |   2 +-
 tests/metadata/test_ddl.py                         |   3 +-
 28 files changed, 1234 insertions(+), 98 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/catalog/TableWriteId.java
 create mode 100644 fe/src/test/java/org/apache/impala/catalog/CatalogTableWriteIdTest.java

[impala] 02/03: IMPALA-11031: Listmap.getIndex() name is misleading

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c3f2ecc95f217ca9b1c475c44f524bb2fbab5545
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Nov 19 17:32:56 2021 +0100

    IMPALA-11031: Listmap.getIndex() name is misleading
    
    Listmap.getIndex(t) modifies the ListMap object when there is
    no mapping for 't'. Hence its name is misleading, as a reader
    would not expect modifications from a simple getter.
    
    This patch renames it to getOrAddIndex().
    
    Change-Id: I689dfb67e1a9104812489d6299ed43446d2fcae8
    Reviewed-on: http://gerrit.cloudera.org:8080/18042
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
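
The pattern behind the renamed method is a bidirectional index map whose lookup may
insert on a miss. The following is a simplified sketch of that idea (class name
IndexMapSketch is made up for illustration; it is not the actual ListMap code), showing
why a name like getIndex() reads like a side-effect-free getter but is not:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of a get-or-add index map: looking up an unknown key assigns it
    // the next index, so the "get" mutates the map on a miss.
    class IndexMapSketch<T> {
      private final Map<T, Integer> map = new HashMap<>();
      private final List<T> list = new ArrayList<>();

      public synchronized int getOrAddIndex(T t) {
        Integer index = map.get(t);
        if (index != null) return index;
        list.add(t);                  // side effect: the mapping grows
        map.put(t, list.size() - 1);
        return list.size() - 1;
      }

      public synchronized T getEntry(int index) { return list.get(index); }
    }

Callers such as the scan nodes touched below use this call to obtain stable integer
indexes for TNetworkAddress values, so the new name makes the possible insertion
explicit at every call site.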
---
 fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java       | 6 +++---
 .../org/apache/impala/catalog/HdfsPartitionLocationCompressor.java  | 2 +-
 fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java  | 2 +-
 fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java       | 3 ++-
 fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java        | 2 +-
 fe/src/main/java/org/apache/impala/planner/KuduScanNode.java        | 2 +-
 fe/src/main/java/org/apache/impala/util/ListMap.java                | 4 ++--
 7 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 25f8aea..c92f5ef 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -141,7 +141,7 @@ public class HdfsPartition extends CatalogObjectImpl
           int origHostIdx = FileBlock.getReplicaHostIdx(it, j);
           boolean isCached = FileBlock.isReplicaCached(it, j);
           TNetworkAddress origHost = origIndex.get(origHostIdx);
-          int newHostIdx = dstIndex.getIndex(origHost);
+          int newHostIdx = dstIndex.getOrAddIndex(origHost);
           it.mutateReplicaHostIdxs(j, FileBlock.makeReplicaIdx(isCached, newHostIdx));
         }
       }
@@ -178,7 +178,7 @@ public class HdfsPartition extends CatalogObjectImpl
         if (isEc) {
           fbFileBlockOffsets[blockIdx++] = FileBlock.createFbFileBlock(fbb,
               loc.getOffset(), loc.getLength(),
-              (short) hostIndex.getIndex(REMOTE_NETWORK_ADDRESS));
+              (short) hostIndex.getOrAddIndex(REMOTE_NETWORK_ADDRESS));
         } else {
           fbFileBlockOffsets[blockIdx++] =
               FileBlock.createFbFileBlock(fbb, loc, hostIndex, numUnknownDiskIds);
@@ -399,7 +399,7 @@ public class HdfsPartition extends CatalogObjectImpl
       // map it to the corresponding hostname from getHosts().
       for (int i = 0; i < loc.getNames().length; ++i) {
         TNetworkAddress networkAddress = BlockReplica.parseLocation(loc.getNames()[i]);
-        short replicaIdx = (short) hostIndex.getIndex(networkAddress);
+        short replicaIdx = (short) hostIndex.getOrAddIndex(networkAddress);
         boolean isReplicaCached = cachedHosts.contains(loc.getHosts()[i]);
         replicaIdx = makeReplicaIdx(isReplicaCached, replicaIdx);
         fbb.addShort(replicaIdx);
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
index 2ba6414..f4f9bf3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
@@ -75,7 +75,7 @@ public class HdfsPartitionLocationCompressor {
   // Compress a location prefix, adding it to the bidirectional map (indexToPrefix_,
   // prefixToIndex_) if it is not already present.
   private int prefixToIndex(String s) {
-    return prefixMap_.getIndex(s);
+    return prefixMap_.getOrAddIndex(s);
   }
 
   // A surrogate for THdfsPartitionLocation, which represents a partition's location
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index 1860baa..c89b276 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -326,7 +326,7 @@ public class DataSourceScanNode extends ScanNode {
   private void computeScanRangeLocations(Analyzer analyzer) {
     // TODO: Does the port matter?
     TNetworkAddress networkAddress = addressToTNetworkAddress("localhost:12345");
-    Integer hostIndex = analyzer.getHostIndex().getIndex(networkAddress);
+    Integer hostIndex = analyzer.getHostIndex().getOrAddIndex(networkAddress);
     scanRangeSpecs_ = new TScanRangeSpec();
     scanRangeSpecs_.addToConcrete_ranges(new TScanRangeLocationList(
         new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex))));
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index 999c5bf..17f570f 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -497,7 +497,8 @@ public class HBaseScanNode extends ScanNode {
           TScanRangeLocationList scanRangeLocation = new TScanRangeLocationList();
           TNetworkAddress networkAddress = addressToTNetworkAddress(locEntry.getKey());
           scanRangeLocation.addToLocations(
-              new TScanRangeLocation(analyzer.getHostIndex().getIndex(networkAddress)));
+              new TScanRangeLocation(
+                  analyzer.getHostIndex().getOrAddIndex(networkAddress)));
 
           TScanRange scanRange = new TScanRange();
           scanRange.setHbase_key_range(keyRange);
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index b9e84d5..b84ce7f 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1314,7 +1314,7 @@ public class HdfsScanNode extends ScanNode {
             partition.getHostIndex().getEntry(replicaHostIdx);
         Preconditions.checkNotNull(networkAddress);
         // Translate from network address to the global (to this request) host index.
-        Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
+        Integer globalHostIdx = analyzer.getHostIndex().getOrAddIndex(networkAddress);
         location.setHost_idx(globalHostIdx);
         if (fsHasBlocks && !fileDesc.getIsEc() && FileBlock.getDiskId(block, j) == -1) {
           ++numScanRangesNoDiskIds_;
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 3eb881a..dbb49a7 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -246,7 +246,7 @@ public class KuduScanNode extends ScanNode {
         TNetworkAddress address =
             new TNetworkAddress(replica.getRpcHost(), replica.getRpcPort());
         // Use the network address to look up the host in the global list
-        Integer hostIndex = analyzer.getHostIndex().getIndex(address);
+        Integer hostIndex = analyzer.getHostIndex().getOrAddIndex(address);
         locations.add(new TScanRangeLocation(hostIndex));
         hostIndexSet_.add(hostIndex);
       }
diff --git a/fe/src/main/java/org/apache/impala/util/ListMap.java b/fe/src/main/java/org/apache/impala/util/ListMap.java
index db3942b..228a2a6 100644
--- a/fe/src/main/java/org/apache/impala/util/ListMap.java
+++ b/fe/src/main/java/org/apache/impala/util/ListMap.java
@@ -53,7 +53,7 @@ public class ListMap<T> {
    * Map from T t to Integer index. If the mapping from t doesn't
    * exist, then create a new mapping from t to a unique index.
    */
-  public int getIndex(T t) {
+  public int getOrAddIndex(T t) {
     Integer index = map_.get(t);
     if (index != null) return index;
     // No match was found, add a new entry.
@@ -70,7 +70,7 @@ public class ListMap<T> {
   }
 
   /**
-   * Populate the bi-map from the given list.  Does not perform a copy
+   * Populate the bi-map from the given list. Does not perform a copy
    * of the list.
    */
   public synchronized void populate(List<T> list) {

[impala] 03/03: IMPALA-11025: Transactional tables should use /test-warehouse/managed/databasename.db

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ee03727971f379fe8fb3161387783ce00a9f6b9d
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Fri Nov 19 10:33:02 2021 -0800

    IMPALA-11025: Transactional tables should use /test-warehouse/managed/databasename.db
    
    Recent Hive releases seem to be enforcing that data for a managed table
    is stored under the hive.metastore.warehouse.dir path property in a
    folder path similar to databasename.db/tablename - see
    https://cwiki.apache.org/confluence/display/Hive/Managed+vs.+External+Tables
    Use this form, /test-warehouse/managed/databasename.db, in
    generate-schema-statements.py when creating transactional tables.
    
    Testing:
    - A few small changes to tests that verify filesystem changes for acid
      tables.
    - Exhaustive tests pass.
    
    Change-Id: Ib870ca802c9fa180e6be7a6f65bef35b227772db
    Reviewed-on: http://gerrit.cloudera.org:8080/18046
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
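
As a rough illustration of the layout this change enforces, a managed (transactional)
table's data directory is built from the warehouse root, a 'managed' component, the
database directory with a '.db' suffix, and the table name. The sketch below (class
name ManagedPathExample is hypothetical; the database and table names are taken from
the tests touched in this commit) just demonstrates the path shape:

    import java.nio.file.Paths;

    public class ManagedPathExample {
      public static void main(String[] args) {
        String warehouseDir = "/test-warehouse";        // hive.metastore.warehouse.dir in the test setup
        String dbName = "functional_orc_def";           // example database from the tests below
        String tableName = "complextypestbl_orc_def";   // example table from the tests below
        // Managed transactional tables live under <warehouse>/managed/<db>.db/<table>
        String managedPath =
            Paths.get(warehouseDir, "managed", dbName + ".db", tableName).toString();
        System.out.println(managedPath);
        // prints: /test-warehouse/managed/functional_orc_def.db/complextypestbl_orc_def
      }
    }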
---
 .../test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java  | 1 +
 testdata/bin/generate-schema-statements.py                           | 5 +++--
 testdata/workloads/functional-query/queries/QueryTest/acid.test      | 2 +-
 tests/metadata/test_ddl.py                                           | 3 ++-
 4 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index 5fd2f7f..51070e2 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -152,6 +152,7 @@ public class FileMetadataLoaderTest {
     ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString(
         "functional_orc_def.complextypestbl_minor_compacted:10:10::");
     Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/managed/" +
+                              "functional_orc_def.db/" +
                               "complextypestbl_minor_compacted_orc_def/");
     FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
         /* oldFds = */ Collections.emptyList(), hostIndex, new ValidReadTxnList(""),
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index 72c39f0..d0d715d 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -698,7 +698,8 @@ def generate_statements(output_name, test_vectors, sections,
         hdfs_location = hdfs_location.split('.')[-1]
       # Transactional tables need to be put under the 'managed' directory.
       if is_transactional(tblproperties):
-        hdfs_location = os.path.join('managed', hdfs_location)
+        db_location = '{0}{1}.db'.format(db_name, db_suffix)
+        hdfs_location = os.path.join('managed', db_location, hdfs_location)
       data_path = os.path.join(options.hive_warehouse_dir, hdfs_location)
 
       output = impala_create
@@ -775,7 +776,7 @@ def generate_statements(output_name, test_vectors, sections,
       if not force_reload and hdfs_location in existing_tables:
         print 'HDFS path:', data_path, 'contains data. Data loading can be skipped.'
       else:
-        print 'HDFS path:', data_path, 'does not exists or is empty. Data will be loaded.'
+        print 'HDFS path:', data_path, 'does not exist or is empty. Data will be loaded.'
         if not db_suffix:
           if load:
             hive_output.load_base.append(build_load_statement(load, db_name,
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid.test b/testdata/workloads/functional-query/queries/QueryTest/acid.test
index 9d912c4..1b27ee4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid.test
@@ -117,7 +117,7 @@ show files in functional_orc_def.complextypestbl_minor_compacted;
 ---- LABELS
 Path,Size,Partition
 ---- RESULTS
-row_regex:'$NAMENODE/test-warehouse/managed/complextypestbl_minor_compacted_orc_def/delta_0000001_0000008_v\d+/bucket_00000','.+KB',''
+row_regex:'$NAMENODE/test-warehouse/managed/functional_orc_def.db/complextypestbl_minor_compacted_orc_def/delta_0000001_0000008_v\d+/bucket_00000','.+KB',''
 ---- TYPES
 STRING,STRING,STRING
 ====
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 739ba4f..34c2f76 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -308,7 +308,8 @@ class TestDdlStatements(TestDdlBase):
   @SkipIfGCS.hive
   @UniqueDatabase.parametrize(sync_ddl=True)
   def test_create_table_like_file_orc(self, vector, unique_database):
-    COMPLEXTYPETBL_PATH = 'test-warehouse/managed/complextypestbl_orc_def/'
+    COMPLEXTYPETBL_PATH = 'test-warehouse/managed/functional_orc_def.db/' \
+                          'complextypestbl_orc_def/'
     base_dir = filter(lambda s: s.startswith('base'),
       self.filesystem_client.ls(COMPLEXTYPETBL_PATH))[0]
     bucket_file = filter(lambda s: s.startswith('bucket'),

[impala] 01/03: IMPALA-10923: Fine grained table refreshing at partition level events for transactional tables

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 097b10104f23e0927d5b21b43a79f6cc10425f59
Author: Yu-Wen Lai <yu...@cloudera.com>
AuthorDate: Tue Aug 3 22:18:05 2021 -0700

    IMPALA-10923: Fine grained table refreshing at partition level events
    for transactional tables
    
    To enable fine-grained table refreshing, there are three main changes
    in this commit.
    1. Maintain validWriteIdList in Catalogd for transactional tables. We
      will keep track of write id changes for partitioned tables by
      AllocWriteIdEvents, CommitTxnEvents, and AbortTxnEvents.
    2. Conduct partition level refreshing for transactional tables'
      addPartitionEvents, dropPartitionEvents, and AlterPartitionEvents.
    3. Introduce a config
      hms_event_incremental_refresh_transactional_table, which can switch
      on/off the fine-grained table refreshing.
    
    Performance Tests:
    A simple test was performed by running an insert into one partition of
    a partitioned ACID table (50,000 partitions). Below is the time taken
    to refresh this table by the event.
    
    Storage                Before              After
    =============================================================
    S3                     50 secs             50 msecs
    local                  3 secs              3 msecs
    
    Change-Id: I6ba07c9a338a25614690e314335ee4b801486da9
    Reviewed-on: http://gerrit.cloudera.org:8080/17858
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Sourabh Goyal <so...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
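
At a high level, the bookkeeping described in the commit message maps each open
transaction to the write ids it allocated, then resolves those ids when the
transaction commits or aborts. The following is a simplified sketch of that flow
(class name WriteIdTrackerSketch is made up; the real state lives in Catalog and
HdfsTable and is driven by ALLOC_WRITE_ID_EVENT, COMMIT_TXN and ABORT_TXN events):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of write-id tracking for one table: allocation marks ids OPEN,
    // commit/abort resolves them and drops the per-transaction entry.
    class WriteIdTrackerSketch {
      enum WriteIdStatus { OPEN, COMMITTED, ABORTED }

      // txnId -> write ids allocated in that transaction
      private final Map<Long, List<Long>> txnToWriteIds = new HashMap<>();
      // writeId -> current status in this table's valid write id list
      private final Map<Long, WriteIdStatus> tableWriteIds = new HashMap<>();

      void onAllocWriteId(long txnId, long writeId) {
        txnToWriteIds.computeIfAbsent(txnId, k -> new ArrayList<>()).add(writeId);
        tableWriteIds.put(writeId, WriteIdStatus.OPEN);
      }

      void onCommitTxn(long txnId) { resolve(txnId, WriteIdStatus.COMMITTED); }

      void onAbortTxn(long txnId) { resolve(txnId, WriteIdStatus.ABORTED); }

      private void resolve(long txnId, WriteIdStatus status) {
        List<Long> writeIds = txnToWriteIds.remove(txnId);
        if (writeIds == null) return;   // nothing tracked for this transaction
        for (long id : writeIds) tableWriteIds.put(id, status);
      }
    }

With this state in place, a partition-level event only needs to refresh the partitions
whose write id has advanced rather than reloading the whole table, which is where the
S3 refresh time in the table above drops from seconds to milliseconds.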
---
 be/src/catalog/catalog-server.cc                   |   5 +
 be/src/util/backend-gflag-util.cc                  |   3 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../java/org/apache/impala/catalog/Catalog.java    |  47 +++
 .../impala/catalog/CatalogServiceCatalog.java      |  94 +++++-
 .../java/org/apache/impala/catalog/HdfsTable.java  | 104 ++++++-
 .../main/java/org/apache/impala/catalog/Table.java |   2 +-
 .../org/apache/impala/catalog/TableWriteId.java    |  71 +++++
 .../impala/catalog/events/MetastoreEvents.java     | 335 +++++++++++++++++++--
 .../hive/common/MutableValidReaderWriteIdList.java |  36 ++-
 .../hive/common/MutableValidWriteIdList.java       |  18 +-
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../apache/impala/service/CatalogOpExecutor.java   | 181 ++++++++++-
 .../impala/catalog/CatalogTableWriteIdTest.java    |  74 +++++
 .../org/apache/impala/catalog/CatalogTest.java     |   1 -
 .../events/MetastoreEventsProcessorTest.java       | 257 ++++++++++++++--
 .../common/MutableValidReaderWriteIdListTest.java  |  66 +++-
 17 files changed, 1216 insertions(+), 84 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 38960ca..de8f4a8 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -113,6 +113,11 @@ DEFINE_bool(invalidate_hms_cache_on_ddls, true, "This configuration is used "
     "for non transactional tables if alter/create/delete table hms apis are "
      "invoked over catalogd's metastore endpoint");
 
+DEFINE_bool(hms_event_incremental_refresh_transactional_table, true, "When set to true "
+    "events processor will refresh transactional tables incrementally for partition "
+    "level events. Otherwise, it will always reload the whole table for transactional "
+    "tables.");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 4edf301..1cc089d 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -97,6 +97,7 @@ DECLARE_bool(fallback_to_hms_on_errors);
 DECLARE_bool(enable_catalogd_hms_cache);
 DECLARE_string(kudu_sasl_protocol_name);
 DECLARE_bool(invalidate_hms_cache_on_ddls);
+DECLARE_bool(hms_event_incremental_refresh_transactional_table);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -313,6 +314,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_invalidate_hms_cache_on_ddls(FLAGS_invalidate_hms_cache_on_ddls);
   cfg.__set_startup_filesystem_check_directories(
       FLAGS_startup_filesystem_check_directories);
+  cfg.__set_hms_event_incremental_refresh_transactional_table(
+      FLAGS_hms_event_incremental_refresh_transactional_table);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 181bb70..88017f6 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -215,4 +215,6 @@ struct TBackendGflags {
   95: required bool invalidate_hms_cache_on_ddls
 
   96: required string startup_filesystem_check_directories
+
+  97: required bool hms_event_incremental_refresh_transactional_table
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index c662c78..031dfe4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -91,6 +93,15 @@ public abstract class Catalog implements AutoCloseable {
   // Cache of data sources.
   protected final CatalogObjectCache<DataSource> dataSources_;
 
+  // Cache of transaction to write id mapping for the open transactions which are detected
+  // by the catalogd via events processor. The entries get cleaned up on receiving commit
+  // transaction or abort transaction events.
+  // We need this mapping because not all the write ids for a commit event can be
+  // retrieved from HMS. We can fetch write id for write events via getAllWriteEventInfo.
+  // However, we don't have API to fetch write ids for DDL events.
+  protected final ConcurrentHashMap<Long, Set<TableWriteId>> txnToWriteIds_ =
+      new ConcurrentHashMap<>();
+
   // Cache of known HDFS cache pools. Allows for checking the existence
   // of pools without hitting HDFS.
   protected final CatalogObjectCache<HdfsCachePool> hdfsCachePools_ =
@@ -758,4 +769,40 @@ public abstract class Catalog implements AutoCloseable {
       MetastoreShim.releaseLock(client.getHiveClient(), lockId);
     }
   }
+
+  /**
+   * Returns write ids for an open txn from the Catalog. If there is no write id
+   * associated with the txnId, it returns empty set.
+   */
+  public Set<TableWriteId> getWriteIds(Long txnId) {
+    Preconditions.checkNotNull(txnId);
+    return Collections.unmodifiableSet(txnToWriteIds_.getOrDefault(txnId,
+        Collections.emptySet()));
+  }
+
+  /**
+   * Adds a mapping from txnId to tableWriteId to the Catalog.
+   */
+  public void addWriteId(Long txnId, TableWriteId tableWriteId) {
+    Preconditions.checkNotNull(txnId);
+    Preconditions.checkNotNull(tableWriteId);
+    txnToWriteIds_.computeIfAbsent(txnId, k -> new HashSet<>()).add(tableWriteId);
+  }
+
+  /**
+   * Removes and returns all write id records for a transaction. If there is no write id
+   * associated with the txnId, it returns empty set.
+   */
+  public Set<TableWriteId> removeWriteIds(Long txnId) {
+    Preconditions.checkNotNull(txnId);
+    Set<TableWriteId> resultSet = txnToWriteIds_.remove(txnId);
+    return resultSet != null ? resultSet : Collections.emptySet();
+  }
+
+  /**
+   * Clears all write id records.
+   */
+  public void clearWriteIds() {
+    txnToWriteIds_.clear();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 942698c..ae5d363 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -20,7 +20,6 @@ package org.apache.impala.catalog;
 import static org.apache.impala.thrift.TCatalogObjectType.HDFS_PARTITION;
 import static org.apache.impala.thrift.TCatalogObjectType.TABLE;
 
-import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -75,6 +74,7 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.hive.common.MutableValidWriteIdList;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.CatalogLookupStatus;
@@ -1957,6 +1957,9 @@ public class CatalogServiceCatalog extends Catalog {
       // bumped. Don't need to update it in this case.
       if (lastResetStartVersion_ < startVersion) lastResetStartVersion_ = startVersion;
       versionLock_.writeLock().unlock();
+      // clear all txn to write ids mapping so that there is no memory leak for previous
+      // events
+      clearWriteIds();
       // restart the event processing for id just before the reset
       metastoreEventProcessor_.start(currentEventId);
     }
@@ -2401,7 +2404,7 @@ public class CatalogServiceCatalog extends Catalog {
       return tbl.toTCatalogObject(resultType);
     } finally {
       context.stop();
-      Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
+      UnlockWriteLockIfErroneouslyLocked();
       tbl.releaseWriteLock();
     }
   }
@@ -2573,11 +2576,16 @@ public class CatalogServiceCatalog extends Catalog {
    * Refresh table if exists. Returns true if reloadTable() succeeds, false
    * otherwise.
    */
-  public boolean reloadTableIfExists(String dbName, String tblName, String reason)
-      throws CatalogException {
+  public boolean reloadTableIfExists(String dbName, String tblName, long eventId,
+      String reason) throws CatalogException {
     try {
       Table table = getTable(dbName, tblName);
       if (table == null || table instanceof IncompleteTable) return false;
+      if (eventId > 0 && eventId <= table.getCreateEventId()) {
+        LOG.debug("Not reloading the table {}.{} for event {} since it is recreated at "
+            + "event {}.", dbName, tblName, eventId, table.getCreateEventId());
+        return false;
+      }
       reloadTable(table, reason);
     } catch (DatabaseNotFoundException | TableLoadingException e) {
       LOG.info(String.format("Reload table if exists failed with: %s", e.getMessage()));
@@ -2922,7 +2930,7 @@ public class CatalogServiceCatalog extends Catalog {
       return reloadHdfsPartition(hdfsTable, partitionName, wasPartitionReloaded,
           resultType, reason, newCatalogVersion, hdfsPartition);
     } finally {
-      Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
+      UnlockWriteLockIfErroneouslyLocked();
       tbl.releaseWriteLock();
     }
   }
@@ -2968,7 +2976,7 @@ public class CatalogServiceCatalog extends Catalog {
       // note that hdfsPartition can be null here which is a valid input argument
       // in such a case a new hdfsPartition is added and nothing is removed.
       hmsPartToHdfsPart.put(hmsPartition, hdfsPartition);
-      hdfsTable.reloadPartitions(msClient.getHiveClient(), hmsPartToHdfsPart);
+      hdfsTable.reloadPartitions(msClient.getHiveClient(), hmsPartToHdfsPart, true);
     }
     hdfsTable.setCatalogVersion(newCatalogVersion);
     wasPartitionReloaded.setRef(true);
@@ -3539,6 +3547,80 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  /**
+   * Marks write ids with corresponding status for the table if it is loaded HdfsTable.
+   */
+  public void addWriteIdsToTable(String dbName, String tblName, long eventId,
+      List<Long> writeIds, MutableValidWriteIdList.WriteIdStatus status)
+    throws CatalogException {
+    Table tbl;
+    try {
+      tbl = getTable(dbName, tblName);
+    } catch (DatabaseNotFoundException e) {
+      LOG.debug("Not adding write ids to table {}.{} for event {} " +
+          "since database was not found", dbName, tblName, eventId);
+      return;
+    }
+    if (tbl == null) {
+      LOG.debug("Not adding write ids to table {}.{} for event {} since it was not found",
+          dbName, tblName, eventId);
+      return;
+    }
+    if (!tbl.isLoaded()) {
+      LOG.debug("Not adding write ids to table {}.{} for event {} " +
+          "since it was not loaded" , dbName, tblName, eventId);
+      return;
+    }
+    if (!(tbl instanceof HdfsTable)) {
+      LOG.debug("Not adding write ids to table {}.{} for event {} " +
+          "since it is not HdfsTable", dbName, tblName, eventId);
+      return;
+    }
+    if (eventId > 0 && eventId <= tbl.getCreateEventId()) {
+      LOG.debug("Not adding write ids to table {}.{} for event {} since it is recreated.",
+          dbName, tblName, eventId);
+      return;
+    }
+    if (!tryWriteLock(tbl)) {
+      throw new CatalogException(String.format(
+          "Error locking table %s for event %d", tbl.getFullName(), eventId));
+    }
+    try {
+      long newCatalogVersion = incrementAndGetCatalogVersion();
+      versionLock_.writeLock().unlock();
+      HdfsTable hdfsTable = (HdfsTable) tbl;
+      // A non-acid table could be upgraded to an acid table, and its valid write id list
+      // is not yet be loaded. In this case, we just do nothing. The table should be
+      // reloaded for the AlterTable event that sets the table as transactional.
+      if (hdfsTable.getValidWriteIds() == null) {
+        LOG.debug("Not adding write ids to table {}.{} for event {} since it was just "
+            + "upgraded to an acid table and it's valid write id list is not loaded",
+            dbName, tblName, eventId);
+        return;
+      }
+      if (!hdfsTable.addWriteIds(writeIds, status)) {
+        return;
+      }
+      tbl.setCatalogVersion(newCatalogVersion);
+      LOG.debug("Added {} writeId to table {}: {} for event {}", status,
+          tbl.getFullName(), writeIds, eventId);
+    } finally {
+      UnlockWriteLockIfErroneouslyLocked();
+      tbl.releaseWriteLock();
+    }
+  }
+
+  /**
+   * This method checks if the version lock is unlocked. If it's still locked then it
+   * logs an error and unlocks it.
+   */
+  private void UnlockWriteLockIfErroneouslyLocked() {
+    if (versionLock_.isWriteLockedByCurrentThread()) {
+      LOG.error("Write lock should have been released.");
+      versionLock_.writeLock().unlock();
+    }
+  }
+
   CatalogdTableInvalidator getCatalogdTableInvalidator() {
     return catalogdTableInvalidator_;
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 8959344..afc084e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -193,6 +192,8 @@ public class HdfsTable extends Table implements FeFsTable {
   // for a given ValidWriteIdList
   public static final String FILEMETADATA_CACHE_MISS_METRIC = "filemetadata-cache-miss";
   public static final String FILEMETADATA_CACHE_HIT_METRIC = "filemetadata-cache-hit";
+  // metric used to monitor the number of times method loadFileMetadata is called
+  public static final String NUM_LOAD_FILEMETADATA_METRIC = "num-load-filemetadata";
 
   // Load all partitions time, including fetching all partitions
   // from HMS and loading all partitions. The code path is
@@ -740,6 +741,7 @@ public class HdfsTable extends Table implements FeFsTable {
   private long loadFileMetadataForPartitions(IMetaStoreClient client,
       Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh,
       String debugActions) throws CatalogException {
+    getMetrics().getCounter(NUM_LOAD_FILEMETADATA_METRIC).inc();
     final Clock clock = Clock.defaultClock();
     long startTime = clock.getTick();
 
@@ -2709,7 +2711,7 @@ public class HdfsTable extends Table implements FeFsTable {
           hmsPartToHdfsPart.put(partition, hdfsPartition);
         }
       }
-      reloadPartitions(client, hmsPartToHdfsPart);
+      reloadPartitions(client, hmsPartToHdfsPart, true);
       return hmsPartToHdfsPart.size();
     } catch (NoSuchObjectException e) {
       // HMS throws a NoSuchObjectException if the table does not exist
@@ -2724,6 +2726,43 @@ public class HdfsTable extends Table implements FeFsTable {
   }
 
   /**
+   * Reload the HdfsPartitions which correspond to the given partitions.
+   *
+   * @param client is the HMS client to be used.
+   * @param partsFromEvent Partition objects from the event.
+   * @param loadFileMetadata If true, file metadata will be reloaded.
+   * @param reason Reason for reloading the partitions for logging purposes.
+   * @return the number of partitions which were reloaded.
+   */
+  public int reloadPartitionsFromEvent(IMetaStoreClient client,
+      List<Partition> partsFromEvent, boolean loadFileMetadata, String reason)
+      throws CatalogException {
+    Preconditions.checkArgument(partsFromEvent != null
+        && !partsFromEvent.isEmpty());
+    Preconditions.checkState(isWriteLockedByCurrentThread(), "Write Lock should be "
+        + "held before reloadPartitionsFromEvent");
+    LOG.info("Reloading partition metadata for table: {} ({})", getFullName(), reason);
+    Map<Partition, HdfsPartition> hmsPartToHdfsPart = new HashMap<>();
+    try {
+      for (Partition partition : partsFromEvent) {
+        List<LiteralExpr> partExprs = getTypeCompatiblePartValues(partition.getValues());
+        HdfsPartition hdfsPartition = getPartition(partExprs);
+        // only reload partitions that have more recent write id
+        if (hdfsPartition != null &&
+            (!AcidUtils.isTransactionalTable(msTable_.getParameters())
+                || hdfsPartition.getWriteId() < partition.getWriteId())) {
+          hmsPartToHdfsPart.put(partition, hdfsPartition);
+        }
+      }
+      reloadPartitions(client, hmsPartToHdfsPart, loadFileMetadata);
+      return hmsPartToHdfsPart.size();
+    } catch (UnsupportedEncodingException e) {
+      throw new CatalogException(
+          "Unexpected error while retrieving partitions for table " + getFullName(), e);
+    }
+  }
+
+  /**
    * Reloads the metadata of partitions given by a map of HMS Partitions to existing (old)
    * HdfsPartitions.
    * @param hmsPartsToHdfsParts The map of HMS partition object to the old HdfsPartition.
@@ -2732,9 +2771,13 @@ public class HdfsTable extends Table implements FeFsTable {
    *                            reconstructed from the HMS partition key. If the
    *                            value for a given partition key is null then nothing is
    *                            removed and a new HdfsPartition is simply added.
+   * @param loadFileMetadata If true, file metadata will be reloaded.
    */
   public void reloadPartitions(IMetaStoreClient client,
-      Map<Partition, HdfsPartition> hmsPartsToHdfsParts) throws CatalogException {
+      Map<Partition, HdfsPartition> hmsPartsToHdfsParts, boolean loadFileMetadata)
+      throws CatalogException {
+    Preconditions.checkState(isWriteLockedByCurrentThread(), "Write Lock should be "
+        + "held before reloadPartitions");
     FsPermissionCache permissionCache = new FsPermissionCache();
     Map<HdfsPartition.Builder, HdfsPartition> partBuilderToPartitions = new HashMap<>();
     for (Map.Entry<Partition, HdfsPartition> entry : hmsPartsToHdfsParts.entrySet()) {
@@ -2750,9 +2793,11 @@ public class HdfsTable extends Table implements FeFsTable {
       }
       partBuilderToPartitions.put(partBuilder, oldPartition);
     }
-    // load file metadata in parallel
-    loadFileMetadataForPartitions(client,
-        partBuilderToPartitions.keySet(),/*isRefresh=*/true);
+    if (loadFileMetadata) {
+      // load file metadata in parallel
+      loadFileMetadataForPartitions(client, partBuilderToPartitions.keySet(),
+          /*isRefresh=*/true);
+    }
     for (Map.Entry<HdfsPartition.Builder, HdfsPartition> entry :
         partBuilderToPartitions.entrySet()) {
       if (entry.getValue() != null) {
@@ -2795,6 +2840,7 @@ public class HdfsTable extends Table implements FeFsTable {
     metrics_.addTimer(CATALOG_UPDATE_DURATION_METRIC);
     metrics_.addCounter(FILEMETADATA_CACHE_HIT_METRIC);
     metrics_.addCounter(FILEMETADATA_CACHE_MISS_METRIC);
+    metrics_.addCounter(NUM_LOAD_FILEMETADATA_METRIC);
   }
 
   /**
@@ -2838,7 +2884,7 @@ public class HdfsTable extends Table implements FeFsTable {
   }
 
   /**
-   * Set ValistWriteIdList with stored writeId
+   * Set ValidWriteIdList with stored writeId
    * @param client the client to access HMS
    */
   protected boolean loadValidWriteIdList(IMetaStoreClient client)
@@ -2861,7 +2907,49 @@ public class HdfsTable extends Table implements FeFsTable {
 
   @Override
   public ValidWriteIdList getValidWriteIds() {
-    return validWriteIds_;
+    if (validWriteIds_ == null) {
+      return null;
+    }
+    // returns a copy to avoid validWriteIds_ is modified outside
+    return MetastoreShim.getValidWriteIdListFromString(validWriteIds_.toString());
+  }
+
+  /**
+   * Sets validWriteIds of this table.
+   * @param writeIdList If the writeIdList is {@link MutableValidWriteIdList}, it is set
+   *                    as the original instance. Otherwise, a new instance is created.
+   */
+  public void setValidWriteIds(ValidWriteIdList writeIdList) {
+    if (writeIdList != null) {
+      validWriteIds_ = new MutableValidReaderWriteIdList(writeIdList);
+    } else {
+      validWriteIds_ = null;
+    }
+  }
+
+  /**
+   * Add write ids to the validWriteIdList of this table.
+   * @param writeIds a list of write ids
+   * @param status the status of the writeIds argument
+   * @return True if any of writeIds is added and false otherwise
+   */
+  public boolean addWriteIds(List<Long> writeIds,
+      MutableValidWriteIdList.WriteIdStatus status) throws CatalogException {
+    Preconditions.checkState(isWriteLockedByCurrentThread(), "Write Lock should be held "
+        + "before addWriteIds.");
+    Preconditions.checkArgument(writeIds != null, "Cannot add null write ids");
+    Preconditions.checkState(validWriteIds_ != null, "Write id list should not be null");
+    switch (status) {
+      case OPEN:
+        return validWriteIds_.addOpenWriteId(Collections.max(writeIds));
+      case COMMITTED:
+        return validWriteIds_.addCommittedWriteIds(writeIds);
+      case ABORTED:
+        return validWriteIds_.addAbortedWriteIds(writeIds);
+      default:
+        throw new CatalogException("Unknown write id status " + status + " for table "
+            + getFullName());
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index e1524d6..3376a85 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -139,7 +139,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
 
   // Represents the event id in the metastore which pertains to the creation of this
   // table. Defaults to -1 for a preexisting table or if events processing is not active.
-  protected long createEventId_ = -1;
+  protected volatile long createEventId_ = -1;
 
   // tracks the in-flight metastore events for this table. Used by Events processor to
   // avoid unnecessary refresh when the event is received
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java b/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java
new file mode 100644
index 0000000..ce269e4
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/TableWriteId.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A class that associate a table with write id. This is used to track what table and
+ * what write id is allocated for a transaction.
+ */
+public class TableWriteId {
+  // Database name
+  private String dbName_;
+  // Table name
+  private String tblName_;
+  // Create event id, which help us determine if a table is recreated
+  private long createEventId_;
+  // Write id of this table in the transaction
+  private long writeId_;
+
+  public TableWriteId(String dbName, String tblName, long createEventId, long writeId) {
+    Preconditions.checkArgument(dbName != null && tblName != null);
+    this.dbName_ = dbName;
+    this.tblName_ = tblName;
+    this.createEventId_ = createEventId;
+    this.writeId_ = writeId;
+  }
+
+  public String getDbName() {
+    return dbName_;
+  }
+
+  public String getTblName() {
+    return tblName_;
+  }
+
+  public long getCreateEventId() {
+    return createEventId_;
+  }
+
+  public long getWriteId() {
+    return writeId_;
+  }
+
+  public boolean equals(Object object) {
+    if (this == object) return true;
+    if (object == null || getClass() != object.getClass()) return false;
+    TableWriteId that = (TableWriteId) object;
+    return dbName_.equals(that.dbName_) && tblName_.equals(that.tblName_) &&
+        createEventId_ == that.createEventId_ && writeId_ == that.writeId_;
+  }
+
+  public int hashCode() {
+    return java.util.Objects.hash(super.hashCode(), createEventId_, writeId_);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 59c381a..7fd86da 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -30,13 +30,21 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.GetAllWriteEventInfoRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
@@ -51,16 +59,21 @@ import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.DatabaseNotFoundException;
 import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.TableNotLoadedException;
+import org.apache.impala.catalog.TableWriteId;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Reference;
 import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.hive.common.MutableValidWriteIdList;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.AcidUtils;
+import org.apache.thrift.TException;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
@@ -107,6 +120,9 @@ public class MetastoreEvents {
     DROP_PARTITION("DROP_PARTITION"),
     INSERT("INSERT"),
     INSERT_PARTITIONS("INSERT_PARTITIONS"),
+    ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
+    COMMIT_TXN("COMMIT_TXN"),
+    ABORT_TXN("ABORT_TXN"),
     OTHER("OTHER");
 
     private final String eventType_;
@@ -167,6 +183,16 @@ public class MetastoreEvents {
       Preconditions.checkNotNull(event.getEventType());
       MetastoreEventType metastoreEventType =
           MetastoreEventType.from(event.getEventType());
+      if (BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable()) {
+        switch (metastoreEventType) {
+          case ALLOC_WRITE_ID_EVENT:
+            return new AllocWriteIdEvent(catalogOpExecutor_, metrics_, event);
+          case COMMIT_TXN:
+            return new CommitTxnEvent(catalogOpExecutor_, metrics_, event);
+          case ABORT_TXN:
+            return new AbortTxnEvent(catalogOpExecutor_, metrics_, event);
+        }
+      }
       switch (metastoreEventType) {
         case CREATE_TABLE:
           return new CreateTableEvent(catalogOpExecutor_, metrics_, event);
@@ -697,7 +723,7 @@ public class MetastoreEvents {
     protected boolean reloadTableFromCatalog(String operation, boolean isTransactional)
         throws CatalogException {
       try {
-        if (!catalog_.reloadTableIfExists(dbName_, tblName_,
+        if (!catalog_.reloadTableIfExists(dbName_, tblName_, getEventId(),
             "Processing " + operation + " event from HMS")) {
           debugLog("Automatic refresh on table {} failed as the table "
               + "either does not exist anymore or is not in loaded state.",
@@ -723,14 +749,21 @@ public class MetastoreEvents {
      * Reloads the partitions provided, only if the table is loaded and if the partitions
      * exist in catalogd.
      * @param partitions the list of Partition objects which need to be reloaded.
+     * @param loadFileMetadata
      * @param reason The reason for reload operation which is used for logging by
      *               catalogd.
      */
-    protected void reloadPartitions(List<Partition> partitions, String reason)
-        throws CatalogException {
+    protected void reloadPartitions(List<Partition> partitions, boolean loadFromEvent,
+        boolean loadFileMetadata, String reason) throws CatalogException {
       try {
-        int numPartsRefreshed = catalogOpExecutor_.reloadPartitionsIfExist(getEventId(),
-            dbName_, tblName_, partitions, reason);
+        int numPartsRefreshed;
+        if (loadFromEvent) {
+          numPartsRefreshed = catalogOpExecutor_.reloadPartitionsFromEvent(getEventId(),
+              dbName_, tblName_, partitions, loadFileMetadata, reason);
+        } else {
+          numPartsRefreshed = catalogOpExecutor_.reloadPartitionsIfExist(getEventId(),
+              dbName_, tblName_, partitions, reason);
+        }
         if (numPartsRefreshed > 0) {
           metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
               .inc(numPartsRefreshed);
@@ -986,7 +1019,7 @@ public class MetastoreEvents {
         // Ignore event if table or database is not in catalog. Throw exception if
         // refresh fails. If the partition does not exist in metastore the reload
         // method below removes it from the catalog
-        reloadPartitions(Arrays.asList(insertPartition_), "INSERT event");
+        reloadPartitions(Arrays.asList(insertPartition_), false, true, "INSERT event");
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                 + "partition on table {} partition {} failed. Event processing cannot "
@@ -1538,13 +1571,15 @@ public class MetastoreEvents {
         return;
       }
       try {
-        // Reload the whole table if it's a transactional table or materialized view.
-        // Materialized views are treated as a special case because it's possible to
-        // receive partition event on MVs, but they are regular views in Impala. That
-        // cause problems on the reloading partition logic which expects it to be a
-        // HdfsTable.
-        if ((AcidUtils.isTransactionalTable(msTbl_.getParameters()) && !isSelfEvent())
-            || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
+        // Reload the whole table if it's a transactional table and incremental
+        // refresh is not enabled. Materialized views are treated as a special case
+        // because it's possible to receive partition event on MVs, but they are
+        // regular views in Impala. That cause problems on the reloading partition
+        // logic which expects it to be a HdfsTable.
+        boolean incrementalRefresh =
+            BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
+        if ((AcidUtils.isTransactionalTable(msTbl_.getParameters()) && !isSelfEvent() &&
+            !incrementalRefresh) || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
           reloadTableFromCatalog("ADD_PARTITION", true);
         } else {
           // HMS adds partitions in a transactional way. This means there may be multiple
@@ -1684,14 +1719,15 @@ public class MetastoreEvents {
       // on the reloading partition logic which expects it to be a HdfsTable.
       if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
           || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
-        reloadTableFromCatalog("ALTER_PARTITION", true);
+        reloadTransactionalTable();
       } else {
         // Refresh the partition that was altered.
         Preconditions.checkNotNull(partitionAfter_);
         List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
             partitionAfter_);
         try {
-          reloadPartitions(Arrays.asList(partitionAfter_), "ALTER_PARTITION event");
+          reloadPartitions(Arrays.asList(partitionAfter_), false, true,
+              "ALTER_PARTITION event");
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                   + "partition on table {} partition {} failed. Event processing cannot "
@@ -1723,6 +1759,17 @@ public class MetastoreEvents {
           Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_, partitionAfter_)),
           partitionAfter_.getParameters());
     }
+
+    private void reloadTransactionalTable() throws CatalogException {
+      boolean incrementalRefresh =
+          BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
+      if (incrementalRefresh) {
+        reloadPartitions(Collections.singletonList(partitionAfter_), true, false,
+            "ALTER_PARTITION");
+      } else {
+        reloadTableFromCatalog("ALTER_PARTITION", true);
+      }
+    }
   }
 
   /**
@@ -1815,7 +1862,7 @@ public class MetastoreEvents {
           partitions.add(event.getPartitionForBatching());
         }
         try {
-          reloadPartitions(partitions, getEventType().toString() + " event");
+          reloadPartitions(partitions, false, true, getEventType().toString() + " event");
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(String.format(
               "Refresh partitions on table %s failed when processing event ids %s-%s. "
@@ -1904,8 +1951,10 @@ public class MetastoreEvents {
         // receive partition event on MVs, but they are regular views in Impala. That
         // cause problems on the reloading partition logic which expects it to be a
         // HdfsTable.
-        if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
-            || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
+        boolean incrementalRefresh =
+            BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
+        if ((AcidUtils.isTransactionalTable(msTbl_.getParameters()) &&
+            !incrementalRefresh) || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
           reloadTableFromCatalog("DROP_PARTITION", true);
         } else {
           int numPartsRemoved = catalogOpExecutor_
@@ -1938,6 +1987,256 @@ public class MetastoreEvents {
   }
 
   /**
+   * Metastore event handler for ALLOC_WRITE_ID_EVENT events. This event is used to keep
+   * track of write ids for partitioned transactional tables.
+   */
+  public static class AllocWriteIdEvent extends MetastoreTableEvent {
+    private final List<TxnToWriteId> txnToWriteIdList_;
+    private org.apache.impala.catalog.Table tbl_;
+
+    private AllocWriteIdEvent(CatalogOpExecutor catalogOpExecutor,
+        Metrics metrics, NotificationEvent event) throws MetastoreNotificationException {
+      super(catalogOpExecutor, metrics, event);
+      Preconditions.checkState(
+          getEventType().equals(MetastoreEventType.ALLOC_WRITE_ID_EVENT));
+      Preconditions.checkNotNull(event.getMessage());
+      AllocWriteIdMessage allocWriteIdMessage =
+          MetastoreEventsProcessor.getMessageDeserializer().getAllocWriteIdMessage(
+              event.getMessage());
+      txnToWriteIdList_ = allocWriteIdMessage.getTxnToWriteIdList();
+      try {
+        // We need to retrieve msTbl_ from catalog because the AllocWriteIdEvent
+        // doesn't bring the table object. However, we need msTbl_ for
+        // MetastoreTableEvent.isEventProcessingDisabled() to determine if event
+        // processing is disabled for the table.
+        tbl_ = catalog_.getTable(dbName_, tblName_);
+        if (tbl_ != null && tbl_.getCreateEventId() < getEventId()) {
+          msTbl_ = tbl_.getMetaStoreTable();
+        }
+      } catch (DatabaseNotFoundException e) {
+        // do nothing
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(debugString("Unable to retrieve table "
+            + "object for AllocWriteIdEvent: {}", getEventId()), e);
+      }
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+      if (msTbl_ == null) {
+        debugLog("Ignoring the event since table {} is not found",
+            getFullyQualifiedTblName());
+        return;
+      }
+      // For non-partitioned tables, we can just reload the whole table without
+      // keeping track of write ids.
+      if (msTbl_.getPartitionKeysSize() == 0) {
+        debugLog("Ignoring the event since table {} is non-partitioned",
+            getFullyQualifiedTblName());
+        return;
+      }
+      try {
+        List<Long> writeIds = txnToWriteIdList_.stream()
+            .map(TxnToWriteId::getWriteId)
+            .collect(Collectors.toList());
+        catalog_.addWriteIdsToTable(dbName_, tblName_, getEventId(), writeIds,
+            MutableValidWriteIdList.WriteIdStatus.OPEN);
+        for (TxnToWriteId txnToWriteId : txnToWriteIdList_) {
+          TableWriteId tableWriteId = new TableWriteId(dbName_, tblName_,
+              tbl_.getCreateEventId(), txnToWriteId.getWriteId());
+          catalog_.addWriteId(txnToWriteId.getTxnId(), tableWriteId);
+        }
+      } catch (CatalogException e) {
+        throw new MetastoreNotificationNeedsInvalidateException("Failed to mark open "
+            + "write ids to table. Event processing cannot continue. Issue an "
+            + "invalidate metadata command to reset event processor.", e);
+      }
+    }
+
+    @Override
+    protected SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("self-event evaluation is not needed for "
+          + "this event type");
+    }
+
+    @Override
+    protected boolean isEventProcessingDisabled() {
+      if (msTbl_ == null) {
+        return false;
+      }
+      return super.isEventProcessingDisabled();
+    }
+  }
+
+  /**
+   * Metastore event handler for COMMIT_TXN events. Handles commit events for transactional
+   * tables.
+   */
+  public static class CommitTxnEvent extends MetastoreEvent {
+    private final CommitTxnMessage commitTxnMessage_;
+    private final long txnId_;
+
+    CommitTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
+        NotificationEvent event) {
+      super(catalogOpExecutor, metrics, event);
+      Preconditions.checkState(getEventType().equals(MetastoreEventType.COMMIT_TXN));
+      Preconditions.checkNotNull(event.getMessage());
+      commitTxnMessage_ = MetastoreEventsProcessor.getMessageDeserializer()
+          .getCommitTxnMessage(event.getMessage());
+      txnId_ = commitTxnMessage_.getTxnId();
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+      // To avoid leaking memory if an exception is thrown, we remove the entries
+      // first.
+      Set<TableWriteId> committedWriteIds = catalog_.removeWriteIds(txnId_);
+      // Via getAllWriteEventInfo, we can get data insertion info for transactional tables
+      // even though there are no insert events generated for transactional tables. Note
+      // that we cannot get DDL info from this API.
+      List<WriteEventInfo> writeEventInfoList;
+      try (MetaStoreClientPool.MetaStoreClient client = catalog_.getMetaStoreClient()) {
+        writeEventInfoList = client.getHiveClient().getAllWriteEventInfo(
+            new GetAllWriteEventInfoRequest(txnId_));
+      } catch (TException e) {
+        throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
+            + "get write event infos for txn {}. Event processing cannot continue. Issue "
+            + "an invalidate metadata command to reset event processor.", txnId_), e);
+      }
+
+      try {
+        if (writeEventInfoList != null && !writeEventInfoList.isEmpty()) {
+          commitTxnMessage_.addWriteEventInfo(writeEventInfoList);
+          addCommittedWriteIdsAndRefreshPartitions();
+        }
+        // committed write ids for DDL need to be added here
+        addCommittedWriteIdsToTables(committedWriteIds);
+      } catch (Exception e) {
+        throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
+            + "mark committed write ids and refresh partitions for txn {}. Event "
+            + "processing cannot continue. Issue an invalidate metadata command to reset "
+            + "event processor.", txnId_), e);
+      }
+    }
+
+    private void addCommittedWriteIdsToTables(Set<TableWriteId> tableWriteIds)
+        throws CatalogException {
+      for (TableWriteId tableWriteId: tableWriteIds) {
+        catalog_.addWriteIdsToTable(tableWriteId.getDbName(), tableWriteId.getTblName(),
+            getEventId(),
+            Collections.singletonList(tableWriteId.getWriteId()),
+            MutableValidWriteIdList.WriteIdStatus.COMMITTED);
+      }
+    }
+
+    private void addCommittedWriteIdsAndRefreshPartitions() throws Exception {
+      Preconditions.checkNotNull(commitTxnMessage_.getWriteIds());
+      List<Long> writeIds = Collections.unmodifiableList(commitTxnMessage_.getWriteIds());
+      List<Partition> parts = new ArrayList<>();
+      // To load partitions together for the same table, indexes are grouped by table name
+      Map<TableName, List<Integer>> tableNameToIdxs = new HashMap<>();
+      for (int i = 0; i < writeIds.size(); i++) {
+        org.apache.hadoop.hive.metastore.api.Table tbl = commitTxnMessage_.getTableObj(i);
+        TableName tableName = new TableName(tbl.getDbName(), tbl.getTableName());
+        parts.add(commitTxnMessage_.getPartitionObj(i));
+        tableNameToIdxs.computeIfAbsent(tableName, k -> new ArrayList<>()).add(i);
+      }
+      for (Map.Entry<TableName, List<Integer>> entry : tableNameToIdxs.entrySet()) {
+        org.apache.hadoop.hive.metastore.api.Table tbl =
+            commitTxnMessage_.getTableObj(entry.getValue().get(0));
+        List<Long> writeIdsForTable = entry.getValue().stream()
+            .map(i -> writeIds.get(i))
+            .collect(Collectors.toList());
+        List<Partition> partsForTable = entry.getValue().stream()
+            .map(i -> parts.get(i))
+            .collect(Collectors.toList());
+        if (tbl.getPartitionKeysSize() > 0
+            && !MetaStoreUtils.isMaterializedViewTable(tbl)) {
+          try {
+            catalogOpExecutor_.addCommittedWriteIdsAndReloadPartitionsIfExist(
+                getEventId(), entry.getKey().getDb(), entry.getKey().getTbl(),
+                writeIdsForTable, partsForTable, "CommitTxnEvent");
+          } catch (TableNotLoadedException e) {
+            debugLog("Ignoring reloading since table {} is not loaded",
+                entry.getKey());
+          } catch (DatabaseNotFoundException | TableNotFoundException e) {
+            debugLog("Ignoring reloading since table {} is not found",
+                entry.getKey());
+          }
+        } else {
+          catalog_.reloadTableIfExists(entry.getKey().getDb(), entry.getKey().getTbl(),
+              getEventId(), "CommitTxnEvent");
+        }
+      }
+    }
+
+    @Override
+    protected boolean isEventProcessingDisabled() {
+      return false;
+    }
+
+    @Override
+    protected SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("Self-event evaluation is not needed for "
+          + "this event type");
+    }
+  }
+
+  /**
+   * Metastore event handler for ABORT_TXN events. Handles abort event for transactional
+   * tables.
+   */
+  public static class AbortTxnEvent extends MetastoreEvent {
+    private final long txnId_;
+
+    AbortTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
+        NotificationEvent event) {
+      super(catalogOpExecutor, metrics, event);
+      Preconditions.checkState(getEventType().equals(MetastoreEventType.ABORT_TXN));
+      Preconditions.checkNotNull(event.getMessage());
+      AbortTxnMessage abortTxnMessage =
+          MetastoreEventsProcessor.getMessageDeserializer().getAbortTxnMessage(
+              event.getMessage());
+      txnId_ = abortTxnMessage.getTxnId();
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+      try {
+        addAbortedWriteIdsToTables(catalog_.getWriteIds(txnId_));
+      } catch (CatalogException e) {
+        throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
+            + "mark aborted write ids to table for txn {}. Event processing cannot "
+            + "continue. Issue an invalidate metadata command to reset event processor.",
+            txnId_), e);
+      } finally {
+        catalog_.removeWriteIds(txnId_);
+      }
+    }
+
+    private void addAbortedWriteIdsToTables(Set<TableWriteId> tableWriteIds)
+        throws CatalogException {
+      for (TableWriteId tableWriteId: tableWriteIds) {
+        catalog_.addWriteIdsToTable(tableWriteId.getDbName(), tableWriteId.getTblName(),
+            getEventId(),
+            Collections.singletonList(tableWriteId.getWriteId()),
+            MutableValidWriteIdList.WriteIdStatus.ABORTED);
+      }
+    }
+
+    @Override
+    protected boolean isEventProcessingDisabled() {
+      return false;
+    }
+
+    @Override
+    protected SelfEventContext getSelfEventContext() {
+      throw new UnsupportedOperationException("Self-event evaluation is not needed for "
+          + "this event type");
+    }
+  }
+
+  /**
    * An event type which is ignored. Useful for unsupported metastore event types
    */
   public static class IgnoredEvent extends MetastoreEvent {
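
The three handlers above cooperate through the catalog's transaction bookkeeping:
AllocWriteIdEvent records which (table, write id) pairs belong to an open transaction
via catalog_.addWriteId(), and CommitTxnEvent/AbortTxnEvent later look them up and
remove them. A minimal sketch of that bookkeeping, assuming a plain synchronized map
rather than the real CatalogServiceCatalog internals (the class and its bodies below
are illustrative only):

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: txnId -> write ids allocated in that transaction, so that a
// later COMMIT_TXN/ABORT_TXN event knows what to mark committed or aborted.
class TxnWriteIdTracker {
  private final Map<Long, Set<String>> txnToWriteIds = new HashMap<>();

  // Called while processing ALLOC_WRITE_ID_EVENT.
  synchronized void addWriteId(long txnId, String tableAndWriteId) {
    txnToWriteIds.computeIfAbsent(txnId, k -> new HashSet<>()).add(tableAndWriteId);
  }

  // Called by ABORT_TXN to look up what must be marked aborted.
  synchronized Set<String> getWriteIds(long txnId) {
    return new HashSet<>(txnToWriteIds.getOrDefault(txnId, Collections.emptySet()));
  }

  // Called first by COMMIT_TXN (and in a finally block by ABORT_TXN) so entries are
  // removed even when later processing throws.
  synchronized Set<String> removeWriteIds(long txnId) {
    Set<String> removed = txnToWriteIds.remove(txnId);
    return removed == null ? Collections.emptySet() : removed;
  }
}
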
diff --git a/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java b/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java
index 409d8e7..b3a82a1 100644
--- a/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java
+++ b/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java
@@ -235,46 +235,60 @@ public class MutableValidReaderWriteIdList implements MutableValidWriteIdList {
     return RangeResponse.SOME;
   }
 
+  @Override
   public boolean isWriteIdOpen(long writeId) {
     int index = Collections.binarySearch(exceptions, writeId);
     return index >= 0 && !abortedBits.get(index);
   }
 
   @Override
-  public void addOpenWriteId(long writeId) {
+  public boolean addOpenWriteId(long writeId) {
     if (writeId <= highWatermark) {
-      LOG.debug("Not adding open write id: {} {}", writeId, highWatermark);
-      return;
+      LOG.debug("Not adding open write id: {} since high water mark: {}", writeId,
+          highWatermark);
+      return false;
     }
     for (long currentId = highWatermark + 1; currentId <= writeId; currentId++) {
       exceptions.add(currentId);
     }
     highWatermark = writeId;
+    return true;
   }
 
   @Override
-  public void addAbortedWriteIds(List<Long> writeIds) {
+  public boolean addAbortedWriteIds(List<Long> writeIds) {
     Preconditions.checkNotNull(writeIds);
     Preconditions.checkArgument(writeIds.size() > 0);
+    // used to track if any of the writeIds is added
+    boolean added = false;
     long maxWriteId = Collections.max(writeIds);
-    Preconditions.checkArgument(maxWriteId <= highWatermark,
-        "Should mark write id as open before abort it: %s", maxWriteId);
+    if (maxWriteId > highWatermark) {
+      LOG.trace("Current high water mark: {} and max aborted write id: {}, so mark them "
+          + "as open first", highWatermark, maxWriteId);
+      addOpenWriteId(maxWriteId);
+      added = true;
+    }
     for (long writeId : writeIds) {
       int index = Collections.binarySearch(exceptions, writeId);
       // make sure the write id is not committed
       Preconditions.checkState(index >= 0);
+      added = added || !abortedBits.get(index);
       abortedBits.set(index);
     }
     updateMinOpenWriteId();
+    return added;
   }
 
   @Override
-  public void addCommittedWriteIds(List<Long> writeIds) {
+  public boolean addCommittedWriteIds(List<Long> writeIds) {
     Preconditions.checkNotNull(writeIds);
     Preconditions.checkArgument(writeIds.size() > 0);
     long maxWriteId = Collections.max(writeIds);
-    Preconditions.checkArgument(maxWriteId <= highWatermark,
-        "Should mark write id (%s) as open before commit it", maxWriteId);
+    if (maxWriteId > highWatermark) {
+      LOG.trace("Current high water mark: {} and max committed write id: {}, so mark "
+          + "them as open first", highWatermark, maxWriteId);
+      addOpenWriteId(maxWriteId);
+    }
     List<Long> updatedExceptions = new ArrayList<>();
     BitSet updatedAbortedBits = new BitSet();
 
@@ -283,7 +297,8 @@ public class MutableValidReaderWriteIdList implements MutableValidWriteIdList {
       int idx = Collections.binarySearch(exceptions, writeId);
       if (idx >= 0) {
         // make sure the write id is open rather than aborted
-        Preconditions.checkState(!abortedBits.get(idx));
+        Preconditions.checkState(!abortedBits.get(idx),
+            "write id %d is expected to be open but is aborted", writeId);
         idxToRemove.add(idx);
       }
     }
@@ -297,6 +312,7 @@ public class MutableValidReaderWriteIdList implements MutableValidWriteIdList {
     exceptions = updatedExceptions;
     abortedBits = updatedAbortedBits;
     updateMinOpenWriteId();
+    return !idxToRemove.isEmpty();
   }
 
   private void updateMinOpenWriteId() {
diff --git a/fe/src/main/java/org/apache/impala/hive/common/MutableValidWriteIdList.java b/fe/src/main/java/org/apache/impala/hive/common/MutableValidWriteIdList.java
index 2fb7641..544fc7d 100644
--- a/fe/src/main/java/org/apache/impala/hive/common/MutableValidWriteIdList.java
+++ b/fe/src/main/java/org/apache/impala/hive/common/MutableValidWriteIdList.java
@@ -25,21 +25,33 @@ import java.util.List;
  * This is the mutable version of ValidWriteIdList
  */
 public interface MutableValidWriteIdList extends ValidWriteIdList {
+  enum WriteIdStatus {
+    OPEN, ABORTED, COMMITTED
+  }
+
   /**
    * This method will mark write ids between highWatermark + 1 and writeId inclusive as
    * open.
+   * @return True if the write id is added and false otherwise.
    */
-  void addOpenWriteId(long writeId);
+  boolean addOpenWriteId(long writeId);
 
   /**
    * This method will mark the writeIds as aborted.
    * Note that we cannot abort a write id that is committed.
+   * @return True if any write id is added and false otherwise.
    */
-  void addAbortedWriteIds(List<Long> writeIds);
+  boolean addAbortedWriteIds(List<Long> writeIds);
 
   /**
    * This method will mark the writeIds as committed.
    * Note that we cannot commit a write id that is aborted.
+   * @return True if any write id is added and false otherwise.
+   */
+  boolean addCommittedWriteIds(List<Long> writeIds);
+
+  /**
+   * @return True if the write id is open and false otherwise.
    */
-  void addCommittedWriteIds(List<Long> writeIds);
+  boolean isWriteIdOpen(long writeId);
 }
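
To make the new boolean return values concrete, the following standalone snippet
mirrors the updated unit tests further below (it assumes the demo class sits in the
org.apache.impala.hive.common package; the expected serialized string is the one
asserted by MutableValidReaderWriteIdListTest):

package org.apache.impala.hive.common;

import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;

import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;

public class MutableWriteIdListDemo {
  public static void main(String[] args) {
    // Write ids 2,4,6,8,10 are exceptions; 2 and 8 are aborted; high watermark is 11.
    long[] exceptions = {2L, 4L, 6L, 8L, 10L};
    BitSet aborted = new BitSet(exceptions.length);
    aborted.set(0); // write id 2 aborted
    aborted.set(3); // write id 8 aborted
    ValidWriteIdList snapshot =
        new ValidReaderWriteIdList("db.tbl", exceptions, aborted, 11, 4);
    MutableValidReaderWriteIdList writeIds =
        new MutableValidReaderWriteIdList(snapshot);

    // Committing a write id that is already valid is a no-op and returns false.
    System.out.println(writeIds.addCommittedWriteIds(Collections.singletonList(1L)));
    // Committing open write ids mutates the list and returns true.
    System.out.println(writeIds.addCommittedWriteIds(Arrays.asList(4L, 10L)));
    // Per MutableValidReaderWriteIdListTest, this prints "db.tbl:11:6:6:2,8".
    System.out.println(writeIds.writeToString());
  }
}
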
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 75047fa..b0614ae 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -333,4 +333,8 @@ public class BackendConfig {
   public String getStartupFilesystemCheckDirectories() {
     return backendCfg_.startup_filesystem_check_directories;
   }
+
+  public boolean getHMSEventIncrementalRefreshTransactionalTable() {
+    return backendCfg_.hms_event_incremental_refresh_transactional_table;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 71322cf..f7ca3e0 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -31,7 +31,6 @@ import com.google.common.collect.Sets;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -40,6 +39,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
@@ -54,6 +54,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
@@ -90,7 +91,6 @@ import org.apache.impala.authorization.AuthorizationManager;
 import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogObject;
-import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnNotFoundException;
@@ -146,6 +146,7 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.common.TransactionException;
 import org.apache.impala.common.TransactionKeepalive.HeartbeatContext;
 import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.hive.common.MutableValidWriteIdList;
 import org.apache.impala.thrift.JniCatalogConstants;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TAlterDbSetOwnerParams;
@@ -4145,7 +4146,7 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Reloads the given partitions if the exists and have not been removed since the event
+   * Reloads the given partitions if they exist and have not been removed since the event
    * was generated.
    *
    * @param eventId EventId being processed.
@@ -4157,9 +4158,8 @@ public class CatalogOpExecutor {
    * @return the number of partitions which were reloaded. If the table does not exist,
    * returns 0. Some partitions could be skipped if they don't exist anymore.
    */
-  public int reloadPartitionsIfExist(long eventId, String dbName,
-      String tblName, List<Partition> partsFromEvent, String reason)
-      throws CatalogException {
+  public int reloadPartitionsIfExist(long eventId, String dbName, String tblName,
+      List<Partition> partsFromEvent, String reason) throws CatalogException {
     Table table = catalog_.getTable(dbName, tblName);
     if (table == null) {
       DeleteEventLog deleteEventLog = catalog_.getMetastoreEventProcessor()
@@ -4216,6 +4216,175 @@ public class CatalogOpExecutor {
     return 0;
   }
 
+  /**
+   * Reloads the given partitions if they exist and have not been removed since the event
+   * was generated. We don't retrieve partitions from HMS but use partitions from event.
+   *
+   * @param eventId EventId being processed.
+   * @param dbName Database name for the partition
+   * @param tblName Table name for the partition
+   * @param partsFromEvent List of {@link Partition} objects from the events to be
+   *                       reloaded.
+   * @param reason Reason for reloading the partitions for logging purposes.
+   * @param loadFileMetadata If true, reload file metadata. Otherwise, just reload
+   *                         partitions metadata.
+   * @return the number of partitions which were reloaded. If the table does not exist,
+   * returns 0. Some partitions could be skipped if they don't exist anymore.
+   */
+  public int reloadPartitionsFromEvent(long eventId, String dbName, String tblName,
+      List<Partition> partsFromEvent, boolean loadFileMetadata, String reason)
+      throws CatalogException {
+    Table table = catalog_.getTable(dbName, tblName);
+    if (table == null) {
+      DeleteEventLog deleteEventLog = catalog_.getMetastoreEventProcessor()
+          .getDeleteEventLog();
+      if (deleteEventLog
+          .wasRemovedAfter(eventId, DeleteEventLog.getTblKey(dbName, tblName))) {
+        LOG.info(
+            "Not reloading the partition of table {} since it was removed "
+                + "later in catalog", new TableName(dbName, tblName));
+        return 0;
+      } else {
+        throw new TableNotFoundException(
+            "Table " + dbName + "." + tblName + " not found");
+      }
+    }
+    if (table instanceof IncompleteTable) {
+      LOG.info("Table {} is not loaded. Skipping drop partition event {}",
+          table.getFullName(), eventId);
+      return 0;
+    }
+    if (!(table instanceof HdfsTable)) {
+      throw new CatalogException("Partition event received on a non-hdfs table");
+    }
+    try {
+      tryWriteLock(table, reason);
+      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+      catalog_.getLock().writeLock().unlock();
+      HdfsTable hdfsTable = (HdfsTable) table;
+      int numOfPartsReloaded;
+      try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+        numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent(
+            metaStoreClient.getHiveClient(), partsFromEvent, loadFileMetadata, reason);
+      }
+      hdfsTable.setCatalogVersion(newCatalogVersion);
+      return numOfPartsReloaded;
+    } catch (InternalException e) {
+      throw new CatalogException(
+          "Could not acquire lock on the table " + table.getFullName(), e);
+    } finally {
+      UnlockWriteLockIfErronouslyLocked();
+      table.releaseWriteLock();
+    }
+  }
+
+  /**
+   * This function is only used by CommitTxnEvent to mark write ids as committed and
+   * reload partitions from events atomically.
+   *
+   * @param eventId EventId being processed
+   * @param dbName Database name for the partition
+   * @param tblName Table name for the partition
+   * @param writeIds List of write ids for this transaction
+   * @param partsFromEvent List of Partition objects from the events to be reloaded
+   * @param reason Reason for reloading the partitions for logging purposes.
+   * @return the number of partitions which were reloaded. Some partitions can be
+   * skipped if they don't exist anymore, or they have stale write ids.
+   */
+  public int addCommittedWriteIdsAndReloadPartitionsIfExist(long eventId, String dbName,
+      String tblName, List<Long> writeIds, List<Partition> partsFromEvent, String reason)
+      throws CatalogException {
+    Table table = catalog_.getTable(dbName, tblName);
+    if (table == null) {
+      DeleteEventLog deleteEventLog = catalog_.getMetastoreEventProcessor()
+          .getDeleteEventLog();
+      if (deleteEventLog
+          .wasRemovedAfter(eventId, DeleteEventLog.getTblKey(dbName, tblName))) {
+        LOG.info(
+            "Not reloading partitions of table {} for event {} since it was removed "
+                + "later in catalog", new TableName(dbName, tblName), eventId);
+        return 0;
+      } else {
+        throw new TableNotFoundException(
+            "Table " + dbName + "." + tblName + " not found");
+      }
+    }
+    if (table instanceof IncompleteTable) {
+      LOG.info("Table {} is not loaded. Skipping partition event {}",
+          table.getFullName(), eventId);
+      return 0;
+    }
+    if (!(table instanceof HdfsTable)) {
+      throw new CatalogException("Partition event received on a non-hdfs table");
+    }
+
+    HdfsTable hdfsTable = (HdfsTable) table;
+    ValidWriteIdList previousWriteIdList = hdfsTable.getValidWriteIds();
+    try {
+      tryWriteLock(table, reason);
+      long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+      catalog_.getLock().writeLock().unlock();
+      Preconditions.checkState(previousWriteIdList != null,
+          "Write id list of table %s should not be null", table.getFullName());
+      // get a copy of previous write id list
+      previousWriteIdList = MetastoreShim.getValidWriteIdListFromString(
+          previousWriteIdList.toString());
+      // some partitions from the event or the table itself
+      // may not exist in HMS anymore. Hence, we collect the names here and re-fetch
+      // the partitions from HMS.
+      List<Partition> partsToRefresh = new ArrayList<>();
+      List<Long> writeIdsToRefresh = new ArrayList<>();
+      ListIterator<Partition> it = partsFromEvent.listIterator();
+      while (it.hasNext()) {
+        // The partition objects from the HMS event were persisted before the
+        // transaction was committed, so their write ids are smaller than the write id
+        // of the write event. Since the transaction is committed at this point, we
+        // need to update each partition object's write id to the event's write id.
+        long writeId = writeIds.get(it.nextIndex());
+        Partition part = it.next();
+        // Aborted write id is not allowed. The write id can be committed if the table
+        // in cache is ahead of this commit event.
+        Preconditions.checkState(!previousWriteIdList.isWriteIdAborted(writeId),
+            "Write id %d of Table %s should not be aborted",
+            writeId, table.getFullName());
+        // Valid write id means committed write id here.
+        if (!previousWriteIdList.isWriteIdValid(writeId)) {
+          part.setWriteId(writeId);
+          partsToRefresh.add(part);
+          writeIdsToRefresh.add(writeId);
+        }
+      }
+      if (partsToRefresh.isEmpty()) {
+        LOG.info("Not reloading partitions of table {} for event {} since the cache is "
+            + "already up-to-date", table.getFullName(), eventId);
+        return 0;
+      }
+      // Set the write ids as committed before reloading the partitions so that we get
+      // up-to-date file metadata.
+      hdfsTable.addWriteIds(writeIdsToRefresh,
+          MutableValidWriteIdList.WriteIdStatus.COMMITTED);
+      int numOfPartsReloaded;
+      try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+        numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent(
+            metaStoreClient.getHiveClient(), partsToRefresh, true, reason);
+      }
+      hdfsTable.setCatalogVersion(newCatalogVersion);
+      return numOfPartsReloaded;
+    } catch (InternalException e) {
+      throw new CatalogException(
+          "Could not acquire lock on the table " + table.getFullName(), e);
+    } catch (Exception e) {
+      LOG.info("Rolling back the write id list of table {} because reloading "
+          + "for event {} is failed: {}", table.getFullName(), eventId, e.getMessage());
+      // roll back the original writeIdList
+      hdfsTable.setValidWriteIds(previousWriteIdList);
+      throw e;
+    } finally {
+      UnlockWriteLockIfErronouslyLocked();
+      table.releaseWriteLock();
+    }
+  }
+
   public ReentrantLock getMetastoreDdlLock() {
     return metastoreDdlLock_;
   }
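
addCommittedWriteIdsAndReloadPartitionsIfExist above relies on a snapshot-and-restore
pattern: it serializes the current write id list before mutating it, and puts the copy
back if the reload fails, so the cached table never claims committed write ids whose
file metadata was never loaded. A condensed, hypothetical rendering of that pattern
(TableState and its methods are stand-ins, not the real HdfsTable API):

import java.util.List;

// Hypothetical sketch of the snapshot-and-restore pattern used by the commit path.
final class WriteIdRollbackSketch {
  interface TableState {
    String serializeWriteIds();               // cf. getValidWriteIds().toString()
    void restoreWriteIds(String serialized);  // cf. setValidWriteIds(...)
    void markCommitted(List<Long> writeIds);  // cf. addWriteIds(..., COMMITTED)
    void reloadPartitions() throws Exception; // cf. reloadPartitionsFromEvent(...)
  }

  static void commitAndReload(TableState table, List<Long> writeIds) throws Exception {
    // Snapshot before any mutation so a failed reload cannot leave the cache
    // advertising committed write ids that were never loaded.
    String snapshot = table.serializeWriteIds();
    try {
      table.markCommitted(writeIds);
      table.reloadPartitions();
    } catch (Exception e) {
      table.restoreWriteIds(snapshot);
      throw e;
    }
  }
}
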
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTableWriteIdTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTableWriteIdTest.java
new file mode 100644
index 0000000..f2c6c79
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTableWriteIdTest.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+
+import static org.junit.Assert.*;
+
+public class CatalogTableWriteIdTest {
+
+  private CatalogServiceCatalog catalog_;
+
+  @Before
+  public void init() {
+    catalog_ = CatalogServiceTestCatalog.create();
+  }
+
+  @After
+  public void cleanUp() {
+    catalog_.close();
+  }
+
+  @Test
+  public void test() {
+    TableWriteId tableWriteId1 = new TableWriteId("default", "table1", -1L, 1L);
+    catalog_.addWriteId(1L, tableWriteId1);
+
+    TableWriteId tableWriteId2 = new TableWriteId("default", "table2", -1L, 2L);
+    catalog_.addWriteId(1L, tableWriteId2);
+
+    TableWriteId tableWriteId3 = new TableWriteId("default", "table3", -1L, 3L);
+    catalog_.addWriteId(2L, tableWriteId3);
+
+    Set<TableWriteId> set = catalog_.getWriteIds(1L);
+    assertNotNull(set);
+    assertTrue(set.contains(tableWriteId1));
+    assertTrue(set.contains(tableWriteId2));
+
+    set = catalog_.getWriteIds(2L);
+    assertNotNull(set);
+    assertTrue(set.contains(tableWriteId3));
+
+    catalog_.removeWriteIds(1L);
+    catalog_.removeWriteIds(2L);
+
+    set = catalog_.getWriteIds(1L);
+    assertNotNull(set);
+    assertTrue(set.isEmpty());
+    set = catalog_.getWriteIds(2L);
+    assertNotNull(set);
+    assertTrue(set.isEmpty());
+  }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index 0060189..ff4d0b6 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 51ba024..ad064d4 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.HdfsUri;
@@ -98,7 +100,9 @@ import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorS
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.TransactionException;
 import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
@@ -120,6 +124,7 @@ import org.apache.impala.thrift.TAlterTableSetRowFormatParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
 import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnType;
@@ -151,6 +156,7 @@ import org.apache.impala.thrift.TTypeNodeType;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdatedPartition;
+import org.apache.impala.util.MetaStoreUtil;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -374,7 +380,7 @@ public class MetastoreEventsProcessorTest {
       // create a database and table in the custom hive catalog whose name matches
       // with one already existing in Impala
       createDatabase(catName, TEST_DB_NAME, null);
-      createTable(catName, TEST_DB_NAME, tblName, null, false);
+      createTable(catName, TEST_DB_NAME, tblName, null, false, null);
       eventsProcessor_.processEvents();
       assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
       // assert that dbname and table in the default catalog exist
@@ -1077,6 +1083,28 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  private void simulateInsertIntoTransactionalTableFromFS(
+      org.apache.hadoop.hive.metastore.api.Table msTbl, Partition partition,
+      int totalNumberOfFilesToAdd, long txnId, long writeId) throws IOException {
+    Path parentPath = partition == null ?
+        new Path(msTbl.getSd().getLocation()) : new Path(partition.getSd().getLocation());
+    Path deltaPath = new Path(parentPath, String.format("delta_%d_%d", writeId, writeId));
+    List<String> newFiles = addFilesToDirectory(deltaPath, "testFile.",
+        totalNumberOfFilesToAdd, false);
+    List<InsertEventRequestData> insertEventReqDatas = new ArrayList<>();
+    InsertEventRequestData insertEventRequestData = new InsertEventRequestData();
+    if (partition != null) {
+      insertEventRequestData.setPartitionVal(partition.getValues());
+    }
+    insertEventRequestData.setFilesAdded(newFiles);
+    insertEventRequestData.setReplace(false);
+    insertEventReqDatas.add(insertEventRequestData);
+    MetaStoreUtil.TableInsertEventInfo insertEventInfo =
+        new MetaStoreUtil.TableInsertEventInfo(insertEventReqDatas, true, txnId, writeId);
+    MetastoreShim.fireInsertEvents(catalog_.getMetaStoreClient(), insertEventInfo,
+        msTbl.getDbName(), msTbl.getTableName());
+  }
+
   /**
    * Test generates ALTER_TABLE events for various cases (table rename, parameter change,
    * add/remove/change column) and makes sure that the table is updated on the CatalogD
@@ -1567,7 +1595,7 @@ public class MetastoreEventsProcessorTest {
               tblTransition.first);
         }
         createDatabase(TEST_DB_NAME, dbParams);
-        createTable(null, TEST_DB_NAME, testTblName, tblParams, false);
+        createTable(null, TEST_DB_NAME, testTblName, tblParams, false, null);
         eventsProcessor_.processEvents();
         // table creation is skipped since the flag says so
         assertNull(catalog_.getTable(TEST_DB_NAME, testTblName));
@@ -1695,9 +1723,9 @@ public class MetastoreEventsProcessorTest {
     }
 
     org.apache.hadoop.hive.metastore.api.Table tableBefore =
-        getTestTable(null, dbName, tblName, beforeParams, false);
+        getTestTable(null, dbName, tblName, beforeParams, false, null);
     org.apache.hadoop.hive.metastore.api.Table tableAfter =
-        getTestTable(null, dbName, tblName, afterParams, false);
+        getTestTable(null, dbName, tblName, afterParams, false, null);
 
     Map<String, String> dbParams = new HashMap<>(1);
     if (dbFlag != null) {
@@ -1721,7 +1749,7 @@ public class MetastoreEventsProcessorTest {
     // issue a dummy alter table by adding a param
     afterParams.put("dummy", "value");
     org.apache.hadoop.hive.metastore.api.Table nextTable =
-        getTestTable(null, dbName, tblName, afterParams, false);
+        getTestTable(null, dbName, tblName, afterParams, false, null);
     NotificationEvent nextNotification =
         createFakeAlterTableNotification(dbName, tblName, tableAfter, nextTable);
     alterTableEvent =
@@ -1782,9 +1810,9 @@ public class MetastoreEventsProcessorTest {
     Map<String, String> tblParams = new HashMap<>(1);
     tblParams.put(MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(), "true");
     // event 2
-    createTable(null, TEST_DB_NAME, "tbl_should_skipped", tblParams, true);
+    createTable(null, TEST_DB_NAME, "tbl_should_skipped", tblParams, true, null);
     // event 3
-    createTable(null, TEST_DB_NAME, testTblName, null, true);
+    createTable(null, TEST_DB_NAME, testTblName, null, true, null);
     List<List<String>> partitionVals = new ArrayList<>();
     partitionVals.add(Arrays.asList("1"));
     partitionVals.add(Arrays.asList("2"));
@@ -1942,7 +1970,7 @@ public class MetastoreEventsProcessorTest {
       Map<String, String> dbParams, Map<String, String> tblParams) throws Exception {
     assertNull(catalog_.getDb(dbName));
     createDatabase(dbName, dbParams);
-    createTable(null, dbName, tblName, tblParams, true);
+    createTable(null, dbName, tblName, tblParams, true, null);
     List<List<String>> partVals = new ArrayList<>(3);
     partVals.add(Arrays.asList("1"));
     partVals.add(Arrays.asList("2"));
@@ -2444,6 +2472,153 @@ public class MetastoreEventsProcessorTest {
     return metastoreEvents;
   }
 
+  @Test
+  public void testCommitEvent() throws TException, ImpalaException, IOException {
+    // Turn on incremental refresh for transactional table
+    final TBackendGflags origCfg = BackendConfig.INSTANCE.getBackendCfg();
+    try {
+      final TBackendGflags stubCfg = origCfg.deepCopy();
+      stubCfg.setHms_event_incremental_refresh_transactional_table(true);
+      BackendConfig.create(stubCfg);
+
+      createDatabase(TEST_DB_NAME, null);
+      testInsertIntoTransactionalTable("testCommitEvent_transactional", false, false);
+      testInsertIntoTransactionalTable("testCommitEvent_transactional_part", false, true);
+    } finally {
+      // Restore original config
+      BackendConfig.create(origCfg);
+    }
+  }
+
+  @Test
+  public void testAbortEvent() throws TException, ImpalaException, IOException {
+    // Turn on incremental refresh for transactional table
+    final TBackendGflags origCfg = BackendConfig.INSTANCE.getBackendCfg();
+    try {
+      final TBackendGflags stubCfg = origCfg.deepCopy();
+      stubCfg.setHms_event_incremental_refresh_transactional_table(true);
+      BackendConfig.create(stubCfg);
+
+      createDatabase(TEST_DB_NAME, null);
+      testInsertIntoTransactionalTable("testAbortEvent_transactional", true, false);
+      testInsertIntoTransactionalTable("testAbortEvent_transactional_part", true, true);
+    } finally {
+      // Restore original config
+      BackendConfig.create(origCfg);
+    }
+  }
+
+  private void testInsertIntoTransactionalTable(String tblName, boolean toAbort,
+      boolean isPartitioned) throws TException, CatalogException,
+      TransactionException, IOException {
+    createTransactionalTable(TEST_DB_NAME, tblName, isPartitioned);
+    if (isPartitioned) {
+      List<List<String>> partVals = new ArrayList<>();
+      partVals.add(Arrays.asList("1"));
+      addPartitions(TEST_DB_NAME, tblName, partVals);
+    }
+    eventsProcessor_.processEvents();
+    loadTable(tblName);
+    Table tbl = catalog_.getTable(TEST_DB_NAME, tblName);
+
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      long txnId = MetastoreShim.openTransaction(client.getHiveClient());
+      long writeId = MetastoreShim.allocateTableWriteId(client.getHiveClient(),
+          txnId, TEST_DB_NAME, tblName);
+      eventsProcessor_.processEvents();
+      ValidWriteIdList writeIdList = tbl.getValidWriteIds();
+      assertFalse(writeIdList.isWriteIdValid(writeId));
+      assertFalse(writeIdList.isWriteIdAborted(writeId));
+      Partition partition = null;
+      if (isPartitioned) {
+        partition = client.getHiveClient().getPartition(
+            TEST_DB_NAME, tblName, Arrays.asList("1"));
+      }
+      simulateInsertIntoTransactionalTableFromFS(
+          tbl.getMetaStoreTable(), partition, 1, txnId, writeId);
+      if (toAbort) {
+        MetastoreShim.abortTransaction(client.getHiveClient(), txnId);
+      } else {
+        MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+      }
+      eventsProcessor_.processEvents();
+      String partName = isPartitioned ? "p1=1" : "";
+      int numFiles = ((HdfsTable) tbl)
+          .getPartitionsForNames(Collections.singletonList(partName))
+          .get(0)
+          .getNumFileDescriptors();
+      writeIdList = tbl.getValidWriteIds();
+      if (toAbort) {
+        // For non-partitioned tables, we don't keep track of write ids in the Catalog
+        // to reduce memory footprint so the writeIdList won't be up-to-date until next
+        // table reloading.
+        assertEquals(0, numFiles);
+      } else {
+        assertTrue(writeIdList.isWriteIdValid(writeId));
+        assertEquals(1, numFiles);
+      }
+    }
+  }
+
+  @Test
+  public void testAlterPartitionNotReloadFMD() throws Exception {
+    // Turn on incremental refresh for transactional table
+    final TBackendGflags origCfg = BackendConfig.INSTANCE.getBackendCfg();
+    try {
+      final TBackendGflags stubCfg = origCfg.deepCopy();
+      stubCfg.setHms_event_incremental_refresh_transactional_table(true);
+      BackendConfig.create(stubCfg);
+
+      String testTblName = "testAlterPartitionNotReloadFMD";
+      createDatabase(TEST_DB_NAME, null);
+      createTransactionalTable(TEST_DB_NAME, testTblName, true);
+      List<List<String>> partVals = new ArrayList<>();
+      partVals.add(Arrays.asList("1"));
+      addPartitions(TEST_DB_NAME, testTblName, partVals);
+      try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+        long txnId = MetastoreShim.openTransaction(client.getHiveClient());
+        long writeId = MetastoreShim.allocateTableWriteId(client.getHiveClient(),
+            txnId, TEST_DB_NAME, testTblName);
+        Partition partition = client.getHiveClient().getPartition(TEST_DB_NAME,
+            testTblName, Arrays.asList("1"));
+        org.apache.hadoop.hive.metastore.api.Table table =
+            client.getHiveClient().getTable(TEST_DB_NAME, testTblName);
+        simulateInsertIntoTransactionalTableFromFS(table, partition, 1, txnId, writeId);
+        MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+      }
+      eventsProcessor_.processEvents();
+      loadTable(testTblName);
+      HdfsTable tbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, testTblName);
+
+      alterPartitionsParamsInTxn(TEST_DB_NAME, testTblName, "testAlterPartition", "true",
+          partVals);
+      assertNull(tbl.getPartitionsForNames(Collections.singletonList("p1=1")).get(0)
+          .getParameters().get("testAlterPartition"));
+      long numLoadFMDBefore =
+          tbl.getMetrics().getCounter(HdfsTable.NUM_LOAD_FILEMETADATA_METRIC).getCount();
+      List<HdfsPartition.FileDescriptor> FDbefore = tbl.getPartitionsForNames(
+          Collections.singletonList("p1=1")).get(0).getFileDescriptors();
+      eventsProcessor_.processEvents();
+      // After event processing, parameters should be updated
+      assertNotNull(tbl.getPartitionsForNames(Collections.singletonList("p1=1")).get(0)
+          .getParameters().get("testAlterPartition"));
+      // However, file metadata should not be reloaded after an alter partition event
+      long numLoadFMDAfter =
+          tbl.getMetrics().getCounter(HdfsTable.NUM_LOAD_FILEMETADATA_METRIC).getCount();
+      List<HdfsPartition.FileDescriptor> FDafter = tbl.getPartitionsForNames(
+          Collections.singletonList("p1=1")).get(0).getFileDescriptors();
+      assertEquals("File metadata should not be reloaded",
+          numLoadFMDBefore, numLoadFMDAfter);
+      // getFileDescriptors() always returns a new instance, so we need to compare the
+      // underlying array
+      assertEquals(Lists.transform(FDbefore, HdfsPartition.FileDescriptor.TO_BYTES),
+          Lists.transform(FDafter, HdfsPartition.FileDescriptor.TO_BYTES));
+    } finally {
+      // Restore original config
+      BackendConfig.create(origCfg);
+    }
+  }
+
   private abstract class AlterTableExecutor {
     protected abstract void execute() throws Exception;
 
@@ -2701,10 +2876,19 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  private void createTransactionalTable(
+      String dbName, String tblName, boolean isPartitioned) throws TException {
+    Map<String, String> params = new HashMap<>();
+    params.put("transactional", "true");
+    params.put("transactional_properties", "insert_only");
+    createTable(null, dbName, tblName, params, isPartitioned, "MANAGED_TABLE");
+  }
+
   private void createTable(String catName, String dbName, String tblName,
-      Map<String, String> params, boolean isPartitioned) throws TException {
+      Map<String, String> params, boolean isPartitioned, String tableType)
+      throws TException {
     org.apache.hadoop.hive.metastore.api.Table
-        tbl = getTestTable(catName, dbName, tblName, params, isPartitioned);
+        tbl = getTestTable(catName, dbName, tblName, params, isPartitioned, tableType);
 
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       msClient.getHiveClient().createTable(tbl);
@@ -2712,8 +2896,8 @@ public class MetastoreEventsProcessorTest {
   }
 
   private org.apache.hadoop.hive.metastore.api.Table getTestTable(String catName,
-      String dbName, String tblName, Map<String, String> params, boolean isPartitioned)
-      throws MetaException {
+      String dbName, String tblName, Map<String, String> params, boolean isPartitioned,
+      String tableType) throws MetaException {
     org.apache.hadoop.hive.metastore.api.Table tbl =
         new org.apache.hadoop.hive.metastore.api.Table();
     if (catName != null) tbl.setCatName(catName);
@@ -2742,6 +2926,12 @@ public class MetastoreEventsProcessorTest {
           new FieldSchema("p1","string","partition p1 description"));
       tbl.setPartitionKeys(pcols);
     }
+    if (tableType != null) {
+      Preconditions.checkArgument(tableType.equals(TableType.MANAGED_TABLE.toString())
+              || tableType.equals(TableType.EXTERNAL_TABLE.toString()),
+          "Invalid table type " + tableType);
+      tbl.setTableType(tableType);
+    }
     return tbl;
   }
 
@@ -3166,7 +3356,7 @@ public class MetastoreEventsProcessorTest {
     String insert_mul_part = String.format(
         "insert into table %s partition(p1, p2) select * from %s", tblName1, tblName2);
     TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
-        TEST_DB_NAME, tblName1, insert_mul_part, updated_partitions, overwrite);
+        TEST_DB_NAME, tblName1, insert_mul_part, updated_partitions, overwrite, -1, -1);
     catalogOpExecutor_.updateCatalog(testInsertRequest);
   }
 
@@ -3178,6 +3368,12 @@ public class MetastoreEventsProcessorTest {
    */
   private void insertFromImpala(String tblName, boolean isPartitioned, String p1val,
       String p2val, boolean isOverwrite, List<String> files) throws ImpalaException {
+    insertFromImpala(tblName, isPartitioned, p1val, p2val, isOverwrite, files, -1, -1);
+  }
+
+  private void insertFromImpala(String tblName, boolean isPartitioned, String p1val,
+      String p2val, boolean isOverwrite, List<String> files, long txnId, long writeId)
+      throws ImpalaException {
     String partition = String.format("partition (%s, %s)", p1val, p2val);
     String test_insert_tbl = String.format("insert into table %s %s values ('a','aa') ",
         tblName, isPartitioned ? partition : "");
@@ -3188,7 +3384,8 @@ public class MetastoreEventsProcessorTest {
         isPartitioned ? String.format("%s/%s", p1val, p2val) : "";
     updated_partitions.put(created_part_str, updatedPartition);
     TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
-        TEST_DB_NAME, tblName, test_insert_tbl, updated_partitions, isOverwrite);
+        TEST_DB_NAME, tblName, test_insert_tbl, updated_partitions, isOverwrite,
+        txnId, writeId);
     catalogOpExecutor_.updateCatalog(testInsertRequest);
   }
 
@@ -3197,7 +3394,8 @@ public class MetastoreEventsProcessorTest {
    */
   private TUpdateCatalogRequest createTestTUpdateCatalogRequest(String dBName,
       String tableName, String redacted_sql_stmt,
-      Map<String, TUpdatedPartition> updated_partitions, boolean isOverwrite) {
+      Map<String, TUpdatedPartition> updated_partitions, boolean isOverwrite,
+      long txnId, long writeId) {
     TUpdateCatalogRequest tUpdateCatalogRequest = new TUpdateCatalogRequest();
     tUpdateCatalogRequest.setDb_name(dBName);
     tUpdateCatalogRequest.setTarget_table(tableName);
@@ -3205,6 +3403,8 @@ public class MetastoreEventsProcessorTest {
     tUpdateCatalogRequest.setHeader(new TCatalogServiceRequestHeader());
     tUpdateCatalogRequest.getHeader().setRedacted_sql_stmt(redacted_sql_stmt);
     if (isOverwrite) tUpdateCatalogRequest.setIs_overwrite(true);
+    if (txnId > 0) tUpdateCatalogRequest.setTransaction_id(txnId);
+    if (writeId > 0) tUpdateCatalogRequest.setWrite_id(writeId);
     return tUpdateCatalogRequest;
   }
 
@@ -3230,11 +3430,11 @@ public class MetastoreEventsProcessorTest {
 
   private void createTable(String dbName, String tblName, boolean isPartitioned)
       throws TException {
-    createTable(null, dbName, tblName, null, isPartitioned);
+    createTable(null, dbName, tblName, null, isPartitioned, null);
   }
 
   private void createTable(String tblName, boolean isPartitioned) throws TException {
-    createTable(null, TEST_DB_NAME, tblName, null, isPartitioned);
+    createTable(null, TEST_DB_NAME, tblName, null, isPartitioned, null);
   }
 
   private void dropTable(String tableName) throws TException {
@@ -3374,12 +3574,31 @@ public class MetastoreEventsProcessorTest {
     try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
       List<Partition> partitions = new ArrayList<>();
       for (List<String> partVal : partVals) {
-        Partition partition = metaStoreClient.getHiveClient().getPartition(TEST_DB_NAME,
+        Partition partition = metaStoreClient.getHiveClient().getPartition(db,
             tblName, partVal);
         partition.getParameters().put(key, val);
         partitions.add(partition);
       }
-      metaStoreClient.getHiveClient().alter_partitions(TEST_DB_NAME, tblName, partitions);
+      metaStoreClient.getHiveClient().alter_partitions(db, tblName, partitions);
+    }
+  }
+
+  private void alterPartitionsParamsInTxn(String db, String tblName, String key,
+      String val, List<List<String>> partVals) throws Exception {
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      long txnId = MetastoreShim.openTransaction(metaStoreClient.getHiveClient());
+      long writeId = MetastoreShim.allocateTableWriteId(metaStoreClient.getHiveClient(),
+          txnId, db, tblName);
+      List<Partition> partitions = new ArrayList<>();
+      for (List<String> partVal : partVals) {
+        Partition partition = metaStoreClient.getHiveClient().getPartition(db,
+            tblName, partVal);
+        partition.getParameters().put(key, val);
+        partition.setWriteId(writeId);
+        partitions.add(partition);
+      }
+      metaStoreClient.getHiveClient().alter_partitions(db, tblName, partitions);
+      MetastoreShim.commitTransaction(metaStoreClient.getHiveClient(), txnId);
     }
   }
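
All three new tests wrap their bodies in the same stub-the-gflag/restore sequence
around BackendConfig. If more incremental-refresh tests are added, that boilerplate
could be pulled into a small helper along these lines (a sketch only;
runWithIncrementalRefresh does not exist in the test class today):

  // Sketch of a reusable helper for the flag-toggling pattern used by the new tests;
  // assumes it is added to MetastoreEventsProcessorTest, where BackendConfig and
  // TBackendGflags are already imported.
  private void runWithIncrementalRefresh(ThrowingBody body) throws Exception {
    final TBackendGflags origCfg = BackendConfig.INSTANCE.getBackendCfg();
    try {
      final TBackendGflags stubCfg = origCfg.deepCopy();
      stubCfg.setHms_event_incremental_refresh_transactional_table(true);
      BackendConfig.create(stubCfg);
      body.run();
    } finally {
      // Always restore the original flags so later tests see the default config.
      BackendConfig.create(origCfg);
    }
  }

  @FunctionalInterface
  private interface ThrowingBody { void run() throws Exception; }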
 
diff --git a/fe/src/test/java/org/apache/impala/hive/common/MutableValidReaderWriteIdListTest.java b/fe/src/test/java/org/apache/impala/hive/common/MutableValidReaderWriteIdListTest.java
index a041dec..8781089 100644
--- a/fe/src/test/java/org/apache/impala/hive/common/MutableValidReaderWriteIdListTest.java
+++ b/fe/src/test/java/org/apache/impala/hive/common/MutableValidReaderWriteIdListTest.java
@@ -46,7 +46,7 @@ public class MutableValidReaderWriteIdListTest {
 
   @Test
   public void exceptions() {
-    ValidReaderWriteIdList writeIdList =
+    ValidWriteIdList writeIdList =
         new ValidReaderWriteIdList(tableName, new long[] {2L, 4L}, new BitSet(), 5, 4L);
     MutableValidReaderWriteIdList mutableWriteIdList =
         new MutableValidReaderWriteIdList(writeIdList);
@@ -66,7 +66,7 @@ public class MutableValidReaderWriteIdListTest {
     for (int i = 0; i < 1000; i++) {
       exceptions[i] = i + 100;
     }
-    ValidReaderWriteIdList writeIdList =
+    ValidWriteIdList writeIdList =
         new ValidReaderWriteIdList(tableName, exceptions, new BitSet(), 2000, 900);
     MutableValidReaderWriteIdList mutableWriteIdList =
         new MutableValidReaderWriteIdList(writeIdList);
@@ -118,7 +118,7 @@ public class MutableValidReaderWriteIdListTest {
     BitSet bitSet = new BitSet(exceptions.length);
     bitSet.set(0); // mark writeId "2L" aborted
     bitSet.set(3); // mark writeId "8L" aborted
-    ValidReaderWriteIdList writeIdList =
+    ValidWriteIdList writeIdList =
         new ValidReaderWriteIdList(tableName, exceptions, bitSet, 11, 4);
     MutableValidReaderWriteIdList mutableWriteIdList =
         new MutableValidReaderWriteIdList(writeIdList);
@@ -132,16 +132,17 @@ public class MutableValidReaderWriteIdListTest {
 
   @Test
   public void testAddOpenWriteId() {
-    long[] exceptions = {2, 4, 6, 8, 10};
+    long[] exceptions = {2L, 4L, 6L, 8L, 10L};
     BitSet bitSet = new BitSet(exceptions.length);
     bitSet.set(0); // mark writeId "2L" aborted
     bitSet.set(3); // mark writeId "8L" aborted
-    ValidReaderWriteIdList writeIdList =
+    ValidWriteIdList writeIdList =
         new ValidReaderWriteIdList(tableName, exceptions, bitSet, 11, 4);
     MutableValidReaderWriteIdList mutableWriteIdList =
         new MutableValidReaderWriteIdList(writeIdList);
 
-    mutableWriteIdList.addOpenWriteId(13);
+    assertFalse(mutableWriteIdList.addOpenWriteId(4));
+    assertTrue(mutableWriteIdList.addOpenWriteId(13));
     String str = mutableWriteIdList.writeToString();
     assertEquals(tableName + ":13:4:4,6,10,12,13:2,8", str);
     assertTrue(mutableWriteIdList.isWriteIdValid(11));
@@ -157,14 +158,14 @@ public class MutableValidReaderWriteIdListTest {
     BitSet bitSet = new BitSet(exceptions.length);
     bitSet.set(0); // mark writeId "2L" aborted
     bitSet.set(3); // mark writeId "8L" aborted
-    ValidReaderWriteIdList writeIdList =
+    ValidWriteIdList writeIdList =
         new ValidReaderWriteIdList(tableName, exceptions, bitSet, 11, 4);
     MutableValidReaderWriteIdList mutableWriteIdList =
         new MutableValidReaderWriteIdList(writeIdList);
 
     mutableWriteIdList.addOpenWriteId(13);
-    mutableWriteIdList.addAbortedWriteIds(Collections.singletonList(4L));
-    mutableWriteIdList.addAbortedWriteIds(Collections.singletonList(12L));
+    assertFalse(mutableWriteIdList.addAbortedWriteIds(Collections.singletonList(2L)));
+    assertTrue(mutableWriteIdList.addAbortedWriteIds(Arrays.asList(4L, 12L)));
     String str = mutableWriteIdList.writeToString();
     assertEquals(tableName + ":13:6:6,10,13:2,4,8,12", str);
     assertFalse(mutableWriteIdList.isWriteIdValid(4));
@@ -181,12 +182,13 @@ public class MutableValidReaderWriteIdListTest {
     BitSet bitSet = new BitSet(exceptions.length);
     bitSet.set(0); // mark writeId "2L" aborted
     bitSet.set(3); // mark writeId "8L" aborted
-    ValidReaderWriteIdList writeIdList =
+    ValidWriteIdList writeIdList =
         new ValidReaderWriteIdList(tableName, exceptions, bitSet, 11, 4);
     MutableValidReaderWriteIdList mutableWriteIdList =
         new MutableValidReaderWriteIdList(writeIdList);
 
-    mutableWriteIdList.addCommittedWriteIds(Arrays.asList(4L, 10L));
+    assertFalse(mutableWriteIdList.addCommittedWriteIds(Collections.singletonList(1L)));
+    assertTrue(mutableWriteIdList.addCommittedWriteIds(Arrays.asList(4L, 10L)));
     String str = mutableWriteIdList.writeToString();
     assertEquals(tableName + ":11:6:6:2,8", str);
     assertTrue(mutableWriteIdList.isWriteIdAborted(2));
@@ -202,10 +204,50 @@ public class MutableValidReaderWriteIdListTest {
     BitSet bitSet = new BitSet(exceptions.length);
     bitSet.set(0); // mark writeId "2L" aborted
     bitSet.set(3); // mark writeId "8L" aborted
-    ValidReaderWriteIdList writeIdList =
+    ValidWriteIdList writeIdList =
         new ValidReaderWriteIdList(tableName, exceptions, bitSet, 11, 4);
     MutableValidReaderWriteIdList mutableWriteIdList =
         new MutableValidReaderWriteIdList(writeIdList);
     mutableWriteIdList.addCommittedWriteIds(Collections.singletonList(2L));
   }
+
+  @Test
+  public void testAddNotOpenToCommitted() {
+    long[] exceptions = {2L, 4L, 6L, 8L, 10L};
+    BitSet bitSet = new BitSet(exceptions.length);
+    bitSet.set(0); // mark writeId "2L" aborted
+    bitSet.set(3); // mark writeId "8L" aborted
+    ValidWriteIdList writeIdList =
+        new ValidReaderWriteIdList(tableName, exceptions, bitSet, 11, 4);
+    MutableValidReaderWriteIdList mutableWriteIdList =
+        new MutableValidReaderWriteIdList(writeIdList);
+
+    // write id "13L" is not open before, it should mark "12L" & "13L" open and then
+    // mark "13L" committed
+    assertTrue(mutableWriteIdList.addCommittedWriteIds(Collections.singletonList(13L)));
+    String str = mutableWriteIdList.writeToString();
+    assertEquals(tableName + ":13:4:4,6,10,12:2,8", str);
+    assertTrue(mutableWriteIdList.isWriteIdOpen(12));
+    assertTrue(mutableWriteIdList.isWriteIdValid(13));
+  }
+
+  @Test
+  public void testAddNotOpenToAborted() {
+    long[] exceptions = {2L, 4L, 6L, 8L, 10L};
+    BitSet bitSet = new BitSet(exceptions.length);
+    bitSet.set(0); // mark writeId "2L" aborted
+    bitSet.set(3); // mark writeId "8L" aborted
+    ValidWriteIdList writeIdList =
+        new ValidReaderWriteIdList(tableName, exceptions, bitSet, 11, 4);
+    MutableValidReaderWriteIdList mutableWriteIdList =
+        new MutableValidReaderWriteIdList(writeIdList);
+
+    // write id "13L" is not open before, it should mark "12L" & "13L" open and then
+    // mark "13L" committed
+    assertTrue(mutableWriteIdList.addAbortedWriteIds(Collections.singletonList(13L)));
+    String str = mutableWriteIdList.writeToString();
+    assertEquals(tableName + ":13:4:4,6,10,12:2,8,13", str);
+    assertTrue(mutableWriteIdList.isWriteIdOpen(12));
+    assertTrue(mutableWriteIdList.isWriteIdAborted(13));
+  }
 }
\ No newline at end of file