You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/10/22 06:01:39 UTC
[impala] 01/02: IMPALA-10219: Expose DEBUG_ACTION query option in
catalog
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 15c3b13e9730479e096275d974000ae9fe8fbb83
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Sat Oct 3 17:01:35 2020 -0700
IMPALA-10219: Expose DEBUG_ACTION query option in catalog
This patch enables DEBUG_ACTION in the catalog service's
Java code. Specifically, the DEBUG_ACTION query option is now
exposed to TResetMetadataRequest and TDdlExecRequest
so that we can inject delays while executing refresh
or DDL statements.
For example,
1. To inject a delay of 100ms per HDFS list operation
during a refresh statement, set the following query option:
set debug_action=catalogd_refresh_hdfs_listing_delay:SLEEP@100;
2. To inject a delay of 100ms in alter table recover
partitions statement:
set debug_action=catalogd_table_recover_delay:SLEEP@100;
3. To inject a delay of 100ms in a compute stats statement:
set debug_action=catalogd_update_stats_delay:SLEEP@100;
Note that this option only adds the delay during the
update_stats phase of the compute stats execution.
Testing:
1. Added a test which sets the query option and makes
sure that the command takes more time than without the query option.
2. Added unit tests for the debugAction implementation
logic.
Change-Id: Ia7196b1ce76415a5faf3fa8575a26d22b2bf50b1
Reviewed-on: http://gerrit.cloudera.org:8080/16548
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/catalog-op-executor.cc | 1 +
be/src/util/debug-util.cc | 4 +
common/thrift/CatalogService.thrift | 6 +
.../impala/catalog/CatalogServiceCatalog.java | 12 +-
.../apache/impala/catalog/FileMetadataLoader.java | 21 +++-
.../java/org/apache/impala/catalog/HdfsTable.java | 74 +++++++----
.../org/apache/impala/catalog/IcebergTable.java | 2 +-
.../impala/catalog/ParallelFileMetadataLoader.java | 10 +-
.../org/apache/impala/common/FileSystemUtil.java | 13 +-
.../apache/impala/service/CatalogOpExecutor.java | 26 ++--
.../java/org/apache/impala/service/Frontend.java | 10 ++
.../java/org/apache/impala/util/DebugUtils.java | 137 +++++++++++++++++++++
.../events/MetastoreEventsProcessorTest.java | 2 +-
.../org/apache/impala/util/DebugUtilsTest.java | 67 ++++++++++
tests/common/impala_test_suite.py | 11 ++
tests/metadata/test_catalogd_debug_actions.py | 50 ++++++++
16 files changed, 389 insertions(+), 57 deletions(-)
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index f22dfad..0ffd708 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -132,6 +132,7 @@ Status CatalogOpExecutor::ExecComputeStats(
TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params;
update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);
update_stats_req.__set_sync_ddl(compute_stats_request.sync_ddl);
+ update_stats_req.__set_debug_action(compute_stats_request.ddl_params.debug_action);
const TComputeStatsParams& compute_stats_params =
compute_stats_request.ddl_params.compute_stats_params;
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index e176852..af10ddd 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -352,6 +352,10 @@ static bool ParseProbability(const string& prob_str, bool* should_execute) {
return true;
}
+/// The catalog's Java code implements an equivalent method for processing debug
+/// actions. See DebugUtils.java for more details. Any change to the implementation
+/// logic here, such as adding a new type of action, must also be made in
+/// DebugUtils.java.
Status DebugActionImpl(
const string& debug_action, const char* label, const std::vector<string>& args) {
const DebugActionTokens& action_list = TokenizeDebugActions(debug_action);
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 5acdb7f..ad316b9 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -156,6 +156,9 @@ struct TDdlExecRequest {
// Parameters for replaying an exported testcase.
25: optional JniCatalog.TCopyTestCaseReq copy_test_case_params
+
+ // Passes the debug actions to catalogd if the query option is set.
+ 26: optional string debug_action
}
// Response from executing a TDdlExecRequest
@@ -256,6 +259,9 @@ struct TResetMetadataRequest {
// If set, refreshes partition objects which are modified externally.
// Applicable only when refreshing the table.
9: optional bool refresh_updated_hms_partitions
+
+ // debug_action is set from the query_option when available.
+ 10: optional string debug_action
}
// Response from TResetMetadataRequest
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 5c6bf1a..38aed8f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -95,6 +95,7 @@ import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TPartitionStats;
import org.apache.impala.thrift.TPrincipalType;
import org.apache.impala.thrift.TPrivilege;
+import org.apache.impala.thrift.TResetMetadataRequest;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableType;
@@ -2275,7 +2276,7 @@ public class CatalogServiceCatalog extends Catalog {
* {@code refreshUpdatedPartitions} argument.
*/
public TCatalogObject reloadTable(Table tbl, String reason) throws CatalogException {
- return reloadTable(tbl, false, reason);
+ return reloadTable(tbl, new TResetMetadataRequest(), reason);
}
/**
@@ -2287,8 +2288,8 @@ public class CatalogServiceCatalog extends Catalog {
* If {@code refreshUpdatedParts} is true, the refresh logic detects updated
* partitions in metastore and reloads them too.
*/
- public TCatalogObject reloadTable(Table tbl, boolean refreshUpdatedParts, String reason)
- throws CatalogException {
+ public TCatalogObject reloadTable(Table tbl, TResetMetadataRequest request,
+ String reason) throws CatalogException {
LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName()));
Preconditions.checkState(!(tbl instanceof IncompleteTable));
String dbName = tbl.getDb().getName();
@@ -2312,7 +2313,8 @@ public class CatalogServiceCatalog extends Catalog {
}
if (tbl instanceof HdfsTable) {
((HdfsTable) tbl)
- .load(true, msClient.getHiveClient(), msTbl, refreshUpdatedParts, reason);
+ .load(true, msClient.getHiveClient(), msTbl,
+ request.refresh_updated_hms_partitions, request.debug_action, reason);
} else {
tbl.load(true, msClient.getHiveClient(), msTbl, reason);
}
@@ -3305,7 +3307,7 @@ public class CatalogServiceCatalog extends Catalog {
.map(HdfsPartition.Builder::new)
.collect(Collectors.toList());
new ParallelFileMetadataLoader(
- table, partBuilders, reqWriteIdList, validTxnList, logPrefix).load();
+ table, partBuilders, reqWriteIdList, validTxnList, null, logPrefix).load();
for (HdfsPartition.Builder builder : partBuilders) {
// Let's retrieve the original partition instance from builder because this is
// stored in the keys of 'partToPartialInfoMap'.
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index 2daca11..c1cba7f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Reference;
@@ -73,6 +72,7 @@ public class FileMetadataLoader {
private List<FileDescriptor> loadedInsertDeltaFds_;
private List<FileDescriptor> loadedDeleteDeltaFds_;
private LoadStats loadStats_;
+ private String debugAction_;
/**
* @param partDir the dir for which to fetch file metadata
@@ -177,16 +177,17 @@ public class FileMetadataLoader {
(oldFdsByRelPath_.isEmpty() || forceRefreshLocations);
String msg = String.format("%s file metadata%s from path %s",
- oldFdsByRelPath_.isEmpty() ? "Loading" : "Refreshing",
- listWithLocations ? " with eager location-fetching" : "",
- partDir_);
+ oldFdsByRelPath_.isEmpty() ? "Loading" : "Refreshing",
+ listWithLocations ? " with eager location-fetching" : "", partDir_);
LOG.trace(msg);
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
RemoteIterator<? extends FileStatus> fileStatuses;
if (listWithLocations) {
- fileStatuses = FileSystemUtil.listFiles(fs, partDir_, recursive_);
+ fileStatuses = FileSystemUtil
+ .listFiles(fs, partDir_, recursive_, debugAction_);
} else {
- fileStatuses = FileSystemUtil.listStatus(fs, partDir_, recursive_);
+ fileStatuses = FileSystemUtil
+ .listStatus(fs, partDir_, recursive_, debugAction_);
// TODO(todd): we could look at the result of listing without locations, and if
// we see that a substantial number of the files have changed, it may be better
@@ -280,6 +281,14 @@ public class FileMetadataLoader {
(fd.getModificationTime() != status.getModificationTime());
}
+ /**
+ * Enables injection of debug actions to introduce delays in HDFS listStatus or
+ * listFiles calls during file-metadata loading.
+ */
+ public void setDebugAction(String debugAction) {
+ this.debugAction_ = debugAction;
+ }
+
// File/Block metadata loading stats for a single HDFS path.
public static class LoadStats {
private final Path partDir_;
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 361de4e..75b68df 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -87,6 +88,7 @@ import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.AvroSchemaConverter;
import org.apache.impala.util.AvroSchemaUtils;
+import org.apache.impala.util.DebugUtils;
import org.apache.impala.util.FsPermissionCache;
import org.apache.impala.util.FsPermissionChecker;
import org.apache.impala.util.HdfsCachingUtil;
@@ -684,6 +686,17 @@ public class HdfsTable extends Table implements FeFsTable {
}
/**
+ * Similar to
+ * {@link #loadFileMetadataForPartitions(IMetaStoreClient, Collection, boolean)}
+ * but without injecting any debug actions.
+ */
+ private long loadFileMetadataForPartitions(IMetaStoreClient client,
+ Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh)
+ throws CatalogException {
+ return loadFileMetadataForPartitions(client, partBuilders, isRefresh, null);
+ }
+
+ /**
* Helper method to load the block locations for each partition in 'parts'.
* New file descriptor lists are loaded and the partitions are updated in place.
*
@@ -692,8 +705,8 @@ public class HdfsTable extends Table implements FeFsTable {
* @return time in nanoseconds spent in loading file metadata.
*/
private long loadFileMetadataForPartitions(IMetaStoreClient client,
- Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh)
- throws CatalogException {
+ Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh,
+ String debugActions) throws CatalogException {
final Clock clock = Clock.defaultClock();
long startTime = clock.getTick();
@@ -714,9 +727,9 @@ public class HdfsTable extends Table implements FeFsTable {
// we'll throw an exception here and end up bailing out of whatever catalog operation
// we're in the middle of. This could cause a partial metadata update -- eg we may
// have refreshed the top-level table properties without refreshing the files.
- new ParallelFileMetadataLoader(
- this, partBuilders, validWriteIds_, validTxnList, logPrefix)
- .load();
+ ParallelFileMetadataLoader loader = new ParallelFileMetadataLoader(
+ this, partBuilders, validWriteIds_, validTxnList, debugActions, logPrefix);
+ loader.load();
// TODO(todd): would be good to log a summary of the loading process:
// - how many block locations did we reuse/load individually/load via batch
@@ -1059,7 +1072,7 @@ public class HdfsTable extends Table implements FeFsTable {
throws TableLoadingException {
load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */
true, /* loadTableSchema*/true, false,
- /* partitionsToUpdate*/null, reason);
+ /* partitionsToUpdate*/null, null, reason);
}
public void load(boolean reuseMetadata, IMetaStoreClient client,
@@ -1067,7 +1080,16 @@ public class HdfsTable extends Table implements FeFsTable {
String reason) throws TableLoadingException {
load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */
true, /* loadTableSchema*/true, refreshUpdatedPartitions,
- /* partitionsToUpdate*/null, reason);
+ /* partitionsToUpdate*/null, null, reason);
+ }
+
+ public void load(boolean reuseMetadata, IMetaStoreClient hiveClient,
+ org.apache.hadoop.hive.metastore.api.Table msTbl,
+ boolean refreshUpdatedPartitions, String debugAction, String reason)
+ throws CatalogException {
+ load(reuseMetadata, hiveClient, msTbl, /* loadPartitionFileMetadata */
+ true, /* loadTableSchema*/true, refreshUpdatedPartitions,
+ /* partitionsToUpdate*/null, debugAction, reason);
}
/**
@@ -1097,8 +1119,8 @@ public class HdfsTable extends Table implements FeFsTable {
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl,
boolean loadParitionFileMetadata, boolean loadTableSchema,
- boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate, String reason)
- throws TableLoadingException {
+ boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate,
+ @Nullable String debugAction, String reason) throws TableLoadingException {
final Timer.Context context =
getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
String annotation = String.format("%s metadata for %s%s partition(s) of %s.%s (%s)",
@@ -1133,14 +1155,15 @@ public class HdfsTable extends Table implements FeFsTable {
storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl);
if (msTbl.getPartitionKeysSize() == 0) {
if (loadParitionFileMetadata) {
- storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client);
+ storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client,
+ debugAction);
} else { // Update the single partition stats in case table stats changes.
updateUnpartitionedTableStats();
}
} else {
storageMetadataLoadTime_ += updatePartitionsFromHms(
client, partitionsToUpdate, loadParitionFileMetadata,
- refreshUpdatedPartitions);
+ refreshUpdatedPartitions, debugAction);
}
LOG.info("Incrementally loaded table metadata for: " + getFullName());
} else {
@@ -1223,7 +1246,7 @@ public class HdfsTable extends Table implements FeFsTable {
* This is optimized for the case where few files have changed. See
* {@link FileMetadataLoader#load} for details.
*/
- private long updateUnpartitionedTableFileMd(IMetaStoreClient client)
+ private long updateUnpartitionedTableFileMd(IMetaStoreClient client, String debugAction)
throws CatalogException {
Preconditions.checkState(getNumClusteringCols() == 0);
if (LOG.isTraceEnabled()) {
@@ -1245,7 +1268,7 @@ public class HdfsTable extends Table implements FeFsTable {
// partition instance to local catalog coordinators.
partBuilder.setPrevId(oldPartition.getId());
long fileMdLoadTime = loadFileMetadataForPartitions(client,
- ImmutableList.of(partBuilder), /*isRefresh=*/true);
+ ImmutableList.of(partBuilder), /*isRefresh=*/true, debugAction);
setUnpartitionedTableStats(partBuilder);
addPartition(partBuilder.build());
return fileMdLoadTime;
@@ -1277,7 +1300,7 @@ public class HdfsTable extends Table implements FeFsTable {
*/
private long updatePartitionsFromHms(IMetaStoreClient client,
Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata,
- boolean refreshUpdatedPartitions) throws Exception {
+ boolean refreshUpdatedPartitions, String debugAction) throws Exception {
if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
Preconditions.checkNotNull(msTbl);
@@ -1285,9 +1308,9 @@ public class HdfsTable extends Table implements FeFsTable {
Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null);
PartitionDeltaUpdater deltaUpdater =
refreshUpdatedPartitions ? new PartBasedDeltaUpdater(client,
- loadPartitionFileMetadata, partitionsToUpdate)
+ loadPartitionFileMetadata, partitionsToUpdate, debugAction)
: new PartNameBasedDeltaUpdater(client, loadPartitionFileMetadata,
- partitionsToUpdate);
+ partitionsToUpdate, debugAction);
deltaUpdater.apply();
return deltaUpdater.loadTimeForFileMdNs_;
}
@@ -1310,12 +1333,14 @@ public class HdfsTable extends Table implements FeFsTable {
// if loadFileMd_ flag is set, files for these partitions will also be
// reloaded.
private final Set<String> partitionsToUpdate_;
+ private final String debugAction_;
PartitionDeltaUpdater(IMetaStoreClient client, boolean loadPartitionFileMetadata,
- Set<String> partitionsToUpdate) {
+ Set<String> partitionsToUpdate, String debugAction) {
this.client_ = client;
this.loadFileMd_ = loadPartitionFileMetadata;
this.partitionsToUpdate_ = partitionsToUpdate;
+ this.debugAction_ = debugAction;
}
/**
@@ -1407,7 +1432,7 @@ public class HdfsTable extends Table implements FeFsTable {
.collect(Collectors.toList());
}
loadTimeForFileMdNs_ += loadFileMetadataForPartitions(client_,
- partitionsToLoadFiles,/* isRefresh=*/true);
+ partitionsToLoadFiles,/* isRefresh=*/true, debugAction_);
updatePartitions(partitionsToLoadFiles);
}
}
@@ -1434,8 +1459,8 @@ public class HdfsTable extends Table implements FeFsTable {
public PartBasedDeltaUpdater(
IMetaStoreClient client, boolean loadPartitionFileMetadata,
- Set<String> partitionsToUpdate) throws Exception {
- super(client, loadPartitionFileMetadata, partitionsToUpdate);
+ Set<String> partitionsToUpdate, String debugAction) throws Exception {
+ super(client, loadPartitionFileMetadata, partitionsToUpdate, debugAction);
Stopwatch sw = Stopwatch.createStarted();
List<Partition> partitionList;
if (partitionsToUpdate != null) {
@@ -1533,8 +1558,8 @@ public class HdfsTable extends Table implements FeFsTable {
public PartNameBasedDeltaUpdater(
IMetaStoreClient client, boolean loadPartitionFileMetadata,
- Set<String> partitionsToUpdate) throws Exception {
- super(client, loadPartitionFileMetadata, partitionsToUpdate);
+ Set<String> partitionsToUpdate, String debugAction) throws Exception {
+ super(client, loadPartitionFileMetadata, partitionsToUpdate, debugAction);
// Retrieve all the partition names from the Hive Metastore. We need this to
// identify the delta between partitions of the local HdfsTable and the table entry
// in the Hive Metastore. Note: This is a relatively "cheap" operation
@@ -2230,8 +2255,10 @@ public class HdfsTable extends Table implements FeFsTable {
* Returns the HDFS paths corresponding to HdfsTable partitions that don't exist in
* the Hive Metastore. An HDFS path is represented as a list of strings values, one per
* partition key column.
+ * @param debugAction
*/
- public List<List<String>> getPathsWithoutPartitions() throws CatalogException {
+ public List<List<String>> getPathsWithoutPartitions(@Nullable String debugAction)
+ throws CatalogException {
Set<List<LiteralExpr>> existingPartitions = new HashSet<>();
// Get the list of partition values of existing partitions in Hive Metastore.
for (HdfsPartition partition: partitionMap_.values()) {
@@ -2247,6 +2274,7 @@ public class HdfsTable extends Table implements FeFsTable {
try {
getAllPartitionsNotInHms(basePath, partitionKeys, existingPartitions,
partitionsNotInHms);
+ DebugUtils.executeDebugAction(debugAction, DebugUtils.RECOVER_PARTITIONS_DELAY);
} catch (Exception e) {
throw new CatalogException(String.format("Failed to recover partitions for %s " +
"with exception:%s.", getFullName(), e));
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index e134d22..04e1bf9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -232,7 +232,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
loadSchemaFromIceberg();
// Loading hdfs table after loaded schema from Iceberg,
// in case we create external Iceberg table skipping column info in sql.
- hdfsTable_.load(false, msClient, msTable_, true, true, false, null, reason);
+ hdfsTable_.load(false, msClient, msTable_, true, true, false, null, null, reason);
pathHashToFileDescMap_ = Utils.loadAllPartition(this);
loadAllColumnStats(msClient);
} catch (Exception e) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 616bb50..8ac4e2d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -33,19 +33,14 @@ import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.impala.catalog.FeFsTable.Utils;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
-import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
-import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
-import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.ThreadNameAnnotator;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
@@ -76,8 +71,8 @@ public class ParallelFileMetadataLoader {
public ParallelFileMetadataLoader(HdfsTable table,
Collection<HdfsPartition.Builder> partBuilders,
- ValidWriteIdList writeIdList, ValidTxnList validTxnList, String logPrefix)
- throws CatalogException {
+ ValidWriteIdList writeIdList, ValidTxnList validTxnList, String debugAction,
+ String logPrefix) throws CatalogException {
if (writeIdList != null || validTxnList != null) {
// make sure that both either both writeIdList and validTxnList are set or both
// of them are not.
@@ -104,6 +99,7 @@ public class ParallelFileMetadataLoader {
boolean hasCachedPartition = Iterables.any(e.getValue(),
HdfsPartition.Builder::isMarkedCached);
loader.setForceRefreshBlockLocations(hasCachedPartition);
+ loader.setDebugAction(debugAction);
loaders_.put(e.getKey(), loader);
}
this.logPrefix_ = logPrefix;
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index b4a41b2..ea01c6a 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.impala.catalog.HdfsCompression;
+import org.apache.impala.util.DebugUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ import java.util.UUID;
* Common utility functions for operating on FileSystem objects.
*/
public class FileSystemUtil {
+
private static final Configuration CONF = new Configuration();
private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtil.class);
@@ -564,7 +566,7 @@ public class FileSystemUtil {
* Note that the order (breadth-first vs depth-first, sorted vs not) is undefined.
*/
public static RemoteIterator<? extends FileStatus> listStatus(FileSystem fs, Path p,
- boolean recursive) throws IOException {
+ boolean recursive, String debugAction) throws IOException {
try {
if (recursive) {
// The Hadoop FileSystem API doesn't provide a recursive listStatus call that
@@ -584,12 +586,12 @@ public class FileSystemUtil {
// even though it returns LocatedFileStatus objects with "fake" blocks which we
// will ignore.
if (isS3AFileSystem(fs)) {
- return listFiles(fs, p, true);
+ return listFiles(fs, p, true, debugAction);
}
-
+ DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
return new FilterIterator(p, new RecursingIterator(fs, p));
}
-
+ DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
return new FilterIterator(p, fs.listStatusIterator(p));
} catch (FileNotFoundException e) {
if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
@@ -601,8 +603,9 @@ public class FileSystemUtil {
* Wrapper around FileSystem.listFiles(), similar to the listStatus() wrapper above.
*/
public static RemoteIterator<? extends FileStatus> listFiles(FileSystem fs, Path p,
- boolean recursive) throws IOException {
+ boolean recursive, String debugAction) throws IOException {
try {
+ DebugUtils.executeDebugAction(debugAction, DebugUtils.REFRESH_HDFS_LISTING_DELAY);
return new FilterIterator(p, fs.listFiles(p, recursive));
} catch (FileNotFoundException e) {
if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 7f5aae2..05ff417 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -42,6 +42,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
@@ -197,6 +198,7 @@ import org.apache.impala.thrift.TUpdateCatalogResponse;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.AcidUtils.TblTransaction;
import org.apache.impala.util.CompressionUtil;
+import org.apache.impala.util.DebugUtils;
import org.apache.impala.util.FunctionUtils;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.impala.util.IcebergUtil;
@@ -358,7 +360,7 @@ public class CatalogOpExecutor {
TAlterTableParams alter_table_params = ddlRequest.getAlter_table_params();
tTableName = Optional.of(alter_table_params.getTable_name());
catalogOpMetric_.increment(ddl_type, tTableName);
- alterTable(alter_table_params, response);
+ alterTable(alter_table_params, ddlRequest.getDebug_action(), response);
break;
case ALTER_VIEW:
TCreateOrAlterViewParams alter_view_params = ddlRequest.getAlter_view_params();
@@ -632,7 +634,8 @@ public class CatalogOpExecutor {
* table metadata, except for RENAME, ADD PARTITION and DROP PARTITION. This call is
* thread-safe, i.e. concurrent operations on the same table are serialized.
*/
- private void alterTable(TAlterTableParams params, TDdlExecResponse response)
+ private void alterTable(TAlterTableParams params, @Nullable String debugAction,
+ TDdlExecResponse response)
throws ImpalaException {
// When true, loads the file/block metadata.
boolean reloadFileMetadata = false;
@@ -813,7 +816,7 @@ public class CatalogOpExecutor {
Preconditions.checkState(params.isSetUpdate_stats_params());
Reference<Long> numUpdatedColumns = new Reference<>(0L);
alterTableUpdateStats(tbl, params.getUpdate_stats_params(),
- numUpdatedPartitions, numUpdatedColumns);
+ numUpdatedPartitions, numUpdatedColumns, debugAction);
reloadTableSchema = true;
addSummary(response, "Updated " + numUpdatedPartitions.getRef() +
" partition(s) and " + numUpdatedColumns.getRef() + " column(s).");
@@ -834,7 +837,7 @@ public class CatalogOpExecutor {
}
break;
case RECOVER_PARTITIONS:
- alterTableRecoverPartitions(tbl);
+ alterTableRecoverPartitions(tbl, debugAction);
addSummary(response, "Partitions have been recovered.");
break;
case SET_OWNER:
@@ -942,7 +945,8 @@ public class CatalogOpExecutor {
getMetaStoreTable(msClient, tbl);
if (tbl instanceof HdfsTable) {
((HdfsTable) tbl).load(true, msClient.getHiveClient(), msTbl,
- reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate, reason);
+ reloadFileMetadata, reloadTableSchema, false, partitionsToUpdate, null,
+ reason);
} else {
tbl.load(true, msClient.getHiveClient(), msTbl, reason);
}
@@ -1059,7 +1063,8 @@ public class CatalogOpExecutor {
* and 'numUpdatedColumns', respectively.
*/
private void alterTableUpdateStats(Table table, TAlterTableUpdateStatsParams params,
- Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns)
+ Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns,
+ @Nullable String debugAction)
throws ImpalaException {
Preconditions.checkState(table.getLock().isHeldByCurrentThread());
Preconditions.checkState(params.isSetTable_stats() || params.isSetColumn_stats());
@@ -1104,6 +1109,7 @@ public class CatalogOpExecutor {
throw ex;
}
}
+ DebugUtils.executeDebugAction(debugAction, DebugUtils.UPDATE_STATS_DELAY);
}
private void alterTableUpdateStatsInner(Table table,
@@ -3701,13 +3707,15 @@ public class CatalogOpExecutor {
* Recover partitions of specified table.
* Add partitions to metastore which exist in HDFS but not in metastore.
*/
- private void alterTableRecoverPartitions(Table tbl) throws ImpalaException {
+ private void alterTableRecoverPartitions(Table tbl, @Nullable String debugAction)
+ throws ImpalaException {
Preconditions.checkArgument(tbl.getLock().isHeldByCurrentThread());
if (!(tbl instanceof HdfsTable)) {
throw new CatalogException("Table " + tbl.getFullName() + " is not an HDFS table");
}
HdfsTable hdfsTable = (HdfsTable) tbl;
- List<List<String>> partitionsNotInHms = hdfsTable.getPathsWithoutPartitions();
+ List<List<String>> partitionsNotInHms = hdfsTable
+ .getPathsWithoutPartitions(debugAction);
if (partitionsNotInHms.isEmpty()) return;
List<Partition> hmsPartitions = Lists.newArrayList();
@@ -4327,7 +4335,7 @@ public class CatalogOpExecutor {
// 2: If no need for a full table reload then fetch partition level
// writeIds and reload only the ones that changed.
updatedThriftTable = catalog_
- .reloadTable(tbl, req.refresh_updated_hms_partitions, cmdString);
+ .reloadTable(tbl, req, cmdString);
}
} else {
// Table was loaded from scratch, so it's already "refreshed".
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 69301f9..3c0f57f 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -667,11 +667,21 @@ public class Frontend {
clientRequest.getRedacted_stmt() : clientRequest.getStmt());
ddl.getDdl_params().setHeader(header);
ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl());
+ // forward debug_actions to the catalogd
+ if (result.getQuery_options().isSetDebug_action()) {
+ ddl.getDdl_params()
+ .setDebug_action(result.getQuery_options().getDebug_action());
+ }
}
if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
ddl.getReset_metadata_params().setRefresh_updated_hms_partitions(
result.getQuery_options().isRefresh_updated_hms_partitions());
+ // forward debug_actions to the catalogd
+ if (result.getQuery_options().isSetDebug_action()) {
+ ddl.getReset_metadata_params()
+ .setDebug_action(result.getQuery_options().getDebug_action());
+ }
}
}
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
new file mode 100644
index 0000000..3c48940
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import java.util.List;
+import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the DebugAction equivalent from the backend (see DebugActionImpl in
+ * debug-util.cc). This is useful to execute certain debug actions (like Sleep, Jitter)
+ * which can be executed from the code. The debug actions are passed to the CatalogService
+ * using a query option (debug_action).
+ */
+public class DebugUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DebugUtils.class);
+  private static final Random random = new Random();
+
+  // debug action label for introducing HDFS listing delay during listFiles or statuses.
+  public static final String REFRESH_HDFS_LISTING_DELAY
+      = "catalogd_refresh_hdfs_listing_delay";
+
+  // debug action label for introducing delay in alter table recover partitions command.
+  public static final String RECOVER_PARTITIONS_DELAY = "catalogd_table_recover_delay";
+
+  // debug action label for introducing delay in update stats command.
+  public static final String UPDATE_STATS_DELAY = "catalogd_update_stats_delay";
+
+  /**
+   * Given list of debug actions, execute the debug action pertaining to the given label.
+   * The debugActions string is of the format specified for the query_option/configuration
+   * debug_actions. It is generally of the format
+   * LABEL:ACTION@ACTION_PARAMS|LABEL:ACTION@ACTION_PARAMS.
+   * For example, if the debug action configuration is:
+   * CATALOGD_HDFS_LISTING_DELAY:SLEEP@100|CATALOGD_HMS_RPC_DELAY:JITTER@100@0.2
+   * then when the label "CATALOGD_HDFS_LISTING_DELAY" is provided, this method sleeps
+   * for 100 milliseconds. If the label CATALOGD_HMS_RPC_DELAY is provided, this method
+   * sleeps, with a probability of 0.2, for a random duration that is at least 0 and
+   * strictly less than 100 milliseconds.
+   *
+   * Labels are compared case-insensitively; action names (SLEEP, JITTER) are compared
+   * exactly. Entries whose label does not match are skipped without being validated.
+   * An unknown action name, or a numeric parameter that cannot be parsed or is out of
+   * range (negative sleep, non-positive jitter bound), is logged as an error and the
+   * entry is skipped. If the thread is interrupted while sleeping, a warning is logged
+   * and the thread's interrupt status is restored.
+   *
+   * @param debugActions the debug actions with the format given in the description
+   *     above. Null or empty means there is nothing to execute.
+   * @param label the label of action which needs to be executed.
+   * @throws IllegalStateException if the entry for the matching label has no action,
+   *     has no action parameter, or has the wrong number of parameters.
+   */
+  public static void executeDebugAction(String debugActions, String label) {
+    if (Strings.isNullOrEmpty(debugActions)) return;
+    List<String> actions = Splitter.on('|').splitToList(debugActions);
+    for (String action : actions) {
+      List<String> components = Splitter.on(':').splitToList(action);
+      if (components.isEmpty()) continue;
+      if (!components.get(0).equalsIgnoreCase(label)) continue;
+      // found the debug action for the given label
+      // get the debug action params
+      Preconditions.checkState(components.size() > 1,
+          "Invalid debug action " + action);
+      List<String> actionParams = Splitter.on('@').splitToList(components.get(1));
+      Preconditions.checkState(actionParams.size() > 1,
+          "Illegal debug action format found in " + debugActions + " for label "
+              + label);
+      switch (actionParams.get(0)) {
+        case "SLEEP":
+          // the debug action params should be of the format SLEEP@<millis>
+          Preconditions.checkState(actionParams.size() == 2);
+          try {
+            int timeToSleepMs = Integer.parseInt(actionParams.get(1).trim());
+            LOG.trace("Sleeping for {} msec to execute debug action {}",
+                timeToSleepMs, label);
+            Thread.sleep(timeToSleepMs);
+          } catch (IllegalArgumentException ex) {
+            // NumberFormatException is an IllegalArgumentException, so this covers
+            // both unparsable values and the negative durations that Thread.sleep()
+            // rejects.
+            LOG.error("Invalid numeric parameter in debug action {}", action);
+          } catch (InterruptedException e) {
+            LOG.warn("Sleep interrupted for the debug action {}", label);
+            // Restore the interrupt status so that callers can still observe it.
+            Thread.currentThread().interrupt();
+          }
+          break;
+        case "JITTER":
+          // the JITTER debug action is of format JITTER@<millis>[@<probability>]
+          Preconditions.checkState(actionParams.size() <= 3);
+          try {
+            int maxTimeToSleepMs = Integer.parseInt(actionParams.get(1).trim());
+            boolean shouldExecute = true;
+            if (actionParams.size() == 3) {
+              shouldExecute = parseProbability(actionParams.get(2));
+            }
+            if (!shouldExecute) break;
+            // nextInt() rejects a bound <= 0 with an IllegalArgumentException, which
+            // is handled below like any other invalid numeric parameter.
+            long timeToSleepMs = random.nextInt(maxTimeToSleepMs);
+            LOG.trace("Sleeping for {} msec to execute debug action {}",
+                timeToSleepMs, action);
+            Thread.sleep(timeToSleepMs);
+          } catch (IllegalArgumentException ex) {
+            // Also covers NumberFormatException from the millis/probability parsing.
+            LOG.error("Invalid numeric parameter in debug action {}", action);
+          } catch (InterruptedException ex) {
+            LOG.warn("Sleep interrupted for the debug action {}", label);
+            // Restore the interrupt status so that callers can still observe it.
+            Thread.currentThread().interrupt();
+          }
+          break;
+        default:
+          LOG.error("Debug action {} is not implemented", actionParams.get(0));
+      }
+    }
+  }
+
+
+  /**
+   * Parses the probability action parameter of a debug action and draws a random
+   * number to decide whether the action fires. Values that are not in the range
+   * (0, 1] never fire.
+   *
+   * @param probability textual probability; surrounding whitespace is ignored.
+   * @return true if the action should be executed, else false.
+   * @throws NumberFormatException if the value cannot be parsed as a double.
+   */
+  private static boolean parseProbability(String probability) {
+    double p = Double.parseDouble(probability.trim());
+    if (p <= 0 || p > 1.0) {
+      return false;
+    }
+    return random.nextDouble() < p;
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 27e1ab1..50e7f85 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -911,7 +911,7 @@ public class MetastoreEventsProcessorTest {
}
List<String> filesCopied = new ArrayList<>();
RemoteIterator<? extends FileStatus> it = FileSystemUtil
- .listStatus(srcFs, src, true);
+ .listStatus(srcFs, src, true, null);
while (it.hasNext()) {
FileStatus status = it.next();
if (status.isDirectory()) continue;
diff --git a/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java b/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java
new file mode 100644
index 0000000..9a19c84
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/DebugUtilsTest.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the debug actions implementation.
+ */
+public class DebugUtilsTest {
+
+  /**
+   * Runs the given debug action and fails the test unless it throws. Every malformed
+   * input gets its own call so that one throwing input cannot hide another that is
+   * silently accepted.
+   */
+  private static void assertRejected(String debugActions, String label) {
+    try {
+      DebugUtils.executeDebugAction(debugActions, label);
+    } catch (Exception expected) {
+      return;
+    }
+    Assert.fail("Expected an exception for debug action " + debugActions);
+  }
+
+  @Test
+  public void testSleepDebugAction() {
+    // the label lookup is case-insensitive
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:SLEEP@1", "test_sleep_action");
+    // use the monotonic clock so that wall-clock adjustments cannot skew the result
+    long startNanos = System.nanoTime();
+    DebugUtils
+        .executeDebugAction("TEST_SLEEP_ACTION:SLEEP@100|SOME_OTHER_ACTION:SLEEP@10",
+            "SOME_OTHER_ACTION");
+    long elapsedMs = (System.nanoTime() - startNanos) / 1000000L;
+    // make sure you are executing the right sleep action
+    Assert.assertTrue(elapsedMs < 100 && elapsedMs >= 10);
+    // make sure that code doesn't throw if label is not found
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:SLEEP@1", "INVALID_LABEL");
+    // make sure that code doesn't throw if there is a unsupported action type
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION:NOT_FOUND@1", "TEST_SLEEP_ACTION");
+  }
+
+  @Test
+  public void testSleepDebugActionNegative() {
+    // '@' is missing between the action type and its parameter
+    assertRejected("TEST_SLEEP_ACTION:SLEEP10", "TEST_SLEEP_ACTION");
+    // '|' separates entries, so the matching label is left without any action
+    assertRejected("TEST_SLEEP_ACTION|SLEEP10", "TEST_SLEEP_ACTION");
+    // With ':' and '@' swapped, the text before ':' is "TEST_SLEEP_ACTION@SLEEP". It
+    // does not match the label, so the entry is skipped instead of being rejected.
+    DebugUtils.executeDebugAction("TEST_SLEEP_ACTION@SLEEP:10", "TEST_SLEEP_ACTION");
+  }
+
+  @Test
+  public void testJitter() {
+    DebugUtils.executeDebugAction("TEST_JITTER_ACTION:JITTER@1", "test_jitter_action");
+    long startNanos = System.nanoTime();
+    DebugUtils.executeDebugAction(
+        "SOME_OTHER_ACTION:SLEEP@100|TEST_JITTER_ACTION:JITTER@10@0.2",
+        "test_jitter_action");
+    long elapsedMs = (System.nanoTime() - startNanos) / 1000000L;
+    // only the jitter (< 10 ms) may run, never the 100 ms sleep of the other label
+    Assert.assertTrue(elapsedMs < 100);
+  }
+
+  @Test
+  public void testJitterNegative() {
+    // With ':' and '@' swapped, the text before ':' is "TEST_JITTER_ACTION@JITTER". It
+    // does not match the label, so the entry is skipped instead of being rejected.
+    DebugUtils.executeDebugAction("TEST_JITTER_ACTION@JITTER:1", "test_jitter_action");
+    // JITTER requires at least the maximum sleep time as a parameter
+    assertRejected("TEST_JITTER_ACTION:JITTER", "test_jitter_action");
+  }
+}
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index ac11ed3..5d1a1fa 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -842,6 +842,17 @@ class ImpalaTestSuite(BaseTestSuite):
def execute_query(self, query, query_options=None):
return self.__execute_query(self.client, query, query_options)
+ def exec_and_time(self, query, query_options=None, impalad=0):
+ """Executes a given query on the given impalad and returns the time taken in
+ millisecondsas seen by the client."""
+ client = self.create_client_for_nth_impalad(impalad)
+ if query_options is not None:
+ client.set_configuration(query_options)
+ start_time = int(round(time.time() * 1000))
+ client.execute(query)
+ end_time = int(round(time.time() * 1000))
+ return end_time - start_time
+
def execute_query_using_client(self, client, query, vector):
self.change_database(client, vector.get_value('table_format'))
query_options = vector.get_value('exec_option')
diff --git a/tests/metadata/test_catalogd_debug_actions.py b/tests/metadata/test_catalogd_debug_actions.py
new file mode 100644
index 0000000..0cf3c8b
--- /dev/null
+++ b/tests/metadata/test_catalogd_debug_actions.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class TestDebugActions(ImpalaTestSuite):
+
+ @pytest.mark.execute_serially
+ def test_catalogd_debug_actions(self, unique_database):
+ self.client.execute("refresh tpcds.store_sales")
+ self.client.execute(
+ "create table {0}.test like functional.alltypes".format(unique_database))
+ self.client.execute("insert into {0}.test partition (year,month) "
+ "select * from functional.alltypes".format(unique_database))
+ self.client.execute("compute stats {0}.test".format(unique_database))
+ self.__run_debug_action("refresh tpcds.store_sales",
+ debug_action="catalogd_refresh_hdfs_listing_delay:SLEEP@50", delta=2000)
+ self.__run_debug_action("refresh tpcds.store_sales",
+ debug_action="catalogd_refresh_hdfs_listing_delay:JITTER@50@0.5", delta=2000)
+ self.__run_debug_action(
+ "alter table {0}.test recover partitions".format(unique_database),
+ debug_action="catalogd_table_recover_delay:SLEEP@3000", delta=2000)
+ # the variance of compute stats statement could itself be within few hundred
+ # millisecs hence adding additional delay of 4000 doesn't necessarily slow down the
+ # query by 4000 ms always.
+ self.__run_debug_action("compute stats {0}.test".format(unique_database),
+ debug_action="catalogd_update_stats_delay:SLEEP@4000", delta=3000)
+
+ def __run_debug_action(self, query, debug_action, delta):
+ """Test makes sure that the given debug_action is set is indeed causing the query
+ to run slower."""
+ time_taken_before = self.exec_and_time(query)
+ time_taken_after = self.exec_and_time(query, {"debug_action": debug_action})
+ assert (time_taken_after - time_taken_before) > delta