You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/10/02 17:33:23 UTC
[3/8] impala git commit: Revert "IMPALA-7622: adds profile metrics
when fetching incremental stats"
Revert "IMPALA-7622: adds profile metrics when fetching incremental stats"
The reverted commit breaks a downstream dependency on the profile (this revert is part 1 of 2).
This reverts commit 235748316c5cada5c58b3e84a4e20ee57f1c4a49.
Change-Id: I80b4c0e4b8487572285ac788ab0195896f221842
Reviewed-on: http://gerrit.cloudera.org:8080/11551
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d918b2ae
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d918b2ae
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d918b2ae
Branch: refs/heads/master
Commit: d918b2aeb582ca465dc3e5066a77a7b4dab39641
Parents: 10bffe2
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Mon Oct 1 10:34:01 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon Oct 1 21:33:43 2018 +0000
----------------------------------------------------------------------
.../impala/analysis/ComputeStatsStmt.java | 43 +----------------
tests/common/custom_cluster_test_suite.py | 2 +-
tests/custom_cluster/test_pull_stats.py | 51 --------------------
3 files changed, 3 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/d918b2ae/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 24f387c..36f88f2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -46,18 +46,15 @@ import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.CatalogOpExecutor;
-import org.apache.impala.service.FrontendProfile;
import org.apache.impala.thrift.TComputeStatsParams;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TGetPartitionStatsResponse;
import org.apache.impala.thrift.TPartitionStats;
import org.apache.impala.thrift.TTableName;
-import org.apache.impala.thrift.TUnit;
import org.apache.log4j.Logger;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -117,21 +114,6 @@ public class ComputeStatsStmt extends StatementBase {
private static String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " +
"column definitions, e.g., using the result of 'SHOW CREATE TABLE'";
- // Metrics collected when fetching incremental statistics from Catalogd. All metrics
- // are per query.
- private static final String STATS_FETCH_PREFIX = "StatsFetch";
- // Time (ms) needed to fetch all partitions stats from catalogd.
- private static final String STATS_FETCH_TIME = STATS_FETCH_PREFIX + ".Time";
- // Number of compressed bytes received for all partitions.
- private static final String STATS_FETCH_COMPRESSED_BYTES =
- STATS_FETCH_PREFIX + ".CompressedBytes";
- // Number of partitions sent from Catalogd.
- private static final String STATS_FETCH_TOTAL_PARTITIONS =
- STATS_FETCH_PREFIX + ".TotalPartitions";
- // Number of partitions sent from Catalogd that include statistics.
- private static final String STATS_FETCH_NUM_PARTITIONS_WITH_STATS =
- STATS_FETCH_PREFIX + ".NumPartitionsWithStats";
-
protected final TableName tableName_;
protected final TableSampleClause sampleParams_;
@@ -645,6 +627,8 @@ public class ComputeStatsStmt extends StatementBase {
* - incremental statistics are present
* - the partition is whitelisted in 'partitions'
* - the partition is present in the local impalad catalog
+ * TODO(vercegovac): Add metrics to track time spent for these rpc's when fetching
+ * from catalog. Look into adding to timeline.
* TODO(vercegovac): Look into parallelizing the fetch while child-queries are
* running. Easiest would be to move this fetch to the backend.
*/
@@ -654,10 +638,6 @@ public class ComputeStatsStmt extends StatementBase {
Preconditions.checkState(BackendConfig.INSTANCE.pullIncrementalStatistics()
&& !RuntimeEnv.INSTANCE.isTestEnv());
if (partitions.isEmpty()) return Collections.emptyMap();
- Stopwatch sw = new Stopwatch().start();
- int numCompressedBytes = 0;
- int totalPartitions = 0;
- int numPartitionsWithStats = 0;
try {
TGetPartitionStatsResponse response =
analyzer.getCatalog().getPartitionStats(table.getTableName());
@@ -677,19 +657,16 @@ public class ComputeStatsStmt extends StatementBase {
// local catalogs are returned.
Map<Long, TPartitionStats> partitionStats =
Maps.newHashMapWithExpectedSize(partitions.size());
- totalPartitions = partitions.size();
for (FeFsPartition part: partitions) {
ByteBuffer compressedStats = response.partition_stats.get(
FeCatalogUtils.getPartitionName(part));
if (compressedStats != null) {
byte[] compressedStatsBytes = new byte[compressedStats.remaining()];
- numCompressedBytes += compressedStatsBytes.length;
compressedStats.get(compressedStatsBytes);
TPartitionStats remoteStats =
PartitionStatsUtil.partStatsFromCompressedBytes(
compressedStatsBytes, part);
if (remoteStats != null && remoteStats.isSetIntermediate_col_stats()) {
- ++numPartitionsWithStats;
partitionStats.put(part.getId(), remoteStats);
}
}
@@ -698,26 +675,10 @@ public class ComputeStatsStmt extends StatementBase {
} catch (Exception e) {
Throwables.propagateIfInstanceOf(e, AnalysisException.class);
throw new AnalysisException("Error fetching partition statistics", e);
- } finally {
- recordFetchMetrics(numCompressedBytes, totalPartitions, numPartitionsWithStats, sw);
}
}
/**
- * Adds metrics to the frontend profile when fetching incremental stats from catalogd.
- */
- private static void recordFetchMetrics(int numCompressedBytes,
- int totalPartitions, int numPartitionsWithStats, Stopwatch stopwatch) {
- FrontendProfile profile = FrontendProfile.getCurrentOrNull();
- if (profile == null) return;
- profile.addToCounter(STATS_FETCH_COMPRESSED_BYTES, TUnit.BYTES, numCompressedBytes);
- profile.addToCounter(STATS_FETCH_TOTAL_PARTITIONS, TUnit.NONE, totalPartitions);
- profile.addToCounter(STATS_FETCH_NUM_PARTITIONS_WITH_STATS, TUnit.NONE,
- numPartitionsWithStats);
- profile.addToCounter(STATS_FETCH_TIME, TUnit.TIME_MS, stopwatch.elapsedMillis());
- }
-
- /**
* Analyzes the TABLESAMPLE clause and computes the files sample to set
* 'effectiveSamplePerc_'.
* Returns the TABLESAMPLE SQL to be used for all child queries or an empty string if
http://git-wip-us.apache.org/repos/asf/impala/blob/d918b2ae/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index fd2c69e..8fc24c2 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -187,7 +187,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
if pytest.config.option.pull_incremental_statistics:
cmd.append("--impalad_args=%s --catalogd_args=%s" %
- ("--pull_incremental_statistics", "--pull_incremental_statistics"))
+ ("--pull_incremental_statistcs", "--pull_incremental_statistics"))
default_query_option_kvs = []
# Put any defaults first, then any arguments after that so they can override defaults.
http://git-wip-us.apache.org/repos/asf/impala/blob/d918b2ae/tests/custom_cluster/test_pull_stats.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_pull_stats.py b/tests/custom_cluster/test_pull_stats.py
index b852f3d..e470ead 100644
--- a/tests/custom_cluster/test_pull_stats.py
+++ b/tests/custom_cluster/test_pull_stats.py
@@ -31,54 +31,3 @@ class TestPullStatistics(CustomClusterTestSuite):
catalogd_args="--pull_incremental_statistics=true")
def test_pull_stats(self, vector, unique_database):
self.run_test_case('QueryTest/compute-stats-incremental', vector, unique_database)
-
- @pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args(impalad_args="--pull_incremental_statistics=true",
- catalogd_args="--pull_incremental_statistics=true")
- def test_pull_stats_profile(self, vector, unique_database):
- """Checks that the frontend profile includes metrics when computing
- incremental statistics.
- """
- try:
- client = self.cluster.impalads[0].service.create_beeswax_client()
- create = "create table test like functional.alltypes"
- load = "insert into test partition(year, month) select * from functional.alltypes"
- insert = """insert into test partition(year=2009, month=1) values
- (29349999, true, 4, 4, 4, 40,4.400000095367432,40.4,
- "10/21/09","4","2009-10-21 03:24:09.600000000")"""
- stats_all = "compute incremental stats test"
- stats_part = "compute incremental stats test partition (year=2009,month=1)"
-
- # Checks that profile does not have metrics for incremental stats when
- # the operation is not 'compute incremental stats'.
- self.execute_query_expect_success(client, "use %s" % unique_database)
- profile = self.execute_query_expect_success(client, create).runtime_profile
- assert profile.count("StatsFetch") == 0
- # Checks that incremental stats metrics are present when 'compute incremental
- # stats' is run. Since the table has no stats, expect that no bytes are fetched.
- self.execute_query_expect_success(client, load)
- profile = self.execute_query_expect_success(client, stats_all).runtime_profile
- assert profile.count("StatsFetch") > 1
- assert profile.count("StatsFetch.CompressedBytes: 0") == 1
- # Checks that bytes fetched is non-zero since incremental stats are present now
- # and should have been fetched.
- self.execute_query_expect_success(client, insert)
- profile = self.execute_query_expect_success(client, stats_part).runtime_profile
- assert profile.count("StatsFetch") > 1
- assert profile.count("StatsFetch.CompressedBytes") == 1
- assert profile.count("StatsFetch.CompressedBytes: 0") == 0
- # Adds a partition, computes stats, and checks that the metrics in the profile
- # reflect the operation.
- alter = "alter table test add partition(year=2011, month=1)"
- insert_new_partition = """
- insert into test partition(year=2011, month=1) values
- (29349999, true, 4, 4, 4, 40,4.400000095367432,40.4,
- "10/21/09","4","2009-10-21 03:24:09.600000000")
- """
- self.execute_query_expect_success(client, alter)
- self.execute_query_expect_success(client, insert_new_partition)
- profile = self.execute_query_expect_success(client, stats_all).runtime_profile
- assert profile.count("StatsFetch.TotalPartitions: 25") == 1
- assert profile.count("StatsFetch.NumPartitionsWithStats: 24") == 1
- finally:
- client.close()