Posted to commits@impala.apache.org by ta...@apache.org on 2020/10/22 06:01:39 UTC

[impala] 01/02: IMPALA-10219: Expose DEBUG_ACTION query option in catalog

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 15c3b13e9730479e096275d974000ae9fe8fbb83
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Sat Oct 3 17:01:35 2020 -0700

    IMPALA-10219: Expose DEBUG_ACTION query option in catalog
    
    This patch enables DEBUG_ACTION in the catalog service's
    Java code. Specifically, the DEBUG_ACTION query option is
    now propagated via TResetMetadataRequest and TDdlExecRequest
    so that we can inject delays while executing refresh
    or DDL statements.
    
    For example,
    1. To inject a delay of 100ms per HDFS list operation
    during a refresh statement, set the following query option:
    
    set debug_action=catalogd_refresh_hdfs_listing_delay:SLEEP@100;
    
    2. To inject a delay of 100ms into an alter table recover
    partitions statement:
    
    set debug_action=catalogd_table_recover_delay:SLEEP@100;
    
    3. To inject a delay of 100ms into a compute stats statement:
    
    set debug_action=catalogd_update_stats_delay:SLEEP@100;
    
    Note that this option only adds the delay during the
    update_stats phase of the compute stats execution.
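
    Actions for different labels can be combined with '|',
    following the general LABEL:ACTION@PARAMS|LABEL:ACTION@PARAMS
    format documented in DebugUtils.java. For instance, an
    illustrative combination of two of the delays above:

    set debug_action=catalogd_refresh_hdfs_listing_delay:SLEEP@100|catalogd_update_stats_delay:SLEEP@100;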
    
    Testing:
    1. Added a test which sets the query option and makes
    sure that the command takes more time than it does without
    the query option.
    2. Added unit tests for the debugAction implementation
    logic.
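
    For reference, a minimal sketch (hypothetical class, method
    and label) of how a new delay point would be wired into
    catalogd using the executeDebugAction() API added in
    DebugUtils.java:

    import org.apache.impala.util.DebugUtils;

    public class ExampleCatalogOp {
      // Sleeps only if the forwarded debug_action string contains an
      // entry for the label "catalogd_example_delay"; with a null or
      // empty debug_action this call is a no-op.
      public static void doWork(String debugAction) {
        DebugUtils.executeDebugAction(debugAction, "catalogd_example_delay");
        // ... the actual catalog work would go here ...
      }
    }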
    
    Change-Id: Ia7196b1ce76415a5faf3fa8575a26d22b2bf50b1
    Reviewed-on: http://gerrit.cloudera.org:8080/16548
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/catalog-op-executor.cc                 |   1 +
 be/src/util/debug-util.cc                          |   4 +
 common/thrift/CatalogService.thrift                |   6 +
 .../impala/catalog/CatalogServiceCatalog.java      |  12 +-
 .../apache/impala/catalog/FileMetadataLoader.java  |  21 +++-
 .../java/org/apache/impala/catalog/HdfsTable.java  |  74 +++++++----
 .../org/apache/impala/catalog/IcebergTable.java    |   2 +-
 .../impala/catalog/ParallelFileMetadataLoader.java |  10 +-
 .../org/apache/impala/common/FileSystemUtil.java   |  13 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  26 ++--
 .../java/org/apache/impala/service/Frontend.java   |  10 ++
 .../java/org/apache/impala/util/DebugUtils.java    | 137 +++++++++++++++++++++
 .../events/MetastoreEventsProcessorTest.java       |   2 +-
 .../org/apache/impala/util/DebugUtilsTest.java     |  67 ++++++++++
 tests/common/impala_test_suite.py                  |  11 ++
 tests/metadata/test_catalogd_debug_actions.py      |  50 ++++++++
 16 files changed, 389 insertions(+), 57 deletions(-)

diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index f22dfad..0ffd708 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -132,6 +132,7 @@ Status CatalogOpExecutor::ExecComputeStats(
   TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params;
   update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);
   update_stats_req.__set_sync_ddl(compute_stats_request.sync_ddl);
+  update_stats_req.__set_debug_action(compute_stats_request.ddl_params.debug_action);
 
   const TComputeStatsParams& compute_stats_params =
       compute_stats_request.ddl_params.compute_stats_params;
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index e176852..af10ddd 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -352,6 +352,10 @@ static bool ParseProbability(const string& prob_str, bool* should_execute) {
   return true;
 }
 
+/// The catalog's Java code implements an equivalent method for processing debug
+/// actions. See DebugUtils.java for more details. Any changes to the implementation
+/// logic here, such as adding a new type of action, should be mirrored in
+/// DebugUtils.java too.
 Status DebugActionImpl(
     const string& debug_action, const char* label, const std::vector<string>& args) {
   const DebugActionTokens& action_list = TokenizeDebugActions(debug_action);
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 5acdb7f..ad316b9 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -156,6 +156,9 @@ struct TDdlExecRequest {
 
   // Parameters for replaying an exported testcase.
   25: optional JniCatalog.TCopyTestCaseReq copy_test_case_params
+
+  // Passes the debug actions to catalogd if the query option is set.
+  26: optional string debug_action
 }
 
 // Response from executing a TDdlExecRequest
@@ -256,6 +259,9 @@ struct TResetMetadataRequest {
   // If set, refreshes partition objects which are modified externally.
   // Applicable only when refreshing the table.
   9: optional bool refresh_updated_hms_partitions
+
+  // debug_action is set from the query_option when available.
+  10: optional string debug_action
 }
 
 // Response from TResetMetadataRequest
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 5c6bf1a..38aed8f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -95,6 +95,7 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TPrincipalType;
 import org.apache.impala.thrift.TPrivilege;
+import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TTableType;
@@ -2275,7 +2276,7 @@ public class CatalogServiceCatalog extends Catalog {
    * {@code refreshUpdatedPartitions} argument.
    */
   public TCatalogObject reloadTable(Table tbl, String reason) throws CatalogException {
-    return reloadTable(tbl, false, reason);
+    return reloadTable(tbl, new TResetMetadataRequest(), reason);
   }
 
   /**
@@ -2287,8 +2288,8 @@ public class CatalogServiceCatalog extends Catalog {
    * If {@code refreshUpdatedParts} is true, the refresh logic detects updated
    * partitions in metastore and reloads them too.
    */
-  public TCatalogObject reloadTable(Table tbl, boolean refreshUpdatedParts, String reason)
-      throws CatalogException {
+  public TCatalogObject reloadTable(Table tbl, TResetMetadataRequest request,
+      String reason) throws CatalogException {
     LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName()));
     Preconditions.checkState(!(tbl instanceof IncompleteTable));
     String dbName = tbl.getDb().getName();
@@ -2312,7 +2313,8 @@ public class CatalogServiceCatalog extends Catalog {
         }
         if (tbl instanceof HdfsTable) {
           ((HdfsTable) tbl)
-              .load(true, msClient.getHiveClient(), msTbl, refreshUpdatedParts, reason);
+              .load(true, msClient.getHiveClient(), msTbl,
+                  request.refresh_updated_hms_partitions, request.debug_action, reason);
         } else {
           tbl.load(true, msClient.getHiveClient(), msTbl, reason);
         }
@@ -3305,7 +3307,7 @@ public class CatalogServiceCatalog extends Catalog {
           .map(HdfsPartition.Builder::new)
           .collect(Collectors.toList());
       new ParallelFileMetadataLoader(
-          table, partBuilders, reqWriteIdList, validTxnList, logPrefix).load();
+          table, partBuilders, reqWriteIdList, validTxnList, null, logPrefix).load();
       for (HdfsPartition.Builder builder : partBuilders) {
         // Let's retrieve the original partition instance from builder because this is
         // stored in the keys of 'partToPartialInfoMap'.
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index 2daca11..c1cba7f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Reference;
@@ -73,6 +72,7 @@ public class FileMetadataLoader {
   private List<FileDescriptor> loadedInsertDeltaFds_;
   private List<FileDescriptor> loadedDeleteDeltaFds_;
   private LoadStats loadStats_;
+  private String debugAction_;
 
   /**
    * @param partDir the dir for which to fetch file metadata
@@ -177,16 +177,17 @@ public class FileMetadataLoader {
         (oldFdsByRelPath_.isEmpty() || forceRefreshLocations);
 
     String msg = String.format("%s file metadata%s from path %s",
-          oldFdsByRelPath_.isEmpty() ? "Loading" : "Refreshing",
-          listWithLocations ? " with eager location-fetching" : "",
-          partDir_);
+        oldFdsByRelPath_.isEmpty() ? "Loading" : "Refreshing",
+        listWithLocations ? " with eager location-fetching" : "", partDir_);
     LOG.trace(msg);
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
       RemoteIterator<? extends FileStatus> fileStatuses;
       if (listWithLocations) {
-        fileStatuses = FileSystemUtil.listFiles(fs, partDir_, recursive_);
+        fileStatuses = FileSystemUtil
+            .listFiles(fs, partDir_, recursive_, debugAction_);
       } else {
-        fileStatuses = FileSystemUtil.listStatus(fs, partDir_, recursive_);
+        fileStatuses = FileSystemUtil
+            .listStatus(fs, partDir_, recursive_, debugAction_);
 
         // TODO(todd): we could look at the result of listing without locations, and if
         // we see that a substantial number of the files have changed, it may be better
@@ -280,6 +281,14 @@ public class FileMetadataLoader {
       (fd.getModificationTime() != status.getModificationTime());
   }
 
+  /**
+   * Enables injection of a debug action to introduce delays in the HDFS listStatus
+   * or listFiles calls during file-metadata loading.
+   */
+  public void setDebugAction(String debugAction) {
+    this.debugAction_ = debugAction;
+  }
+
   // File/Block metadata loading stats for a single HDFS path.
   public static class LoadStats {
     private final Path partDir_;
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 361de4e..75b68df 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -87,6 +88,7 @@ import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AvroSchemaConverter;
 import org.apache.impala.util.AvroSchemaUtils;
+import org.apache.impala.util.DebugUtils;
 import org.apache.impala.util.FsPermissionCache;
 import org.apache.impala.util.FsPermissionChecker;
 import org.apache.impala.util.HdfsCachingUtil;
@@ -684,6 +686,17 @@ public class HdfsTable extends Table implements FeFsTable {
   }
 
   /**
+   * Similar to
+   * {@link #loadFileMetadataForPartitions(IMetaStoreClient, Collection, boolean,
+   * String)} but without injecting any debug actions.
+   */
+  private long loadFileMetadataForPartitions(IMetaStoreClient client,
+      Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh)
+      throws CatalogException {
+    return loadFileMetadataForPartitions(client, partBuilders, isRefresh, null);
+  }
+
+  /**
    * Helper method to load the block locations for each partition in 'parts'.
    * New file descriptor lists are loaded and the partitions are updated in place.
    *
@@ -692,8 +705,8 @@ public class HdfsTable extends Table implements FeFsTable {
    * @return time in nanoseconds spent in loading file metadata.
    */
   private long loadFileMetadataForPartitions(IMetaStoreClient client,
-      Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh)
-      throws CatalogException {
+      Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh,
+      String debugActions) throws CatalogException {
     final Clock clock = Clock.defaultClock();
     long startTime = clock.getTick();
 
@@ -714,9 +727,9 @@ public class HdfsTable extends Table implements FeFsTable {
     // we'll throw an exception here and end up bailing out of whatever catalog operation
     // we're in the middle of. This could cause a partial metadata update -- eg we may
     // have refreshed the top-level table properties without refreshing the files.
-    new ParallelFileMetadataLoader(
-        this, partBuilders, validWriteIds_, validTxnList, logPrefix)
-        .load();
+    ParallelFileMetadataLoader loader = new ParallelFileMetadataLoader(
+        this, partBuilders, validWriteIds_, validTxnList, debugActions, logPrefix);
+    loader.load();
 
     // TODO(todd): would be good to log a summary of the loading process:
     // - how many block locations did we reuse/load individually/load via batch
@@ -1059,7 +1072,7 @@ public class HdfsTable extends Table implements FeFsTable {
       throws TableLoadingException {
     load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */
         true, /* loadTableSchema*/true, false,
-        /* partitionsToUpdate*/null, reason);
+        /* partitionsToUpdate*/null, null, reason);
   }
 
   public void load(boolean reuseMetadata, IMetaStoreClient client,
@@ -1067,7 +1080,16 @@ public class HdfsTable extends Table implements FeFsTable {
       String reason) throws TableLoadingException {
     load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */
         true, /* loadTableSchema*/true, refreshUpdatedPartitions,
-        /* partitionsToUpdate*/null, reason);
+        /* partitionsToUpdate*/null, null, reason);
+  }
+
+  public void load(boolean reuseMetadata, IMetaStoreClient hiveClient,
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      boolean refreshUpdatedPartitions, String debugAction, String reason)
+      throws CatalogException {
+    load(reuseMetadata, hiveClient, msTbl, /* loadPartitionFileMetadata */
+        true, /* loadTableSchema*/true, refreshUpdatedPartitions,
+        /* partitionsToUpdate*/null, debugAction, reason);
   }
 
   /**
@@ -1097,8 +1119,8 @@ public class HdfsTable extends Table implements FeFsTable {
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl,
       boolean loadParitionFileMetadata, boolean loadTableSchema,
-      boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate, String reason)
-      throws TableLoadingException {
+      boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate,
+      @Nullable String debugAction, String reason) throws TableLoadingException {
     final Timer.Context context =
         getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     String annotation = String.format("%s metadata for %s%s partition(s) of %s.%s (%s)",
@@ -1133,14 +1155,15 @@ public class HdfsTable extends Table implements FeFsTable {
           storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl);
           if (msTbl.getPartitionKeysSize() == 0) {
             if (loadParitionFileMetadata) {
-              storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client);
+              storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client,
+                  debugAction);
             } else {  // Update the single partition stats in case table stats changes.
               updateUnpartitionedTableStats();
             }
           } else {
             storageMetadataLoadTime_ += updatePartitionsFromHms(
                 client, partitionsToUpdate, loadParitionFileMetadata,
-                refreshUpdatedPartitions);
+                refreshUpdatedPartitions, debugAction);
           }
           LOG.info("Incrementally loaded table metadata for: " + getFullName());
         } else {
@@ -1223,7 +1246,7 @@ public class HdfsTable extends Table implements FeFsTable {
    * This is optimized for the case where few files have changed. See
    * {@link FileMetadataLoader#load} for details.
    */
-  private long updateUnpartitionedTableFileMd(IMetaStoreClient client)
+  private long updateUnpartitionedTableFileMd(IMetaStoreClient client, String debugAction)
       throws CatalogException {
     Preconditions.checkState(getNumClusteringCols() == 0);
     if (LOG.isTraceEnabled()) {
@@ -1245,7 +1268,7 @@ public class HdfsTable extends Table implements FeFsTable {
     // partition instance to local catalog coordinators.
     partBuilder.setPrevId(oldPartition.getId());
     long fileMdLoadTime = loadFileMetadataForPartitions(client,
-        ImmutableList.of(partBuilder), /*isRefresh=*/true);
+        ImmutableList.of(partBuilder), /*isRefresh=*/true, debugAction);
     setUnpartitionedTableStats(partBuilder);
     addPartition(partBuilder.build());
     return fileMdLoadTime;
@@ -1277,7 +1300,7 @@ public class HdfsTable extends Table implements FeFsTable {
    */
   private long updatePartitionsFromHms(IMetaStoreClient client,
       Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata,
-      boolean refreshUpdatedPartitions) throws Exception {
+      boolean refreshUpdatedPartitions, String debugAction) throws Exception {
     if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
@@ -1285,9 +1308,9 @@ public class HdfsTable extends Table implements FeFsTable {
     Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null);
     PartitionDeltaUpdater deltaUpdater =
         refreshUpdatedPartitions ? new PartBasedDeltaUpdater(client,
-            loadPartitionFileMetadata, partitionsToUpdate)
+            loadPartitionFileMetadata, partitionsToUpdate, debugAction)
             : new PartNameBasedDeltaUpdater(client, loadPartitionFileMetadata,
-                partitionsToUpdate);
+                partitionsToUpdate, debugAction);
     deltaUpdater.apply();
     return deltaUpdater.loadTimeForFileMdNs_;
   }
@@ -1310,12 +1333,14 @@ public class HdfsTable extends Table implements FeFsTable {
     // if loadFileMd_ flag is set, files for these partitions will also be
     // reloaded.
     private final Set<String> partitionsToUpdate_;
+    private final String debugAction_;
 
     PartitionDeltaUpdater(IMetaStoreClient client, boolean loadPartitionFileMetadata,
-        Set<String> partitionsToUpdate) {
+        Set<String> partitionsToUpdate, String debugAction) {
       this.client_ = client;
       this.loadFileMd_ = loadPartitionFileMetadata;
       this.partitionsToUpdate_ = partitionsToUpdate;
+      this.debugAction_ = debugAction;
     }
 
     /**
@@ -1407,7 +1432,7 @@ public class HdfsTable extends Table implements FeFsTable {
               .collect(Collectors.toList());
         }
         loadTimeForFileMdNs_ += loadFileMetadataForPartitions(client_,
-            partitionsToLoadFiles,/* isRefresh=*/true);
+            partitionsToLoadFiles,/* isRefresh=*/true, debugAction_);
         updatePartitions(partitionsToLoadFiles);
       }
     }
@@ -1434,8 +1459,8 @@ public class HdfsTable extends Table implements FeFsTable {
 
     public PartBasedDeltaUpdater(
         IMetaStoreClient client, boolean loadPartitionFileMetadata,
-        Set<String> partitionsToUpdate) throws Exception {
-      super(client, loadPartitionFileMetadata, partitionsToUpdate);
+        Set<String> partitionsToUpdate, String debugAction) throws Exception {
+      super(client, loadPartitionFileMetadata, partitionsToUpdate, debugAction);
       Stopwatch sw = Stopwatch.createStarted();
       List<Partition> partitionList;
       if (partitionsToUpdate != null) {
@@ -1533,8 +1558,8 @@ public class HdfsTable extends Table implements FeFsTable {
 
     public PartNameBasedDeltaUpdater(
         IMetaStoreClient client, boolean loadPartitionFileMetadata,
-        Set<String> partitionsToUpdate) throws Exception {
-      super(client, loadPartitionFileMetadata, partitionsToUpdate);
+        Set<String> partitionsToUpdate, String debugAction) throws Exception {
+      super(client, loadPartitionFileMetadata, partitionsToUpdate, debugAction);
       // Retrieve all the partition names from the Hive Metastore. We need this to
       // identify the delta between partitions of the local HdfsTable and the table entry
       // in the Hive Metastore. Note: This is a relatively "cheap" operation
@@ -2230,8 +2255,10 @@ public class HdfsTable extends Table implements FeFsTable {
    * Returns the HDFS paths corresponding to HdfsTable partitions that don't exist in
    * the Hive Metastore. An HDFS path is represented as a list of strings values, one per
    * partition key column.
+   * @param debugAction the debug action to execute while recovering partitions, if any
    */
-  public List<List<String>> getPathsWithoutPartitions() throws CatalogException {
+  public List<List<String>> getPathsWithoutPartitions(@Nullable String debugAction)
+      throws CatalogException {
     Set<List<LiteralExpr>> existingPartitions = new HashSet<>();
     // Get the list of partition values of existing partitions in Hive Metastore.
     for (HdfsPartition partition: partitionMap_.values()) {
@@ -2247,6 +2274,7 @@ public class HdfsTable extends Table implements FeFsTable {
     try {
       getAllPartitionsNotInHms(basePath, partitionKeys, existingPartitions,
           partitionsNotInHms);
+      DebugUtils.executeDebugAction(debugAction, DebugUtils.RECOVER_PARTITIONS_DELAY);
     } catch (Exception e) {
       throw new CatalogException(String.format("Failed to recover partitions for %s " +
           "with exception:%s.", getFullName(), e));
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index e134d22..04e1bf9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -232,7 +232,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
         loadSchemaFromIceberg();
         // Loading hdfs table after loaded schema from Iceberg,
         // in case we create external Iceberg table skipping column info in sql.
-        hdfsTable_.load(false, msClient, msTable_, true, true, false, null, reason);
+        hdfsTable_.load(false, msClient, msTable_, true, true, false, null, null, reason);
         pathHashToFileDescMap_ = Utils.loadAllPartition(this);
         loadAllColumnStats(msClient);
       } catch (Exception e) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 616bb50..8ac4e2d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -33,19 +33,14 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.impala.catalog.FeFsTable.Utils;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
-import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
-import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
-import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.ThreadNameAnnotator;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.MoreExecutors;
 
 
@@ -76,8 +71,8 @@ public class ParallelFileMetadataLoader {
 
   public ParallelFileMetadataLoader(HdfsTable table,
       Collection<HdfsPartition.Builder> partBuilders,
-      ValidWriteIdList writeIdList, ValidTxnList validTxnList, String logPrefix)
-      throws CatalogException {
+      ValidWriteIdList writeIdList, ValidTxnList validTxnList, String debugAction,
+      String logPrefix) throws CatalogException {
     if (writeIdList != null || validTxnList != null) {
      // make sure that either both writeIdList and validTxnList are set or both
       // of them are not.
@@ -104,6 +99,7 @@ public class ParallelFileMetadataLoader {
       boolean hasCachedPartition = Iterables.any(e.getValue(),
           HdfsPartition.Builder::isMarkedCached);
       loader.setForceRefreshBlockLocations(hasCachedPartition);
+      loader.setDebugAction(debugAction);
       loaders_.put(e.getKey(), loader);
     }
     this.logPrefix_ = logPrefix;
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index b4a41b2..ea01c6a 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.impala.catalog.HdfsCompression;
+import org.apache.impala.util.DebugUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +54,7 @@ import java.util.UUID;
  * Common utility functions for operating on FileSystem objects.
  */
 public class FileSystemUtil {
+
   private static final Configuration CONF = new Configuration();
   private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtil.class);
 
@@ -564,7 +566,7 @@ public class FileSystemUtil {
    * Note that the order (breadth-first vs depth-first, sorted vs not) is undefined.
    */
   public static RemoteIterator<? extends FileStatus> listStatus(FileSystem fs, Path p,
-      boolean recursive) throws IOException {
+      boolean recursive, String debugAction) throws IOException {
     try {
       if (recursive) {
         // The Hadoop FileSystem API doesn't provide a recursive listStatus call that
@@ -584,12 +586,12 @@ public class FileSystemUtil {
         // even though it returns LocatedFileStatus objects with "fake" blocks which we
         // will ignore.
         if (isS3AFileSystem(fs)) {
-          return listFiles(fs, p, true);
+          return listFiles(fs, p, true, debugAction);
         }
-
+        DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
         return new FilterIterator(p, new RecursingIterator(fs, p));
       }
-
+      DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
       return new FilterIterator(p, fs.listStatusIterator(p));
     } catch (FileNotFoundException e) {
       if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
@@ -601,8 +603,9 @@ public class FileSystemUtil {
    * Wrapper around FileSystem.listFiles(), similar to the listStatus() wrapper above.
    */
   public static RemoteIterator<? extends FileStatus> listFiles(FileSystem fs, Path p,
-      boolean recursive) throws IOException {
+      boolean recursive, String debugAction) throws IOException {
     try {
+      DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
       return new FilterIterator(p, fs.listFiles(p, recursive));
     } catch (FileNotFoundException e) {
       if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 7f5aae2..05ff417 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -42,6 +42,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nullable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
@@ -197,6 +198,7 @@ import org.apache.impala.thrift.TUpdateCatalogResponse;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AcidUtils.TblTransaction;
 import org.apache.impala.util.CompressionUtil;
+import org.apache.impala.util.DebugUtils;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.IcebergUtil;
@@ -358,7 +360,7 @@ public class CatalogOpExecutor {
           TAlterTableParams alter_table_params = ddlRequest.getAlter_table_params();
           tTableName = Optional.of(alter_table_params.getTable_name());
           catalogOpMetric_.increment(ddl_type, tTableName);
-          alterTable(alter_table_params, response);
+          alterTable(alter_table_params, ddlRequest.getDebug_action(), response);
           break;
         case ALTER_VIEW:
           TCreateOrAlterViewParams alter_view_params = ddlRequest.getAlter_view_params();
@@ -632,7 +634,8 @@ public class CatalogOpExecutor {
    * table metadata, except for RENAME, ADD PARTITION and DROP PARTITION. This call is
    * thread-safe, i.e. concurrent operations on the same table are serialized.
    */
-  private void alterTable(TAlterTableParams params, TDdlExecResponse response)
+  private void alterTable(TAlterTableParams params, @Nullable String debugAction,
+      TDdlExecResponse response)
       throws ImpalaException {
     // When true, loads the file/block metadata.
     boolean reloadFileMetadata = false;
@@ -813,7 +816,7 @@ public class CatalogOpExecutor {
           Preconditions.checkState(params.isSetUpdate_stats_params());
           Reference<Long> numUpdatedColumns = new Reference<>(0L);
           alterTableUpdateStats(tbl, params.getUpdate_stats_params(),
-              numUpdatedPartitions, numUpdatedColumns);
+              numUpdatedPartitions, numUpdatedColumns, debugAction);
           reloadTableSchema = true;
           addSummary(response, "Updated " + numUpdatedPartitions.getRef() +
               " partition(s) and " + numUpdatedColumns.getRef() + " column(s).");
@@ -834,7 +837,7 @@ public class CatalogOpExecutor {
           }
           break;
         case RECOVER_PARTITIONS:
-          alterTableRecoverPartitions(tbl);
+          alterTableRecoverPartitions(tbl, debugAction);
           addSummary(response, "Partitions have been recovered.");
           break;
         case SET_OWNER:
@@ -942,7 +945,8 @@ public class CatalogOpExecutor {
           getMetaStoreTable(msClient, tbl);
       if (tbl instanceof HdfsTable) {
         ((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
-            reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate, reason);
+            reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate, null,
+            reason);
       } else {
         tbl.load(true, msClient.getHiveClient(), msTbl, reason);
       }
@@ -1059,7 +1063,8 @@ public class CatalogOpExecutor {
    * and 'numUpdatedColumns', respectively.
    */
   private void alterTableUpdateStats(Table table, TAlterTableUpdateStatsParams params,
-      Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns)
+      Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns,
+      @Nullable String debugAction)
       throws ImpalaException {
     Preconditions.checkState(table.getLock().isHeldByCurrentThread());
     Preconditions.checkState(params.isSetTable_stats() || params.isSetColumn_stats());
@@ -1104,6 +1109,7 @@ public class CatalogOpExecutor {
         throw ex;
       }
     }
+    DebugUtils.executeDebugAction(debugAction, DebugUtils.UPDATE_STATS_DELAY);
   }
 
   private void alterTableUpdateStatsInner(Table table,
@@ -3701,13 +3707,15 @@ public class CatalogOpExecutor {
    * Recover partitions of specified table.
    * Add partitions to metastore which exist in HDFS but not in metastore.
    */
-  private void alterTableRecoverPartitions(Table tbl) throws ImpalaException {
+  private void alterTableRecoverPartitions(Table tbl, @Nullable String debugAction)
+      throws ImpalaException {
     Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
     if (!(tbl instanceof HdfsTable)) {
       throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
     }
     HdfsTable hdfsTable = (HdfsTable) tbl;
-    List<List<String>> partitionsNotInHms = hdfsTable.getPathsWithoutPartitions();
+    List<List<String>> partitionsNotInHms = hdfsTable
+        .getPathsWithoutPartitions(debugAction);
     if (partitionsNotInHms.isEmpty()) return;
 
     List<Partition> hmsPartitions = Lists.newArrayList();
@@ -4327,7 +4335,7 @@ public class CatalogOpExecutor {
                 //   2: If no need for a full table reload then fetch partition level
                 //     writeIds and reload only the ones that changed.
                 updatedThriftTable = catalog_
-                    .reloadTable(tbl, req.refresh_updated_hms_partitions, cmdString);
+                    .reloadTable(tbl, req, cmdString);
               }
             } else {
               // Table was loaded from scratch, so it's already "refreshed".
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 69301f9..3c0f57f 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -667,11 +667,21 @@ public class Frontend {
           clientRequest.getRedacted_stmt() : clientRequest.getStmt());
       ddl.getDdl_params().setHeader(header);
       ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl());
+      // forward debug_actions to the catalogd
+      if (result.getQuery_options().isSetDebug_action()) {
+        ddl.getDdl_params()
+            .setDebug_action(result.getQuery_options().getDebug_action());
+      }
     }
     if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
       ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
       ddl.getReset_metadata_params().setRefresh_updated_hms_partitions(
           result.getQuery_options().isRefresh_updated_hms_partitions());
+      // forward debug_actions to the catalogd
+      if (result.getQuery_options().isSetDebug_action()) {
+        ddl.getReset_metadata_params()
+            .setDebug_action(result.getQuery_options().getDebug_action());
+      }
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
new file mode 100644
index 0000000..3c48940
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import java.util.List;
+import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the Java equivalent of the backend's DebugAction support (see
+ * DebugActionImpl in debug-util.cc). It is useful for executing certain debug
+ * actions (like SLEEP and JITTER) from the catalog code. The debug actions are
+ * passed to the CatalogService using the debug_action query option.
+ */
+public class DebugUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DebugUtils.class);
+  private static final Random random = new Random();
+
+  // debug action label for introducing an HDFS listing delay in listFiles/listStatus.
+  public static final String REFRESH_HDFS_LISTING_DELAY
+      = "catalogd_refresh_hdfs_listing_delay";
+
+  // debug action label for introducing delay in alter table recover partitions command.
+  public static final String RECOVER_PARTITIONS_DELAY = "catalogd_table_recover_delay";
+
+  // debug action label for introducing delay in update stats command.
+  public static final String UPDATE_STATS_DELAY = "catalogd_update_stats_delay";
+
+  /**
+   * Given a list of debug actions, executes the debug action pertaining to the given
+   * label. The debugActions string uses the same format as the debug_action query
+   * option. It is generally of the form
+   * LABEL:ACTION@ACTION_PARAMS|LABEL:ACTION@ACTION_PARAMS.
+   * For example, if the debug action configuration is:
+   * CATALOGD_HDFS_LISTING_DELAY:SLEEP@100|CATALOGD_HMS_RPC_DELAY:JITTER@100@0.2
+   * then, when the label "CATALOGD_HDFS_LISTING_DELAY" is provided, this method
+   * sleeps for 100 milliseconds. If the label CATALOGD_HMS_RPC_DELAY is provided, it
+   * sleeps for a random duration of up to 100 milliseconds with a probability of 0.2.
+   *
+   * @param debugActions the debug actions with the format given in the description
+   *                     above.
+   * @param label        the label of the action which needs to be executed.
+   */
+  public static void executeDebugAction(String debugActions, String label) {
+    if (Strings.isNullOrEmpty(debugActions)) {
+      return;
+    }
+    List<String> actions = Splitter.on('|').splitToList(debugActions);
+    for (String action : actions) {
+      List<String> components = Splitter.on(':').splitToList(action);
+      if (components.isEmpty()) continue;
+      if (!components.get(0).equalsIgnoreCase(label)) continue;
+      // found the debug action for the given label
+      // get the debug action params
+      Preconditions.checkState(components.size() > 1,
+          "Invalid debug action " + action);
+      List<String> actionParams = Splitter.on('@').splitToList(components.get(1));
+      Preconditions.checkState(actionParams.size() > 1,
+          "Illegal debug action format found in " + debugActions + " for label"
+              + label);
+      switch (actionParams.get(0)) {
+        case "SLEEP":
+          // the debug action params should be of the format SLEEP@<millis>
+          Preconditions.checkState(actionParams.size() == 2);
+          try {
+            int timeToSleepMs = Integer.parseInt(actionParams.get(1).trim());
+            LOG.trace("Sleeping for {} msec to execute debug action {}",
+                timeToSleepMs, label);
+            Thread.sleep(timeToSleepMs);
+          } catch (NumberFormatException ex) {
+            LOG.error("Invalid number format in debug action {}", action);
+          } catch (InterruptedException e) {
+            LOG.warn("Sleep interrupted for the debug action {}", label);
+          }
+          break;
+        case "JITTER":
+          // the JITTER debug action is of the format JITTER@<millis>[@<probability>]
+          Preconditions.checkState(actionParams.size() <= 3);
+          try {
+            int maxTimeToSleepMs = Integer.parseInt(actionParams.get(1).trim());
+            boolean shouldExecute = true;
+            if (actionParams.size() == 3) {
+              shouldExecute = parseProbability(actionParams.get(2));
+            }
+            if (!shouldExecute) {
+              continue;
+            }
+            long timeToSleepMs = random.nextInt(maxTimeToSleepMs);
+            LOG.trace("Sleeping for {} msec to execute debug action {}",
+                timeToSleepMs, action);
+            Thread.sleep(timeToSleepMs);
+          } catch (NumberFormatException ex) {
+            LOG.error("Invalid number format in debug action {}", action);
+          } catch (InterruptedException ex) {
+            LOG.warn("Sleep interrupted for the debug action {}", label);
+          }
+          break;
+        default:
+          LOG.error("Debug action {} is not implemented", actionParams.get(0));
+      }
+    }
+  }
+
+
+  /**
+   * Parses the probability action parameter of a debug action.
+   *
+   * @return true if the action should be executed, else false.
+   */
+  private static boolean parseProbability(String probability) {
+    double p = Double.parseDouble(probability.trim());
+    if (p <= 0 || p > 1.0) {
+      return false;
+    }
+    return random.nextDouble() < p;
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 27e1ab1..50e7f85 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -911,7 +911,7 @@ public class MetastoreEventsProcessorTest {
       }
       List<String> filesCopied = new ArrayList<>();
       RemoteIterator<? extends FileStatus> it = FileSystemUtil
-          .listStatus(srcFs, src, true);
+          .listStatus(srcFs, src, true, null);
       while (it.hasNext()) {
         FileStatus status = it.next();
         if (status.isDirectory()) continue;
diff --git a/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java b/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java
new file mode 100644
index 0000000..9a19c84
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the debug actions implementation.
+ */
+public class DebugUtilsTest {
+
+  @Test
+  public void testSleepDebugAction() {
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:SLEEP@1", "test_sleep_action");
+    long startTime = System.currentTimeMillis();
+    DebugUtils
+        .executeDebugAction("TEST_SLEEP_ACTION:SLEEP@100|SOME_OTHER_ACTION:SLEEP@10",
+            "SOME_OTHER_ACTION");
+    long endTime = System.currentTimeMillis();
+    // make sure you are executing the right sleep action
+    Assert.assertTrue(endTime - startTime < 100 && endTime - startTime >= 10);
+    // make sure that the code doesn't throw if the label is not found
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:SLEEP@1", "INVALID_LABEL");
+    // make sure that the code doesn't throw if there is an unsupported action type
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:NOT_FOUND@1", "TEST_SLEEP_ACTION");
+  }
+
+  @Test(expected = Exception.class)
+  public void testSleepDebugActionNegative() throws Exception {
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:SLEEP10", "TEST_SLEEP_ACTION");
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION|SLEEP10", "TEST_SLEEP_ACTION");
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION@SLEEP:10", "TEST_SLEEP_ACTION");
+  }
+
+  @Test
+  public void testJitter() {
+    DebugUtils.executeDebugAction("TEST_JITTER_ACTION:JITTER@1", "test_jitter_action");
+    long startTime = System.currentTimeMillis();
+    DebugUtils.executeDebugAction(
+        "SOME_OTHER_ACTION:SLEEP@100|TEST_JITTER_ACTION:JITTER@10@0.2",
+        "test_jitter_action");
+    long endTime = System.currentTimeMillis();
+    Assert.assertTrue(endTime - startTime < 100);
+  }
+
+  @Test(expected = Exception.class)
+  public void testJitterNegative() throws Exception {
+    DebugUtils.executeDebugAction("TEST_JITTER_ACTION@JITTER:1", "test_jitter_action");
+    DebugUtils.executeDebugAction("TEST_JITTER_ACTION:JITTER", "test_jitter_action");
+  }
+}
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index ac11ed3..5d1a1fa 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -842,6 +842,17 @@ class ImpalaTestSuite(BaseTestSuite):
   def execute_query(self, query, query_options=None):
     return self.__execute_query(self.client, query, query_options)
 
+  def exec_and_time(self, query, query_options=None, impalad=0):
+    """Executes a given query on the given impalad and returns the time taken in
+    milliseconds as seen by the client."""
+    client = self.create_client_for_nth_impalad(impalad)
+    if query_options is not None:
+      client.set_configuration(query_options)
+    start_time = int(round(time.time() * 1000))
+    client.execute(query)
+    end_time = int(round(time.time() * 1000))
+    return end_time - start_time
+
   def execute_query_using_client(self, client, query, vector):
     self.change_database(client, vector.get_value('table_format'))
     query_options = vector.get_value('exec_option')
diff --git a/tests/metadata/test_catalogd_debug_actions.py b/tests/metadata/test_catalogd_debug_actions.py
new file mode 100644
index 0000000..0cf3c8b
--- /dev/null
+++ b/tests/metadata/test_catalogd_debug_actions.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class TestDebugActions(ImpalaTestSuite):
+
+  @pytest.mark.execute_serially
+  def test_catalogd_debug_actions(self, unique_database):
+    self.client.execute("refresh tpcds.store_sales")
+    self.client.execute(
+      "create table {0}.test like functional.alltypes".format(unique_database))
+    self.client.execute("insert into {0}.test partition (year,month) "
+      "select * from functional.alltypes".format(unique_database))
+    self.client.execute("compute stats {0}.test".format(unique_database))
+    self.__run_debug_action("refresh tpcds.store_sales",
+      debug_action="catalogd_refresh_hdfs_listing_delay:SLEEP@50", delta=2000)
+    self.__run_debug_action("refresh tpcds.store_sales",
+      debug_action="catalogd_refresh_hdfs_listing_delay:JITTER@50@0.5", delta=2000)
+    self.__run_debug_action(
+      "alter table {0}.test recover partitions".format(unique_database),
+      debug_action="catalogd_table_recover_delay:SLEEP@3000", delta=2000)
+    # The run-to-run variance of the compute stats statement itself can be a few
+    # hundred milliseconds, so an additional delay of 4000 ms doesn't always slow
+    # the query down by the full 4000 ms.
+    self.__run_debug_action("compute stats {0}.test".format(unique_database),
+      debug_action="catalogd_update_stats_delay:SLEEP@4000", delta=3000)
+
+  def __run_debug_action(self, query, debug_action, delta):
+    """Test makes sure that the given debug_action is set is indeed causing the query
+    to run slower."""
+    time_taken_before = self.exec_and_time(query)
+    time_taken_after = self.exec_and_time(query, {"debug_action": debug_action})
+    assert (time_taken_after - time_taken_before) > delta
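
The new test exercises this end to end; the same effect can be seen
manually from any Impala client. An illustrative impala-shell session
(table name hypothetical):

  set debug_action=catalogd_update_stats_delay:SLEEP@4000;
  compute stats some_db.some_table;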