Posted to commits@impala.apache.org by st...@apache.org on 2019/08/01 20:16:18 UTC

[impala] 03/05: IMPALA-8600: Refresh transactional tables

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2d819655118c8c6e82649e3c3821311f3dd01174
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Sat Jul 27 12:26:20 2019 +0200

    IMPALA-8600: Refresh transactional tables
    
    Refreshing only a subset of partitions in a transactional table might
    leave that table in an inconsistent state. As a fix, user-initiated
    partition refreshes are no longer allowed on ACID tables.
    Additionally, a refresh-partition Metastore event now triggers a
    refresh of the whole ACID table.
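    
    As an illustration, the partition-scoped event handlers below all end
    up with the same shape (a hedged sketch, not the literal patch;
    handleNonAcidPartitions() is a hypothetical stand-in for the
    pre-existing per-partition refresh loops):
    
        protected void processPartitionScopedEvent(String eventType)
            throws CatalogException {
          if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
            // ACID table: a partition-level event reloads the whole table.
            reloadTableFromCatalog(eventType);
          } else {
            // Non-ACID table: keep the original partition-level refresh path.
            handleNonAcidPartitions();
          }
        }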
    
    An optimisation is implemented to check the table-level writeId
    cached locally, fetch the latest one from HMS, and do a refresh only
    if they don't match.
    This couldn't be done for partitioned tables because Hive apparently
    doesn't update the table-level writeId if the transactional table is
    partitioned. Similarly, checking the writeId of each partition and
    refreshing only the ones whose writeId is out of date is not feasible
    either, because Hive doesn't update the writeId, at either the table
    or the partition level, for schema changes such as adding a column.
    So after adding a column in Hive to a partitioned ACID table and
    refreshing that table in Impala, Impala still wouldn't see the new
    column. Hence, I unconditionally refresh the whole table if it's ACID
    and partitioned. Note that for non-partitioned ACID tables Hive
    updates the table-level writeId even for schema changes.
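    
    Concretely, the refresh path in CatalogOpExecutor ends up with logic
    roughly like the following (a sketch; it assumes 'tbl' is the table
    already loaded in the catalog and 'hmsTbl' is the copy just fetched
    from HMS via getTableFromMetaStore(); see the actual hunk below):
    
        if (AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) {
          HdfsTable hdfsTable = (HdfsTable) tbl;
          if (!hdfsTable.isPartitioned()
              && MetastoreShim.getWriteIdFromMSTable(tbl.getMetaStoreTable())
                  == MetastoreShim.getWriteIdFromMSTable(hmsTbl)) {
            // Local writeId matches HMS and there are no partitions: skip the reload.
            return;  // the real code returns an OK response here
          }
          // Partitioned ACID tables, or a stale writeId, fall through to a full reload.
        }
        updatedThriftTable = catalog_.reloadTable(tbl, cmdString);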
    
    Change-Id: I1851da22452074dbe253bcdd97145e06c7552cd3
    Reviewed-on: http://gerrit.cloudera.org:8080/13938
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/ResetMetadataStmt.java  |  14 ++
 .../java/org/apache/impala/catalog/HdfsTable.java  |   7 +
 .../impala/catalog/events/MetastoreEvents.java     | 185 ++++++++++++---------
 .../apache/impala/service/CatalogOpExecutor.java   |  52 +++++-
 .../java/org/apache/impala/util/AcidUtils.java     |   2 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   |   9 +
 .../functional-query/queries/QueryTest/acid.test   |   8 +
 tests/custom_cluster/test_event_processing.py      |  28 +++-
 8 files changed, 220 insertions(+), 85 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index ac4e69e..4200dd8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -22,12 +22,15 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.AcidUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -150,6 +153,17 @@ public class ResetMetadataStmt extends StatementBase {
                 tableName_);
           }
           if (partitionSpec_ != null) {
+            try {
+              // Get local table info without reaching out to HMS
+              FeTable table = analyzer.getTable(dbName, tableName_.getTbl());
+              if (AcidUtils.isTransactionalTable(
+                      table.getMetaStoreTable().getParameters())) {
+                throw new AnalysisException("Refreshing a partition is not allowed on " +
+                    "transactional tables. Try to refresh the whole table instead.");
+              }
+            } catch (TableLoadingException e) {
+              throw new AnalysisException(e);
+            }
             partitionSpec_.setPrivilegeRequirement(Privilege.ANY);
             partitionSpec_.analyze(analyzer);
           }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 2bb579f..8f440f2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1897,4 +1897,11 @@ public class HdfsTable extends Table implements FeFsTable {
     tmpTable.setTableStats(msTbl);
     return tmpTable;
   }
+
+  /**
+   * Returns true if the table is partitioned, false otherwise.
+   */
+  public boolean isPartitioned() {
+    return getMetaStoreTable().getPartitionKeysSize() > 0;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index d626b52..315a38c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -55,6 +55,7 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.ClassUtil;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
@@ -602,6 +603,21 @@ public class MetastoreEvents {
       String partString = FileUtils.makePartName(partitionCols, partitionVals);
       return partString;
     }
+
+    /**
+     * Helper function to initiate a table reload on the catalog. Propagates
+     * CatalogException if the reload operation fails.
+     */
+    protected void reloadTableFromCatalog(String operation) throws CatalogException {
+      if (!catalog_.reloadTableIfExists(dbName_, tblName_,
+              "Processing " + operation + " event from HMS")) {
+        debugLog("Automatic refresh on table {} failed as the table is not "
+            + "present either in catalog or metastore.", getFullyQualifiedTblName());
+      } else {
+        infoLog("Table {} has been refreshed after " + operation +".",
+            getFullyQualifiedTblName());
+      }
+    }
   }
 
   /**
@@ -729,7 +745,7 @@ public class MetastoreEvents {
   public static class InsertEvent extends MetastoreTableEvent {
 
     // Represents the partition for this insert. Null if the table is unpartitioned.
-    private final org.apache.hadoop.hive.metastore.api.Partition insertPartition_;
+    private Partition insertPartition_;
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory instead
@@ -760,9 +776,14 @@ public class MetastoreEvents {
      */
     @Override
     public void process() throws MetastoreNotificationException {
-      if (insertPartition_ != null)
+      // Reload the whole table if it's a transactional table.
+      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+        insertPartition_ = null;
+      }
+
+      if (insertPartition_ != null) {
         processPartitionInserts();
-      else {
+      } else {
         processTableInserts();
       }
     }
@@ -812,14 +833,7 @@ public class MetastoreEvents {
       try {
         // Ignore event if table or database is not in the catalog. Throw exception if
         // refresh fails.
-        if (!catalog_.reloadTableIfExists(dbName_, tblName_,
-              "processing table-level INSERT event from HMS")) {
-          debugLog("Automatic refresh table {} failed as the table is not "
-              + "present either catalog or metastore.", getFullyQualifiedTblName());
-        } else {
-          infoLog("Table {} has been refreshed after insert.",
-              getFullyQualifiedTblName());
-        }
+        reloadTableFromCatalog("table-level INSERT");
       } catch (DatabaseNotFoundException e) {
         debugLog("Automatic refresh of table {} insert failed as the "
             + "database is not present in the catalog.", getFullyQualifiedTblName());
@@ -1357,28 +1371,33 @@ public class MetastoreEvents {
       // Notification is created for newly created partitions only. We need not worry
       // about "IF NOT EXISTS".
       try {
-        boolean success = true;
-        // HMS adds partitions in a transactional way. This means there may be multiple
-        // HMS partition objects in an add_partition event. We try to do the same here by
-        // refreshing all those partitions in a loop. If any partition refresh fails, we
-        // throw MetastoreNotificationNeedsInvalidateException exception. We skip
-        // refresh of the partitions if the table is not present in the catalog.
-        infoLog("Trying to refresh {} partitions added to table {} in the event",
-            addedPartitions_.size(), getFullyQualifiedTblName());
-        for (Partition partition : addedPartitions_) {
-          List<TPartitionKeyValue> tPartSpec =
-              getTPartitionSpecFromHmsPartition(msTbl_, partition);
-          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-              "processing ADD_PARTITION event from HMS")) {
-            debugLog("Refresh partitions on table {} failed "
-                + "as table was not present in the catalog.", getFullyQualifiedTblName());
-            success = false;
-            break;
+        // Reload the whole table if it's a transactional table.
+        if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+          reloadTableFromCatalog("ADD_PARTITION");
+        } else {
+          boolean success = true;
+          // HMS adds partitions in a transactional way. This means there may be multiple
+          // HMS partition objects in an add_partition event. We try to do the same here
+          // by refreshing all those partitions in a loop. If any partition refresh fails,
+          // we throw MetastoreNotificationNeedsInvalidateException exception. We skip
+          // refresh of the partitions if the table is not present in the catalog.
+          infoLog("Trying to refresh {} partitions added to table {} in the event",
+              addedPartitions_.size(), getFullyQualifiedTblName());
+          for (Partition partition : addedPartitions_) {
+            List<TPartitionKeyValue> tPartSpec =
+                getTPartitionSpecFromHmsPartition(msTbl_, partition);
+            if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+                "processing ADD_PARTITION event from HMS")) {
+              debugLog("Refresh partitions on table {} failed as table was not present " +
+                  "in the catalog.", getFullyQualifiedTblName());
+              success = false;
+              break;
+            }
+          }
+          if (success) {
+            infoLog("Refreshed {} partitions of table {}", addedPartitions_.size(),
+                getFullyQualifiedTblName());
           }
-        }
-        if (success) {
-          infoLog("Refreshed {} partitions of table {}", addedPartitions_.size(),
-              getFullyQualifiedTblName());
         }
       } catch (DatabaseNotFoundException e) {
         debugLog("Refresh partitions on table {} after add_partitions event failed as "
@@ -1448,34 +1467,39 @@ public class MetastoreEvents {
         infoLog("Not processing the event as it is a self-event");
         return;
       }
-      // Refresh the partition that was altered.
-      Preconditions.checkNotNull(partitionAfter_);
-      List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
-          partitionAfter_);
-      try {
-        // Ignore event if table or database is not in catalog. Throw exception if
-        // refresh fails.
-
-        if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-            "processing ALTER_PARTITION event from HMS")) {
-          debugLog("Refresh of table {} partition {} failed as the table "
-                  + "is not present in the catalog.", getFullyQualifiedTblName(),
-              constructPartitionStringFromTPartitionSpec(tPartSpec));
-        } else {
-          infoLog("Table {} partition {} has been refreshed", getFullyQualifiedTblName(),
+      // Reload the whole table if it's a transactional table.
+      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+        reloadTableFromCatalog("ALTER_PARTITION");
+      } else {
+        // Refresh the partition that was altered.
+        Preconditions.checkNotNull(partitionAfter_);
+        List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_,
+            partitionAfter_);
+        try {
+          // Ignore event if table or database is not in catalog. Throw exception if
+          // refresh fails.
+          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+              "processing ALTER_PARTITION event from HMS")) {
+            debugLog("Refresh of table {} partition {} failed as the table "
+                    + "is not present in the catalog.", getFullyQualifiedTblName(),
+                constructPartitionStringFromTPartitionSpec(tPartSpec));
+          } else {
+            infoLog("Table {} partition {} has been refreshed",
+                getFullyQualifiedTblName(),
+                constructPartitionStringFromTPartitionSpec(tPartSpec));
+          }
+        } catch (DatabaseNotFoundException e) {
+          debugLog("Refresh of table {} partition {} "
+                  + "event failed as the database is not present in the catalog.",
+              getFullyQualifiedTblName(),
               constructPartitionStringFromTPartitionSpec(tPartSpec));
+        } catch (CatalogException e) {
+          throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
+                  + "partition on table {} partition {} failed. Event processing cannot "
+                  + "continue. Issue and invalidate command to reset the event processor "
+                  + "state.", getFullyQualifiedTblName(),
+              constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
         }
-      } catch (DatabaseNotFoundException e) {
-        debugLog("Refresh of table {} partition {} "
-                + "event failed as the database is not present in the catalog.",
-            getFullyQualifiedTblName(),
-            constructPartitionStringFromTPartitionSpec(tPartSpec));
-      } catch (CatalogException e) {
-        throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
-                + "partition on table {} partition {} failed. Event processing cannot "
-                + "continue. Issue and invalidate command to reset the event processor "
-                + "state.", getFullyQualifiedTblName(),
-            constructPartitionStringFromTPartitionSpec(tPartSpec)), e);
       }
     }
 
@@ -1532,29 +1556,34 @@ public class MetastoreEvents {
       // We do not need self event as dropPartition() call is a no-op if the directory
       // doesn't exist.
       try {
-        boolean success = true;
-        // We refresh all the partitions that were dropped from HMS. If a refresh
-        // fails, we throw a MetastoreNotificationNeedsInvalidateException
-        infoLog("{} partitions dropped from table {}. Trying "
-            + "to refresh.", droppedPartitions_.size(), getFullyQualifiedTblName());
-        for (Map<String, String> partSpec : droppedPartitions_) {
-          List<TPartitionKeyValue> tPartSpec = new ArrayList<>(partSpec.size());
-          for (Map.Entry<String, String> entry : partSpec.entrySet()) {
-            tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
+        // Reload the whole table if it's a transactional table.
+        if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
+          reloadTableFromCatalog("DROP_PARTITION");
+        } else {
+          boolean success = true;
+          // We refresh all the partitions that were dropped from HMS. If a refresh
+          // fails, we throw a MetastoreNotificationNeedsInvalidateException
+          infoLog("{} partitions dropped from table {}. Trying "
+              + "to refresh.", droppedPartitions_.size(), getFullyQualifiedTblName());
+          for (Map<String, String> partSpec : droppedPartitions_) {
+            List<TPartitionKeyValue> tPartSpec = new ArrayList<>(partSpec.size());
+            for (Map.Entry<String, String> entry : partSpec.entrySet()) {
+              tPartSpec.add(new TPartitionKeyValue(entry.getKey(), entry.getValue()));
+            }
+            if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
+                "processing DROP_PARTITION event from HMS")) {
+              debugLog("Could not refresh partition {} of table {} as table "
+                      + "was not present in the catalog.",
+                      getFullyQualifiedTblName());
+              success = false;
+              break;
+            }
           }
-          if (!catalog_.reloadPartitionIfExists(dbName_, tblName_, tPartSpec,
-              "processing DROP_PARTITION event from HMS")) {
-            debugLog("Could not refresh partition {} of table {} as table "
-                    + "was not present in the catalog.",
-                    getFullyQualifiedTblName());
-            success = false;
-            break;
+          if (success) {
+            infoLog("Refreshed {} partitions of table {}", droppedPartitions_.size(),
+                getFullyQualifiedTblName());
           }
         }
-        if (success) {
-          infoLog("Refreshed {} partitions of table {}", droppedPartitions_.size(),
-              getFullyQualifiedTblName());
-        }
       } catch (DatabaseNotFoundException e) {
         debugLog("Could not refresh partitions of table {}"
             + "as database was not present in the catalog.", getFullyQualifiedTblName());
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1d1fc15..0177c8d 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -83,6 +83,7 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.PartitionNotFoundException;
 import org.apache.impala.catalog.PartitionStatsUtil;
+import org.apache.impala.catalog.PrunablePartition;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Table;
@@ -91,7 +92,6 @@ import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
-import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -3524,6 +3524,22 @@ public class CatalogOpExecutor {
     return msTbl;
   }
 
+  /**
+   * Returns the metastore.api.Table object from the Hive Metastore for an existing
+   * fully loaded table. Gets the MetaStore object from 'catalog_'.
+   */
+  private org.apache.hadoop.hive.metastore.api.Table getTableFromMetaStore(
+      TableName tblName) throws CatalogException {
+    Preconditions.checkNotNull(tblName);
+    org.apache.hadoop.hive.metastore.api.Table msTbl = null;
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msTbl = msClient.getHiveClient().getTable(tblName.getDb(), tblName.getTbl());
+    } catch (TException e) {
+      LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTable") + e.getMessage());
+    }
+    return msTbl;
+  }
+
   private static List<FieldSchema> buildFieldSchemaList(List<TColumn> columns) {
     List<FieldSchema> fsList = Lists.newArrayList();
     // Add in all the columns
@@ -3591,15 +3607,45 @@ public class CatalogOpExecutor {
         if (tbl != null) {
           // If the table is not loaded, no need to perform refresh after the initial
           // metadata load.
-          boolean needsRefresh = tbl.isLoaded();
+          boolean isTableLoadedInCatalog = tbl.isLoaded();
           tbl = getExistingTable(tblName.getDb(), tblName.getTbl(),
               "Load triggered by " + cmdString);
           if (tbl != null) {
-            if (needsRefresh) {
+            if (isTableLoadedInCatalog) {
+              boolean isTransactional = AcidUtils.isTransactionalTable(
+                  tbl.getMetaStoreTable().getParameters());
               if (req.isSetPartition_spec()) {
+                Preconditions.checkArgument(!isTransactional);
                 updatedThriftTable = catalog_.reloadPartition(tbl,
                     req.getPartition_spec(), cmdString);
               } else {
+                if (isTransactional) {
+                  org.apache.hadoop.hive.metastore.api.Table hmsTbl =
+                      getTableFromMetaStore(tblName);
+                  if (hmsTbl == null) {
+                      throw new TableNotFoundException("Table not found: " +
+                          tblName.toString());
+                  }
+                  HdfsTable hdfsTable = (HdfsTable)tbl;
+                  if (!hdfsTable.isPartitioned() &&
+                      MetastoreShim.getWriteIdFromMSTable(tbl.getMetaStoreTable()) ==
+                      MetastoreShim.getWriteIdFromMSTable(hmsTbl)) {
+                    // No need to refresh the table if the local writeId equals to the
+                    // latest writeId from HMS and the table is not partitioned.
+                    LOG.debug("Skip reloading table " + tblName.toString() +
+                        " because it has the latest writeId locally");
+                    resp.getResult().setStatus(new TStatus(TErrorCode.OK,
+                        new ArrayList<String>()));
+                    return resp;
+                  }
+                  // TODO IMPALA-8809: Optimisation for partitioned tables:
+                  //   1: Reload the whole table if schema change happened. Identify
+                  //     such scenario by checking Table.TBL_PROP_LAST_DDL_TIME property.
+                  //     Note, table level writeId is not updated by HMS for partitioned
+                  //     ACID tables, there is a Jira to cover this: HIVE-22062.
+                  //   2: If no need for a full table reload then fetch partition level
+                  //     writeIds and reload only the ones that changed.
+                }
                 updatedThriftTable = catalog_.reloadTable(tbl, cmdString);
               }
             } else {
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 4f4ee7a..a27888d 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -43,7 +43,7 @@ import javax.annotation.Nullable;
  * Contains utility functions for working with Acid tables.
  * <p>
  * The code is mostly copy pasted from Hive. Ideally we should use the
- * the code directly from Hive.
+ * code directly from Hive.
  * </p>
  */
 public class AcidUtils {
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index ebca3cf..909708c 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -611,6 +611,15 @@ public class AnalyzerTest extends FrontendTestBase {
 
     AnalyzesOk("show column stats functional_orc_def.full_transactional_table");
     AnalyzesOk("show column stats functional.insert_only_transactional_table");
+
+    AnalyzesOk("refresh functional.insert_only_transactional_table");
+    AnalyzesOk("refresh functional_orc_def.full_transactional_table");
+    AnalysisError("refresh functional.insert_only_transactional_table partition (j=1)",
+        "Refresh a partition is not allowed on transactional tables. Try to refresh " +
+        "the whole table instead.");
+    AnalysisError("refresh functional_orc_def.full_transactional_table partition (j=1)",
+        "Refresh a partition is not allowed on transactional tables. Try to refresh " +
+        "the whole table instead.");
   }
 
   @Test
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid.test b/testdata/workloads/functional-query/queries/QueryTest/acid.test
index 3612e18..dac75f4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid.test
@@ -24,6 +24,14 @@ select * from tt order by x;
 1
 2
 ====
+---- QUERY
+# Do a second refresh on an already refreshed ACID table.
+refresh tt;
+select * from tt order by x;
+---- RESULTS
+1
+2
+====
 ---- HIVE_QUERY
 use $DATABASE;
 insert overwrite table tt values (3);
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 4397e25..2aa22d4 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -21,6 +21,8 @@ import time
 import requests
 
 from tests.common.environ import build_flavor_timeout
+from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
+    SkipIfLocal, SkipIfHive2
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.util.hive_utils import HiveDbWrapper
@@ -39,7 +41,20 @@ class TestEventProcessing(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
+  @SkipIfHive2.acid
+  def test_insert_events_transactional(self):
+    """Executes 'run_test_insert_events' for transactional tables.
+    """
+    self.run_test_insert_events(is_transactional=True)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
   def test_insert_events(self):
+    """Executes 'run_test_insert_events' for non-transactional tables.
+    """
+    self.run_test_insert_events()
+
+  def run_test_insert_events(self, is_transactional=False):
     """Test for insert event processing. Events are created in Hive and processed in
     Impala. The following cases are tested :
     Insert into table --> for partitioned and non-partitioned table
@@ -50,9 +65,14 @@ class TestEventProcessing(CustomClusterTestSuite):
     with HiveDbWrapper(self, db_name):
      # Test table with no partitions.
      TBL_INSERT_NOPART = 'tbl_insert_nopart'
+     self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, TBL_INSERT_NOPART))
      last_synced_event_id = self.get_last_synced_event_id()
-     self.run_stmt_in_hive("create table %s.%s (id int, val int)"
-         % (db_name, TBL_INSERT_NOPART))
+     TBLPROPERTIES = ""
+     if is_transactional:
+       TBLPROPERTIES = "TBLPROPERTIES ('transactional'='true'," \
+           "'transactional_properties'='insert_only')"
+     self.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
+         % (db_name, TBL_INSERT_NOPART, TBLPROPERTIES))
      # Test insert into table, this will fire an insert event.
      self.run_stmt_in_hive("insert into %s.%s values(101, 200)"
          % (db_name, TBL_INSERT_NOPART))
@@ -76,8 +96,10 @@ class TestEventProcessing(CustomClusterTestSuite):
      # Test partitioned table.
      last_synced_event_id = self.get_last_synced_event_id()
      TBL_INSERT_PART = 'tbl_insert_part'
+     self.run_stmt_in_hive("drop table if exists %s.%s" % (db_name, TBL_INSERT_PART))
      self.run_stmt_in_hive("create table %s.%s (id int, name string) "
-         "partitioned by(day int, month int, year int)" % (db_name, TBL_INSERT_PART))
+         "partitioned by(day int, month int, year int) %s"
+         % (db_name, TBL_INSERT_PART, TBLPROPERTIES))
      # Insert data into partitions.
      self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
          "values(101, 'x')" % (db_name, TBL_INSERT_PART))