You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2018/10/13 03:27:49 UTC

[4/5] impala git commit: IMPALA-7622: adds profile metrics for incremental stats

IMPALA-7622: adds profile metrics for incremental stats

Reapplies the change after fixing where the frontend profile is placed in the
runtime profile.

When computing incremental statistics by fetching the stats directly
from catalogd, a potentially expensive RPC is made from the impalad
coordinator to catalogd. This change adds metrics to the frontend
section of the profile to track how long the request takes, the size
of the compressed bytes received, and the number of partitions received.

The profile for a 'compute incremental ...' command on a table with
no statistics looks like this:

Frontend:
     - StatsFetch.CompressedBytes: 0
     - StatsFetch.TotalPartitions: 24
     - StatsFetch.NumPartitionsWithStats: 0
     - StatsFetch.Time: 26ms

When the table already has stats, they are fetched from catalogd and the
profile looks as follows:

Frontend:
     - StatsFetch.CompressedBytes: 24622
     - StatsFetch.TotalPartitions: 23
     - StatsFetch.NumPartitionsWithStats: 23
     - StatsFetch.Time: 14ms

Testing:
- manual inspection
- e2e test to check the profile

Change-Id: I94559a749500d44aa6aad564134d55c39e1d5273
Reviewed-on: http://gerrit.cloudera.org:8080/11670
Reviewed-by: Tianyi Wang <tw...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/97f02829
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/97f02829
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/97f02829

Branch: refs/heads/master
Commit: 97f028299c9d9d7493bdbeaacbf0a288678f9371
Parents: a80ec4a
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Wed Sep 26 16:14:43 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Oct 12 23:44:42 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/ComputeStatsStmt.java       | 43 ++++++++++++++++-
 tests/common/custom_cluster_test_suite.py       |  2 +-
 tests/custom_cluster/test_pull_stats.py         | 51 ++++++++++++++++++++
 3 files changed, 93 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/97f02829/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 36f88f2..24f387c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -46,15 +46,18 @@ import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
+import org.apache.impala.service.FrontendProfile;
 import org.apache.impala.thrift.TComputeStatsParams;
 import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TGetPartitionStatsResponse;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TUnit;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -114,6 +117,21 @@ public class ComputeStatsStmt extends StatementBase {
   private static String AVRO_SCHEMA_MSG_SUFFIX = "Please re-create the table with " +
           "column definitions, e.g., using the result of 'SHOW CREATE TABLE'";
 
+  // Names of the frontend profile counters recorded when fetching incremental
+  // statistics from Catalogd. All metrics are per query.
+  private static final String STATS_FETCH_PREFIX = "StatsFetch";
+  // Time (ms) needed to fetch all partition stats from catalogd.
+  private static final String STATS_FETCH_TIME = STATS_FETCH_PREFIX + ".Time";
+  // Total size (bytes) of the compressed stats found in the response from Catalogd.
+  private static final String STATS_FETCH_COMPRESSED_BYTES =
+      STATS_FETCH_PREFIX + ".CompressedBytes";
+  // Number of partitions whose stats were looked up in the response from Catalogd.
+  private static final String STATS_FETCH_TOTAL_PARTITIONS =
+      STATS_FETCH_PREFIX + ".TotalPartitions";
+  // Number of those partitions whose fetched stats have intermediate column stats set.
+  private static final String STATS_FETCH_NUM_PARTITIONS_WITH_STATS =
+      STATS_FETCH_PREFIX + ".NumPartitionsWithStats";
+
   protected final TableName tableName_;
   protected final TableSampleClause sampleParams_;
 
@@ -627,8 +645,6 @@ public class ComputeStatsStmt extends StatementBase {
    * - incremental statistics are present
    * - the partition is whitelisted in 'partitions'
    * - the partition is present in the local impalad catalog
-   * TODO(vercegovac): Add metrics to track time spent for these rpc's when fetching
-   *                   from catalog. Look into adding to timeline.
    * TODO(vercegovac): Look into parallelizing the fetch while child-queries are
    *                   running. Easiest would be to move this fetch to the backend.
    */
@@ -638,6 +654,10 @@ public class ComputeStatsStmt extends StatementBase {
     Preconditions.checkState(BackendConfig.INSTANCE.pullIncrementalStatistics()
         && !RuntimeEnv.INSTANCE.isTestEnv());
     if (partitions.isEmpty()) return Collections.emptyMap();
+    Stopwatch sw = new Stopwatch().start();
+    int numCompressedBytes = 0;
+    int totalPartitions = 0;
+    int numPartitionsWithStats = 0;
     try {
       TGetPartitionStatsResponse response =
           analyzer.getCatalog().getPartitionStats(table.getTableName());
@@ -657,16 +677,19 @@ public class ComputeStatsStmt extends StatementBase {
       // local catalogs are returned.
       Map<Long, TPartitionStats> partitionStats =
           Maps.newHashMapWithExpectedSize(partitions.size());
+      totalPartitions = partitions.size();
       for (FeFsPartition part: partitions) {
         ByteBuffer compressedStats = response.partition_stats.get(
             FeCatalogUtils.getPartitionName(part));
         if (compressedStats != null) {
           byte[] compressedStatsBytes = new byte[compressedStats.remaining()];
+          numCompressedBytes += compressedStatsBytes.length;
           compressedStats.get(compressedStatsBytes);
           TPartitionStats remoteStats =
               PartitionStatsUtil.partStatsFromCompressedBytes(
                   compressedStatsBytes, part);
           if (remoteStats != null && remoteStats.isSetIntermediate_col_stats()) {
+            ++numPartitionsWithStats;
             partitionStats.put(part.getId(), remoteStats);
           }
         }
@@ -675,10 +698,26 @@ public class ComputeStatsStmt extends StatementBase {
     } catch (Exception e) {
       Throwables.propagateIfInstanceOf(e, AnalysisException.class);
       throw new AnalysisException("Error fetching partition statistics", e);
+    } finally {
+      recordFetchMetrics(numCompressedBytes, totalPartitions, numPartitionsWithStats, sw);
     }
   }
 
   /**
+   * Adds the given StatsFetch values to the current frontend profile's counters, if any.
+   */
+  private static void recordFetchMetrics(int numCompressedBytes,
+      int totalPartitions, int numPartitionsWithStats, Stopwatch stopwatch) {
+    FrontendProfile profile = FrontendProfile.getCurrentOrNull();
+    if (profile == null) return;  // No current profile: nothing to record into.
+    profile.addToCounter(STATS_FETCH_COMPRESSED_BYTES, TUnit.BYTES, numCompressedBytes);
+    profile.addToCounter(STATS_FETCH_TOTAL_PARTITIONS, TUnit.NONE, totalPartitions);
+    profile.addToCounter(STATS_FETCH_NUM_PARTITIONS_WITH_STATS, TUnit.NONE,
+        numPartitionsWithStats);
+    profile.addToCounter(STATS_FETCH_TIME, TUnit.TIME_MS, stopwatch.elapsedMillis());
+  }
+
+  /**
    * Analyzes the TABLESAMPLE clause and computes the files sample to set
    * 'effectiveSamplePerc_'.
    * Returns the TABLESAMPLE SQL to be used for all child queries or an empty string if

http://git-wip-us.apache.org/repos/asf/impala/blob/97f02829/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 4274abf..0140303 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -194,7 +194,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
     if pytest.config.option.pull_incremental_statistics:
       cmd.append("--impalad_args=%s --catalogd_args=%s" %
-                 ("--pull_incremental_statistcs", "--pull_incremental_statistics"))
+                 ("--pull_incremental_statistics", "--pull_incremental_statistics"))
 
     default_query_option_kvs = []
     # Put any defaults first, then any arguments after that so they can override defaults.

http://git-wip-us.apache.org/repos/asf/impala/blob/97f02829/tests/custom_cluster/test_pull_stats.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_pull_stats.py b/tests/custom_cluster/test_pull_stats.py
index e470ead..b852f3d 100644
--- a/tests/custom_cluster/test_pull_stats.py
+++ b/tests/custom_cluster/test_pull_stats.py
@@ -31,3 +31,54 @@ class TestPullStatistics(CustomClusterTestSuite):
                                     catalogd_args="--pull_incremental_statistics=true")
   def test_pull_stats(self, vector, unique_database):
     self.run_test_case('QueryTest/compute-stats-incremental', vector, unique_database)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="--pull_incremental_statistics=true",
+                                    catalogd_args="--pull_incremental_statistics=true")
+  def test_pull_stats_profile(self, vector, unique_database):
+    """Checks that the frontend profile includes metrics when computing
+       incremental statistics.
+    """
+    try:
+      client = self.cluster.impalads[0].service.create_beeswax_client()
+      create = "create table test like functional.alltypes"
+      load = "insert into test partition(year, month) select * from functional.alltypes"
+      insert = """insert into test partition(year=2009, month=1) values
+                  (29349999, true, 4, 4, 4, 40,4.400000095367432,40.4,
+                  "10/21/09","4","2009-10-21 03:24:09.600000000")"""
+      stats_all = "compute incremental stats test"
+      stats_part = "compute incremental stats test partition (year=2009,month=1)"
+
+      # Checks that profile does not have metrics for incremental stats when
+      # the operation is not 'compute incremental stats'.
+      self.execute_query_expect_success(client, "use %s" % unique_database)
+      profile = self.execute_query_expect_success(client, create).runtime_profile
+      assert profile.count("StatsFetch") == 0
+      # Checks that incremental stats metrics are present when 'compute incremental
+      # stats' is run. Since the table has no stats, expect that no bytes are fetched.
+      self.execute_query_expect_success(client, load)
+      profile = self.execute_query_expect_success(client, stats_all).runtime_profile
+      assert profile.count("StatsFetch") > 1
+      assert profile.count("StatsFetch.CompressedBytes: 0") == 1
+      # Checks that bytes fetched is non-zero since incremental stats are present now
+      # and should have been fetched.
+      self.execute_query_expect_success(client, insert)
+      profile = self.execute_query_expect_success(client, stats_part).runtime_profile
+      assert profile.count("StatsFetch") > 1
+      assert profile.count("StatsFetch.CompressedBytes") == 1
+      assert profile.count("StatsFetch.CompressedBytes: 0") == 0
+      # Adds a partition, computes stats, and checks that the metrics in the profile
+      # reflect the operation.
+      alter = "alter table test add partition(year=2011, month=1)"
+      insert_new_partition = """
+          insert into test partition(year=2011, month=1) values
+          (29349999, true, 4, 4, 4, 40,4.400000095367432,40.4,
+          "10/21/09","4","2009-10-21 03:24:09.600000000")
+          """
+      self.execute_query_expect_success(client, alter)
+      self.execute_query_expect_success(client, insert_new_partition)
+      profile = self.execute_query_expect_success(client, stats_all).runtime_profile
+      assert profile.count("StatsFetch.TotalPartitions: 25") == 1
+      assert profile.count("StatsFetch.NumPartitionsWithStats: 24") == 1
+    finally:
+      client.close()