You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2024/01/04 19:39:57 UTC

(impala) 02/03: IMPALA-11553: Add event specific metrics in the table metrics

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 324a1aa37e4abecc73e8ccd39cca75cfcc54791e
Author: Sai Hemanth Gantasala <sa...@cloudera.com>
AuthorDate: Thu Sep 7 18:09:19 2023 -0700

    IMPALA-11553: Add event specific metrics in the table metrics
    
    This patch adds an event specific metric "avg-events-process-duration"
    to the table level metrics. This metric is also extended to the last
    1min, 5mins and 15mins durations. It is useful for identifying the
    average event processing duration on the table, which helps determine
    whether a particular table is causing the event processor to lag. As a
    temporary workaround, event processing can be disabled on that table.
    
    Another metric, "events-consuming-delay-ms", is also added to the event
    processor summary page. It is the time difference in milliseconds
    between the event being created in the metastore and the event being
    processed by the event processor. This is another useful metric to
    gauge how far the event processor is lagging.
    
    Tests:
      - Manually verified the metrics on catalogD UI page when running some
    hive workloads.
    
    Change-Id: I2428029361e610a0fcd8ed11be2ab771f03b00dd
    Reviewed-on: http://gerrit.cloudera.org:8080/20473
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../main/java/org/apache/impala/catalog/Table.java | 10 +++
 .../impala/catalog/events/MetastoreEvents.java     | 73 +++++++++++++++-------
 .../catalog/events/MetastoreEventsProcessor.java   |  9 +++
 tests/custom_cluster/test_events_custom_configs.py |  4 +-
 tests/webserver/test_web_pages.py                  |  4 ++
 5 files changed, 76 insertions(+), 24 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index eb8a863bd..97e3f4042 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -61,6 +61,7 @@ import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 
+import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -196,6 +197,12 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
 
   public static final AtomicInteger LOADING_TABLES = new AtomicInteger(0);
 
+  // Table property key to determine the table's events process duration
+  public static final String TBL_EVENTS_PROCESS_DURATION = "events-process-duration";
+
+  // The last sync event id of the table
+  public static final String LAST_SYNC_EVENT_ID = "last-sync-event-id";
+
   // this field represents the last event id in metastore upto which this table is
   // synced. It is used if the flag sync_to_latest_event_on_ddls is set to true.
   // Making it as volatile so that read and write of this variable are thread safe.
@@ -382,6 +389,9 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
     metrics_.addTimer(HMS_LOAD_TBL_SCHEMA);
     metrics_.addTimer(LOAD_DURATION_ALL_COLUMN_STATS);
     metrics_.addCounter(NUMBER_OF_INFLIGHT_EVENTS);
+    metrics_.addTimer(TBL_EVENTS_PROCESS_DURATION);
+    metrics_.addGauge(LAST_SYNC_EVENT_ID,
+        (Gauge<Long>) () -> Long.valueOf(lastSyncedEventId_));
   }
 
   public Metrics getMetrics() { return metrics_; }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 0e40c37bf..f6ffdb1b5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog.events;
 
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -70,6 +71,8 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.hive.common.MutableValidWriteIdList;
 
+import static org.apache.impala.catalog.Table.TBL_EVENTS_PROCESS_DURATION;
+
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TPartitionKeyValue;
@@ -1092,6 +1095,25 @@ public class MetastoreEvents {
       }
       return false;
     }
+
+    @Override
+    protected void process() throws MetastoreNotificationException, CatalogException {
+      Timer.Context context = null;
+      org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, tblName_);
+      if (tbl != null) {
+        context = tbl.getMetrics().getTimer(TBL_EVENTS_PROCESS_DURATION).time();
+      }
+      try {
+        processTableEvent();
+      } finally {
+        if (context != null) {
+          context.stop();
+        }
+      }
+    }
+
+    protected abstract void processTableEvent() throws MetastoreNotificationException,
+        CatalogException;
   }
 
   /**
@@ -1199,7 +1221,7 @@ public class MetastoreEvents {
      * overridden. Else, it will ignore the event
      */
     @Override
-    public void process() throws MetastoreNotificationException {
+    public void processTableEvent() throws MetastoreNotificationException {
       // check if the table exists already. This could happen in corner cases of the
       // table being dropped and recreated with the same name or in case this event is
       // a self-event (see description of self-event in the class documentation of
@@ -1348,7 +1370,7 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException {
+    public void processTableEvent() throws MetastoreNotificationException {
       if (isSelfEvent()) {
         infoLog("Not processing the insert event as it is a self-event");
         return;
@@ -1515,7 +1537,8 @@ public class MetastoreEvents {
      * table on the tblName from the event
      */
     @Override
-    public void process() throws MetastoreNotificationException, CatalogException {
+    public void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       if (isRename_) {
         processRename();
         return;
@@ -1778,9 +1801,10 @@ public class MetastoreEvents {
      * not a huge problem since the tables will eventually be created when the
      * create events are processed but there will be a non-zero amount of time when the
      * table will not be existing in catalog.
+     * TODO: IMPALA-12646, to track average process time for drop operations.
      */
     @Override
-    public void process() throws MetastoreNotificationException {
+    public void processTableEvent() throws MetastoreNotificationException {
       Reference<Boolean> tblRemovedLater = new Reference<>();
       boolean removedTable;
       removedTable = catalogOpExecutor_
@@ -2105,9 +2129,11 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException, CatalogException {
+    public void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       // bail out early if there are not partitions to process
       if (addedPartitions_.isEmpty()) {
+        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         infoLog("Partition list is empty. Ignoring this event.");
         return;
       }
@@ -2241,7 +2267,8 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException, CatalogException {
+    public void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
@@ -2260,11 +2287,11 @@ public class MetastoreEvents {
             + "parameters which can be ignored.");
         return;
       }
-
       // Reload the whole table if it's a transactional table or materialized view.
-      // Materialized views are treated as a special case because it's possible to receive
-      // partition event on MVs, but they are regular views in Impala. That cause problems
-      // on the reloading partition logic which expects it to be a HdfsTable.
+      // Materialized views are treated as a special case because it's possible to
+      // receive partition event on MVs, but they are regular views in Impala. That
+      // cause problems on the reloading partition logic which expects it to be a
+      // HdfsTable.
       if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
           || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
         reloadTransactionalTable();
@@ -2279,11 +2306,11 @@ public class MetastoreEvents {
           reloadPartitions(Arrays.asList(partitionAfter_),
               FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, "ALTER_PARTITION event");
         } catch (CatalogException e) {
-          throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
-                  + "partition on table {} partition {} failed. Event processing cannot "
-                  + "continue. Issue an invalidate command to reset the event processor "
-                  + "state.", getFullyQualifiedTblName(),
-              HdfsTable.constructPartitionName(tPartSpec)), e);
+          throw new MetastoreNotificationNeedsInvalidateException(
+              debugString("Refresh partition on table {} partition {} failed. Event " +
+                  "processing cannot continue. Issue an invalidate command to reset " +
+                  "the event processor state.", getFullyQualifiedTblName(),
+                  HdfsTable.constructPartitionName(tPartSpec)), e);
         }
       }
     }
@@ -2381,12 +2408,12 @@ public class MetastoreEvents {
     List<T> getBatchEvents() { return batchedEvents_; }
 
     @Override
-    protected void process() throws MetastoreNotificationException, CatalogException {
+    protected void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       if (isSelfEvent()) {
         infoLog("Not processing the event as it is a self-event");
         return;
       }
-
       // Ignore the event if this is a trivial event. See javadoc for
       // isTrivialAlterPartitionEvent() for examples.
       List<T> eventsToProcess = new ArrayList<>();
@@ -2503,12 +2530,15 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException, CatalogException {
+    public void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
       // we have seen cases where a add_partition event is generated with empty
       // partition list (see IMPALA-8547 for details. Make sure that droppedPartitions
       // list is not empty
       if (droppedPartitions_.isEmpty()) {
+        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         infoLog("Partition list is empty. Ignoring this event.");
+        return;
       }
       try {
         // Reload the whole table if it's a transactional table or materialized view.
@@ -2587,7 +2617,7 @@ public class MetastoreEvents {
     }
 
     @Override
-    protected void process() throws MetastoreNotificationException {
+    protected void processTableEvent() throws MetastoreNotificationException {
       if (msTbl_ == null) {
         debugLog("Ignoring the event since table {} is not found",
             getFullyQualifiedTblName());
@@ -2686,7 +2716,7 @@ public class MetastoreEvents {
     }
 
     @Override
-    public void process() throws MetastoreNotificationException {
+    public void processTableEvent() throws MetastoreNotificationException {
       if (isSelfEvent() || isOlderEvent()) {
         metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
             .inc(getNumberOfEvents());
@@ -2695,7 +2725,6 @@ public class MetastoreEvents {
                 .getCount());
         return;
       }
-
       if (isRefresh_) {
         if (reloadPartition_ != null) {
           processPartitionReload();
@@ -2875,7 +2904,7 @@ public class MetastoreEvents {
     }
 
     @Override
-    protected void process() throws MetastoreNotificationException {
+    protected void processTableEvent() throws MetastoreNotificationException {
       try {
         if (partitionName_ == null) {
           reloadTableFromCatalog("Commit Compaction event", true);
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 3a5a6efd7..aa49cfc47 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -268,6 +268,11 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
   // number of batch events generated
   public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created";
 
+  // metric to measure the delay in msec, between the event created in metastore and time
+  // it took to be consumed by the event processor
+  public static final String AVG_DELAY_IN_CONSUMING_EVENTS = "events-consuming" +
+      "-delay";
+
   private static final long SECOND_IN_NANOS = 1000 * 1000 * 1000L;
 
   // List of event types to skip while fetching notification events from metastore
@@ -647,6 +652,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
     metrics_
         .addGauge(DELETE_EVENT_LOG_SIZE, (Gauge<Integer>) deleteEventLog_::size);
     metrics_.addCounter(NUMBER_OF_BATCH_EVENTS);
+    metrics_.addTimer(AVG_DELAY_IN_CONSUMING_EVENTS);
   }
 
   /**
@@ -1162,6 +1168,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
           deleteEventLog_.garbageCollect(event.getEventId());
           lastSyncedEventId_.set(event.getEventId());
           lastSyncedEventTimeSecs_.set(event.getEventTime());
+          metrics_.getTimer(AVG_DELAY_IN_CONSUMING_EVENTS).update(
+              (System.currentTimeMillis() / 1000) - event.getEventTime(),
+                  TimeUnit.SECONDS);
         }
       }
     } catch (CatalogException e) {
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 04efc7255..31ee6880a 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -992,8 +992,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
         "Failing query(impala={}): {}".format(use_impala_client, stmt)
     else:
       # hive was used to run the stmts, any events generated should not have been deemed
-      # as self events
-      assert events_skipped == events_skipped_after
+      # as self events unless there are empty partition add/drop events
+      assert events_skipped <= events_skipped_after
 
   def __get_tbl_location(self, db_name, tbl_name):
     assert self.hive_client is not None
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 553ce0218..b46490bfd 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -54,6 +54,7 @@ class TestWebPage(ImpalaTestSuite):
   PROMETHEUS_METRICS_URL = "http://localhost:{0}/metrics_prometheus"
   QUERIES_URL = "http://localhost:{0}/queries"
   HEALTHZ_URL = "http://localhost:{0}/healthz"
+  EVENT_PROCESSOR_URL = "http://localhost:{0}/events"
 
   # log4j changes do not apply to the statestore since it doesn't
   # have an embedded JVM. So we make two sets of ports to test the
@@ -339,8 +340,11 @@ class TestWebPage(ImpalaTestSuite):
     self.__test_table_metrics(unique_database, "foo_part", "total-file-size-bytes")
     self.__test_table_metrics(unique_database, "foo_part", "num-files")
     self.__test_table_metrics(unique_database, "foo_part", "alter-duration")
+    self.__test_table_metrics(unique_database, "foo_part", "events-process-duration")
     self.__test_catalog_tablesfilesusage(unique_database, "foo_part", "1")
     self.__test_catalog_tables_loading_time(unique_database, "foo_part")
+    self.get_and_check_status(self.EVENT_PROCESSOR_URL, "events-consuming-delay",
+        ports_to_test=self.CATALOG_TEST_PORT)
 
   def __test_catalog_object(self, db_name, tbl_name, cluster_properties):
     """Tests the /catalog_object endpoint for the given db/table. Runs