You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2022/09/08 22:11:28 UTC

[impala] 01/02: IMPALA-11490: Add more metrics for event processor

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bc92661bd3105cb378a3d140e247207959916d16
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Sun Sep 4 09:25:17 2022 +0800

    IMPALA-11490: Add more metrics for event processor
    
    This patch adds more metrics to help debug event processing that lags behind.
    The latest event id in HMS is added so users can compare it with the
    last synced event id to know how many events are waiting to be synced.
    The event time of the last synced event and latest event in HMS are also
    added. Users can compare them to know how long catalogd is lagging
    behind. The latest event id and event time are updated in a dedicated
    thread in case the event-processor thread is blocked by slow metadata
    reloading or waiting for table locks.
    
    This patch also fixes the incorrect metrics for event fetching and
    processing durations. Previously we used Timer.getMeanRate(), which
    returns the mean rate at which durations are recorded rather than the
    mean duration itself. The correct method is Timer.getSnapshot().getMean()
    (snapshot values are in nanoseconds and are converted to seconds). The
    snapshot also lets us expose the 75th/95th/99th percentiles.
    
    To facilitate metrics collection, the durations of the most recent event
    batch fetch and event batch processing are also exposed.
    
    Tests:
     - Manually verified the metrics when running some Hive workloads
    
    Change-Id: I0e7d40a0d8e140e6b0698936e97b454cb9abdc1b
    Reviewed-on: http://gerrit.cloudera.org:8080/18937
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/event-metrics.cc                       |  98 +++++++++++++++++-
 be/src/util/event-metrics.h                        |  50 ++++++++-
 common/thrift/JniCatalog.thrift                    |  41 ++++++--
 common/thrift/metrics.json                         | 114 ++++++++++++++++++++-
 .../impala/catalog/events/MetastoreEvents.java     |   2 +
 .../catalog/events/MetastoreEventsProcessor.java   | 100 ++++++++++++++++--
 6 files changed, 375 insertions(+), 30 deletions(-)

diff --git a/be/src/util/event-metrics.cc b/be/src/util/event-metrics.cc
index c878f0ca0..061025c6d 100644
--- a/be/src/util/event-metrics.cc
+++ b/be/src/util/event-metrics.cc
@@ -36,9 +36,25 @@ string MetastoreEventMetrics::NUMBER_EVENTS_SKIPPED_METRIC_NAME =
 string MetastoreEventMetrics::EVENT_PROCESSOR_STATUS_METRIC_NAME =
     "events-processor.status";
 string MetastoreEventMetrics::EVENTS_FETCH_DURATION_MEAN_METRIC_NAME =
-    "events-processor.avg-events-fetch-duration";
+    "events-processor.events-fetch-duration-avg";
+string MetastoreEventMetrics::EVENTS_FETCH_DURATION_P75_METRIC_NAME =
+    "events-processor.events-fetch-duration-p75";
+string MetastoreEventMetrics::EVENTS_FETCH_DURATION_P95_METRIC_NAME =
+    "events-processor.events-fetch-duration-p95";
+string MetastoreEventMetrics::EVENTS_FETCH_DURATION_P99_METRIC_NAME =
+    "events-processor.events-fetch-duration-p99";
+string MetastoreEventMetrics::EVENTS_FETCH_LAST_DURATION_METRIC_NAME =
+    "events-processor.events-fetch-duration-latest";
 string MetastoreEventMetrics::EVENTS_PROCESS_DURATION_MEAN_METRIC_NAME =
-    "events-processor.avg-events-process-duration";
+    "events-processor.events-process-duration-avg";
+string MetastoreEventMetrics::EVENTS_PROCESS_DURATION_P75_METRIC_NAME =
+    "events-processor.events-process-duration-p75";
+string MetastoreEventMetrics::EVENTS_PROCESS_DURATION_P95_METRIC_NAME =
+    "events-processor.events-process-duration-p95";
+string MetastoreEventMetrics::EVENTS_PROCESS_DURATION_P99_METRIC_NAME =
+    "events-processor.events-process-duration-p99";
+string MetastoreEventMetrics::EVENTS_PROCESS_LAST_DURATION_METRIC_NAME =
+    "events-processor.events-process-duration-latest";
 
 string MetastoreEventMetrics::EVENTS_RECEIVED_1MIN_METRIC_NAME =
     "events-processor.events-received-1min-rate";
@@ -48,12 +64,27 @@ string MetastoreEventMetrics::EVENTS_RECEIVED_15MIN_METRIC_NAME =
     "events-processor.events-received-15min-rate";
 string MetastoreEventMetrics::LAST_SYNCED_EVENT_ID_METRIC_NAME =
     "events-processor.last-synced-event-id";
+string MetastoreEventMetrics::LAST_SYNCED_EVENT_TIME_METRIC_NAME =
+    "events-processor.last-synced-event-time";
+string MetastoreEventMetrics::LATEST_EVENT_ID_METRIC_NAME =
+    "events-processor.latest-event-id";
+string MetastoreEventMetrics::LATEST_EVENT_TIME_METRIC_NAME =
+    "events-processor.latest-event-time";
 
 IntCounter* MetastoreEventMetrics::NUM_EVENTS_RECEIVED_COUNTER = nullptr;
 IntCounter* MetastoreEventMetrics::NUM_EVENTS_SKIPPED_COUNTER = nullptr;
 
 DoubleGauge* MetastoreEventMetrics::EVENTS_FETCH_DURATION_MEAN = nullptr;
+DoubleGauge* MetastoreEventMetrics::EVENTS_FETCH_DURATION_P75 = nullptr;
+DoubleGauge* MetastoreEventMetrics::EVENTS_FETCH_DURATION_P95 = nullptr;
+DoubleGauge* MetastoreEventMetrics::EVENTS_FETCH_DURATION_P99 = nullptr;
+DoubleGauge* MetastoreEventMetrics::EVENTS_FETCH_LAST_DURATION = nullptr;
+
 DoubleGauge* MetastoreEventMetrics::EVENTS_PROCESS_DURATION_MEAN = nullptr;
+DoubleGauge* MetastoreEventMetrics::EVENTS_PROCESS_DURATION_P75 = nullptr;
+DoubleGauge* MetastoreEventMetrics::EVENTS_PROCESS_DURATION_P95 = nullptr;
+DoubleGauge* MetastoreEventMetrics::EVENTS_PROCESS_DURATION_P99 = nullptr;
+DoubleGauge* MetastoreEventMetrics::EVENTS_PROCESS_LAST_DURATION = nullptr;
 
 StringProperty* MetastoreEventMetrics::EVENT_PROCESSOR_STATUS = nullptr;
 
@@ -61,6 +92,9 @@ DoubleGauge* MetastoreEventMetrics::EVENTS_RECEIVED_1MIN_RATE = nullptr;
 DoubleGauge* MetastoreEventMetrics::EVENTS_RECEIVED_5MIN_RATE = nullptr;
 DoubleGauge* MetastoreEventMetrics::EVENTS_RECEIVED_15MIN_RATE = nullptr;
 IntCounter* MetastoreEventMetrics::LAST_SYNCED_EVENT_ID = nullptr;
+IntCounter* MetastoreEventMetrics::LAST_SYNCED_EVENT_TIME = nullptr;
+IntCounter* MetastoreEventMetrics::LATEST_EVENT_ID = nullptr;
+IntCounter* MetastoreEventMetrics::LATEST_EVENT_TIME = nullptr;
 
 // Initialize all the metrics for the events metric group
 void MetastoreEventMetrics::InitMetastoreEventMetrics(MetricGroup* metric_group) {
@@ -75,10 +109,29 @@ void MetastoreEventMetrics::InitMetastoreEventMetrics(MetricGroup* metric_group)
       event_metrics->AddCounter(NUMBER_EVENTS_RECEIVED_METRIC_NAME, 0);
   NUM_EVENTS_SKIPPED_COUNTER =
       event_metrics->AddCounter(NUMBER_EVENTS_SKIPPED_METRIC_NAME, 0);
+
   EVENTS_FETCH_DURATION_MEAN =
       event_metrics->AddDoubleGauge(EVENTS_FETCH_DURATION_MEAN_METRIC_NAME, 0.0);
+  EVENTS_FETCH_DURATION_P75 =
+      event_metrics->AddDoubleGauge(EVENTS_FETCH_DURATION_P75_METRIC_NAME, 0.0);
+  EVENTS_FETCH_DURATION_P95 =
+      event_metrics->AddDoubleGauge(EVENTS_FETCH_DURATION_P95_METRIC_NAME, 0.0);
+  EVENTS_FETCH_DURATION_P99 =
+      event_metrics->AddDoubleGauge(EVENTS_FETCH_DURATION_P99_METRIC_NAME, 0.0);
+  EVENTS_FETCH_LAST_DURATION =
+      event_metrics->AddDoubleGauge(EVENTS_FETCH_LAST_DURATION_METRIC_NAME, 0.0);
+
   EVENTS_PROCESS_DURATION_MEAN =
       event_metrics->AddDoubleGauge(EVENTS_PROCESS_DURATION_MEAN_METRIC_NAME, 0.0);
+  EVENTS_PROCESS_DURATION_P75 =
+      event_metrics->AddDoubleGauge(EVENTS_PROCESS_DURATION_P75_METRIC_NAME, 0.0);
+  EVENTS_PROCESS_DURATION_P95 =
+      event_metrics->AddDoubleGauge(EVENTS_PROCESS_DURATION_P95_METRIC_NAME, 0.0);
+  EVENTS_PROCESS_DURATION_P99 =
+      event_metrics->AddDoubleGauge(EVENTS_PROCESS_DURATION_P99_METRIC_NAME, 0.0);
+  EVENTS_PROCESS_LAST_DURATION =
+      event_metrics->AddDoubleGauge(EVENTS_PROCESS_LAST_DURATION_METRIC_NAME, 0.0);
+
   EVENTS_RECEIVED_1MIN_RATE =
       event_metrics->AddDoubleGauge(EVENTS_RECEIVED_1MIN_METRIC_NAME, 0.0);
   EVENTS_RECEIVED_5MIN_RATE =
@@ -87,6 +140,12 @@ void MetastoreEventMetrics::InitMetastoreEventMetrics(MetricGroup* metric_group)
       event_metrics->AddDoubleGauge(EVENTS_RECEIVED_15MIN_METRIC_NAME, 0.0);
   LAST_SYNCED_EVENT_ID =
       event_metrics->AddCounter(LAST_SYNCED_EVENT_ID_METRIC_NAME, 0);
+  LAST_SYNCED_EVENT_TIME =
+      event_metrics->AddCounter(LAST_SYNCED_EVENT_TIME_METRIC_NAME, 0);
+  LATEST_EVENT_ID =
+      event_metrics->AddCounter(LATEST_EVENT_ID_METRIC_NAME, 0);
+  LATEST_EVENT_TIME =
+      event_metrics->AddCounter(LATEST_EVENT_TIME_METRIC_NAME, 0);
 }
 
 void MetastoreEventMetrics::refresh(TEventProcessorMetrics* response) {
@@ -106,9 +165,33 @@ void MetastoreEventMetrics::refresh(TEventProcessorMetrics* response) {
   if (response->__isset.events_fetch_duration_mean) {
     EVENTS_FETCH_DURATION_MEAN->SetValue(response->events_fetch_duration_mean);
   }
+  if (response->__isset.events_fetch_duration_p75) {
+    EVENTS_FETCH_DURATION_P75->SetValue(response->events_fetch_duration_p75);
+  }
+  if (response->__isset.events_fetch_duration_p95) {
+    EVENTS_FETCH_DURATION_P95->SetValue(response->events_fetch_duration_p95);
+  }
+  if (response->__isset.events_fetch_duration_p99) {
+    EVENTS_FETCH_DURATION_P99->SetValue(response->events_fetch_duration_p99);
+  }
+  if (response->__isset.last_events_fetch_duration) {
+    EVENTS_FETCH_LAST_DURATION->SetValue(response->last_events_fetch_duration);
+  }
   if (response->__isset.events_process_duration_mean) {
     EVENTS_PROCESS_DURATION_MEAN->SetValue(response->events_process_duration_mean);
   }
+  if (response->__isset.events_process_duration_p75) {
+    EVENTS_PROCESS_DURATION_P75->SetValue(response->events_process_duration_p75);
+  }
+  if (response->__isset.events_process_duration_p95) {
+    EVENTS_PROCESS_DURATION_P95->SetValue(response->events_process_duration_p95);
+  }
+  if (response->__isset.events_process_duration_p99) {
+    EVENTS_PROCESS_DURATION_P99->SetValue(response->events_process_duration_p99);
+  }
+  if (response->__isset.last_events_process_duration) {
+    EVENTS_PROCESS_LAST_DURATION->SetValue(response->last_events_process_duration);
+  }
   if (response->__isset.events_received_1min_rate) {
     EVENTS_RECEIVED_1MIN_RATE->SetValue(response->events_received_1min_rate);
   }
@@ -118,8 +201,17 @@ void MetastoreEventMetrics::refresh(TEventProcessorMetrics* response) {
   if (response->__isset.events_received_15min_rate) {
     EVENTS_RECEIVED_15MIN_RATE->SetValue(response->events_received_15min_rate);
   }
-  if(response->__isset.last_synced_event_id){
+  if (response->__isset.last_synced_event_id) {
     LAST_SYNCED_EVENT_ID->SetValue(response->last_synced_event_id);
   }
+  if (response->__isset.last_synced_event_time) {
+    LAST_SYNCED_EVENT_TIME->SetValue(response->last_synced_event_time);
+  }
+  if (response->__isset.latest_event_id) {
+    LATEST_EVENT_ID->SetValue(response->latest_event_id);
+  }
+  if (response->__isset.latest_event_time) {
+    LATEST_EVENT_TIME->SetValue(response->latest_event_time);
+  }
 }
 } // namespace impala
diff --git a/be/src/util/event-metrics.h b/be/src/util/event-metrics.h
index e6fa1e1df..e314c9bcf 100644
--- a/be/src/util/event-metrics.h
+++ b/be/src/util/event-metrics.h
@@ -42,11 +42,23 @@ class MetastoreEventMetrics {
   /// Total number of events skipped so far
   static IntCounter* NUM_EVENTS_SKIPPED_COUNTER;
 
-  /// Mean duration required to fetch a batch of events
+  /// Mean/p75/p95/p99 duration required to fetch a batch of events
   static DoubleGauge* EVENTS_FETCH_DURATION_MEAN;
+  static DoubleGauge* EVENTS_FETCH_DURATION_P75;
+  static DoubleGauge* EVENTS_FETCH_DURATION_P95;
+  static DoubleGauge* EVENTS_FETCH_DURATION_P99;
 
-  /// Mean duration required to process the fetched batch of events
+  /// Duration of fetching the last event batch
+  static DoubleGauge* EVENTS_FETCH_LAST_DURATION;
+
+  /// Mean/p75/p95/p99 duration required to process the fetched batch of events
   static DoubleGauge* EVENTS_PROCESS_DURATION_MEAN;
+  static DoubleGauge* EVENTS_PROCESS_DURATION_P75;
+  static DoubleGauge* EVENTS_PROCESS_DURATION_P95;
+  static DoubleGauge* EVENTS_PROCESS_DURATION_P99;
+
+  /// Duration of processing the last event batch
+  static DoubleGauge* EVENTS_PROCESS_LAST_DURATION;
 
   /// The current status of Metastore events processor.
   /// See MetastoreEventProcessor.EventProcessorStatus for possible state values
@@ -64,6 +76,15 @@ class MetastoreEventMetrics {
   /// Last metastore event id that the catalog server synced to.
   static IntCounter* LAST_SYNCED_EVENT_ID;
 
+  /// Last metastore event time that the catalog server synced to.
+  static IntCounter* LAST_SYNCED_EVENT_TIME;
+
+  /// Latest metastore event id
+  static IntCounter* LATEST_EVENT_ID;
+
+  /// Latest metastore event time
+  static IntCounter* LATEST_EVENT_TIME;
+
  private:
   /// Following metric names must match with the key in metrics.json
 
@@ -76,11 +97,23 @@ class MetastoreEventMetrics {
   /// metric name for event processor status
   static std::string EVENT_PROCESSOR_STATUS_METRIC_NAME;
 
-  /// metric name for the mean time taken for events fetch metric
+  /// metric name for the mean/p75/p95/p99 time taken for events fetch metric
   static std::string EVENTS_FETCH_DURATION_MEAN_METRIC_NAME;
+  static std::string EVENTS_FETCH_DURATION_P75_METRIC_NAME;
+  static std::string EVENTS_FETCH_DURATION_P95_METRIC_NAME;
+  static std::string EVENTS_FETCH_DURATION_P99_METRIC_NAME;
 
-  /// metric name for the mean time taken for events processing metric
+  /// metric name for the duration of fetching the last event batch
+  static std::string EVENTS_FETCH_LAST_DURATION_METRIC_NAME;
+
+  /// metric name for the mean/p75/p95/p99 time taken for events processing metric
   static std::string EVENTS_PROCESS_DURATION_MEAN_METRIC_NAME;
+  static std::string EVENTS_PROCESS_DURATION_P75_METRIC_NAME;
+  static std::string EVENTS_PROCESS_DURATION_P95_METRIC_NAME;
+  static std::string EVENTS_PROCESS_DURATION_P99_METRIC_NAME;
+
+  /// metric name for the duration of processing the last event batch
+  static std::string EVENTS_PROCESS_LAST_DURATION_METRIC_NAME;
 
   /// metric name for EWMA of number of events in last 1 min
   static std::string EVENTS_RECEIVED_1MIN_METRIC_NAME;
@@ -93,6 +126,15 @@ class MetastoreEventMetrics {
 
   /// Metric name for last metastore event id that the catalog server synced to.
   static std::string LAST_SYNCED_EVENT_ID_METRIC_NAME;
+
+  /// Metric name for the event time of the last synced metastore event
+  static std::string LAST_SYNCED_EVENT_TIME_METRIC_NAME;
+
+  /// Metric name for the latest metastore event id
+  static std::string LATEST_EVENT_ID_METRIC_NAME;
+
+  /// Metric name for the event time of the latest metastore event
+  static std::string LATEST_EVENT_TIME_METRIC_NAME;
 };
 
 } // namespace impala
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 24cd7856d..7f68dfaf3 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -868,26 +868,47 @@ struct TEventProcessorMetrics {
   // Total number of events skipped so far
   3: optional i64 events_skipped
 
-  // Mean time in sec for the fetching metastore events
+  // Time in sec for the fetching metastore events
   4: optional double events_fetch_duration_mean
+  5: optional double events_fetch_duration_p75
+  6: optional double events_fetch_duration_p95
+  7: optional double events_fetch_duration_p99
 
-  // Mean time in sec for processing a given batch of events
-  5: optional double events_process_duration_mean
+  // Duration in sec for fetching the last event batch
+  8: optional double last_events_fetch_duration
 
-  // Average number of events received in 1 min
-  6: optional double events_received_1min_rate
+  // Time in sec for processing a given batch of events
+  9: optional double events_process_duration_mean
+  10: optional double events_process_duration_p75
+  11: optional double events_process_duration_p95
+  12: optional double events_process_duration_p99
 
-  // Average number of events received in 1 min
-  7: optional double events_received_5min_rate
+  // Duration in sec for processing the last event batch
+  13: optional double last_events_process_duration
 
   // Average number of events received in 1 min
-  8: optional double events_received_15min_rate
+  14: optional double events_received_1min_rate
+
+  // Average number of events received in 5 min
+  15: optional double events_received_5min_rate
+
+  // Average number of events received in 15 min
+  16: optional double events_received_15min_rate
 
   // Average number events skipped in a polling interval
-  9: optional double events_skipped_per_poll_mean
+  17: optional double events_skipped_per_poll_mean
 
   // Last metastore event id that the catalog server synced to
-  10: optional i64 last_synced_event_id
+  18: optional i64 last_synced_event_id
+
+  // Event time of the last synced event
+  19: optional i64 last_synced_event_time
+
+  // Latest metastore event id
+  20: optional i64 latest_event_id
+
+  // Event time of the latest metastore event
+  21: optional i64 latest_event_time
 }
 
 struct TCatalogHmsCacheApiMetrics {
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 5780c838d..d84453ca8 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2690,7 +2690,47 @@
     "label": "Average duration to fetch metastore events",
     "units": "TIME_S",
     "kind": "GAUGE",
-    "key": "events-processor.avg-events-fetch-duration"
+    "key": "events-processor.events-fetch-duration-avg"
+  },
+  {
+    "description": "75th percentile of the time taken to fetch a batch of metastore events",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "P75 duration to fetch metastore events",
+    "units": "TIME_S",
+    "kind": "GAUGE",
+    "key": "events-processor.events-fetch-duration-p75"
+  },
+  {
+    "description": "95th percentile of the time taken to fetch a batch of metastore events",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "P95 duration to fetch metastore events",
+    "units": "TIME_S",
+    "kind": "GAUGE",
+    "key": "events-processor.events-fetch-duration-p95"
+  },
+  {
+    "description": "99th percentile of the time taken to fetch a batch of metastore events",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "P99 duration to fetch metastore events",
+    "units": "TIME_S",
+    "kind": "GAUGE",
+    "key": "events-processor.events-fetch-duration-p99"
+  },
+  {
+    "description": "Last time taken to fetch a batch of metastore events",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Last duration to fetch a batch of metastore events",
+    "units": "TIME_S",
+    "kind": "GAUGE",
+    "key": "events-processor.events-fetch-duration-latest"
   },
   {
     "description": "Average time taken to process a batch of events received from metastore",
@@ -2700,7 +2740,47 @@
     "label": "Average duration to process a batch of metastore events",
     "units": "TIME_S",
     "kind": "GAUGE",
-    "key": "events-processor.avg-events-process-duration"
+    "key": "events-processor.events-process-duration-avg"
+  },
+  {
+    "description": "75th percentile of the time taken to process a batch of events received from metastore",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "P75 duration to process a batch of metastore events",
+    "units": "TIME_S",
+    "kind": "GAUGE",
+    "key": "events-processor.events-process-duration-p75"
+  },
+  {
+    "description": "95th percentile of the time taken to process a batch of events received from metastore",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "P95 duration to process a batch of metastore events",
+    "units": "TIME_S",
+    "kind": "GAUGE",
+    "key": "events-processor.events-process-duration-p95"
+  },
+  {
+    "description": "99th percentile of the time taken to process a batch of events received from metastore",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "P99 duration to process a batch of metastore events",
+    "units": "TIME_S",
+    "kind": "GAUGE",
+    "key": "events-processor.events-process-duration-p99"
+  },
+  {
+    "description": "Last time taken to process a batch of events received from metastore",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Last duration to process a batch of metastore events",
+    "units": "TIME_S",
+    "kind": "GAUGE",
+    "key": "events-processor.events-process-duration-latest"
   },
   {
     "description": "Exponentially weighted moving average (EWMA) of number of events received in last 1 min",
@@ -2742,6 +2822,36 @@
     "kind" : "COUNTER",
     "key" : "events-processor.last-synced-event-id"
   },
+  {
+    "description": "Last metastore event time that the catalog server processed and synced to",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Last Synced Event Time",
+    "units": "NONE",
+    "kind" : "COUNTER",
+    "key" : "events-processor.last-synced-event-time"
+  },
+  {
+    "description": "Latest event id in Hive metastore",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Latest Event Id",
+    "units": "NONE",
+    "kind" : "COUNTER",
+    "key" : "events-processor.latest-event-id"
+  },
+  {
+    "description": "Event time of the latest event in Hive metastore",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Latest Event Time",
+    "units": "NONE",
+    "kind" : "COUNTER",
+    "key" : "events-processor.latest-event-time"
+  },
   {
     "description": "Total number of executor groups that have at least one executor",
     "contexts": [
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index ec788b009..2c8e6c98d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -470,6 +470,13 @@ public class MetastoreEvents {
 
     public long getEventId() { return eventId_; }
 
+    /**
+     * Returns the event time reported by the underlying HMS NotificationEvent.
+     * NOTE(review): HMS usually records this in seconds since the epoch while callers
+     * store it in fields suffixed "Ms" -- confirm the unit before relying on it.
+     */
+    public long getEventTime() { return event_.getEventTime(); }
+
     public MetastoreEventType getEventType() { return eventType_; }
 
     /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 5cfc2df13..264caab4e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.impala.catalog.events;
 
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -49,6 +50,7 @@ import org.apache.impala.catalog.events.MetastoreEvents.DropDatabaseEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import org.apache.impala.common.Metrics;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TEventProcessorMetrics;
@@ -250,6 +252,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
   // number of batch events generated
   public static final String NUMBER_OF_BATCH_EVENTS = "batch-events-created";
 
+  private static final long SECOND_IN_NANOS = 1000 * 1000 * 1000L;
+
   /**
    * Wrapper around {@link
    * MetastoreEventsProcessor#getNextMetastoreEventsInBatches(CatalogServiceCatalog,
@@ -489,6 +493,15 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
 
   // keeps track of the last event id which we have synced to
   private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
+  private final AtomicLong lastSyncedEventTimeMs_ = new AtomicLong(0);
+
+  // The event id and eventTime of the latest event in HMS. Only used in metrics to show
+  // how far we are lagging behind.
+  private final AtomicLong latestEventId_ = new AtomicLong(-1);
+  private final AtomicLong latestEventTimeMs_ = new AtomicLong(0);
+
+  // The duration in nanoseconds of the processing of the last event batch.
+  private final AtomicLong lastEventProcessDurationNs_ = new AtomicLong(0);
 
   // polling interval in seconds. Note this is a time we wait AFTER each fetch call
   private final long pollingFrequencyInSec_;
@@ -645,6 +658,10 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
         pollingFrequencyInSec_));
     scheduler_.scheduleWithFixedDelay(this::processEvents, pollingFrequencyInSec_,
         pollingFrequencyInSec_, TimeUnit.SECONDS);
+    // Update latestEventId in another thread in case that the processEvents() thread is
+    // blocked by slow metadata reloading or waiting for table locks.
+    scheduler_.scheduleWithFixedDelay(this::updateLatestEventId, pollingFrequencyInSec_,
+        pollingFrequencyInSec_, TimeUnit.SECONDS);
   }
 
   /**
@@ -849,6 +866,56 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
     }
   }
 
+  /**
+   * Update the latest event id and its event time in HMS regularly so the metrics can
+   * show how far the event processor is lagging behind. Does nothing unless the event
+   * processor is ACTIVE. Failures are logged and retried on the next scheduled run.
+   *
+   * NOTE(review): this task is scheduled on the same scheduler_ as processEvents(); it
+   * can only make progress while processEvents() is blocked if scheduler_ has more
+   * than one thread. Confirm the scheduler's pool size.
+   */
+  private void updateLatestEventId() {
+    EventProcessorStatus currentStatus = eventProcessorStatus_;
+    if (currentStatus != EventProcessorStatus.ACTIVE) {
+      return;
+    }
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      CurrentNotificationEventId currentNotificationEventId =
+          msClient.getHiveClient().getCurrentNotificationEventId();
+      long currentEventId = currentNotificationEventId.getEventId();
+      // no new events since we last polled
+      if (currentEventId <= latestEventId_.get()) {
+        return;
+      }
+      // Fetch the last event to get its eventTime.
+      NotificationEventRequest eventRequest = new NotificationEventRequest();
+      eventRequest.setLastEvent(currentEventId - 1);
+      eventRequest.setMaxEvents(1);
+      NotificationEventResponse response = MetastoreShim
+          .getNextNotification(msClient.getHiveClient(), eventRequest);
+      // The event may be missing from the response (e.g. already cleaned up in HMS).
+      // getEventsIterator() is null when the list is unset and next() throws on an
+      // empty list, so check the size and still record the id to report the lag.
+      if (response.getEventsSize() == 0) {
+        LOG.warn("Unable to fetch the latest event (id={}) from HMS. Its event time " +
+            "is unknown.", currentEventId);
+        latestEventId_.set(currentEventId);
+        return;
+      }
+      NotificationEvent event = response.getEvents().get(0);
+      Preconditions.checkState(event.getEventId() == currentEventId,
+          "Expected event id %s but got %s", currentEventId, event.getEventId());
+      LOG.info("Latest event in HMS: id={}, time={}", currentEventId,
+          event.getEventTime());
+      latestEventId_.set(currentEventId);
+      latestEventTimeMs_.set(event.getEventTime());
+    } catch (Exception e) {
+      LOG.error("Unable to update current notification event id. Last value: {}",
+          latestEventId_, e);
+    }
+  }
+
   /**
    * Gets the current event processor metrics along with its status. If the status is
    * not active the metrics are skipped. Only the status is sent
@@ -860,25 +911,48 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
     eventProcessorMetrics.setStatus(currentStatus.toString());
     eventProcessorMetrics.setLast_synced_event_id(getLastSyncedEventId());
     if (currentStatus != EventProcessorStatus.ACTIVE) return eventProcessorMetrics;
+    // The following counters are only updated when event-processor is active.
+    eventProcessorMetrics.setLast_synced_event_time(lastSyncedEventTimeMs_.get());
+    eventProcessorMetrics.setLatest_event_id(latestEventId_.get());
+    eventProcessorMetrics.setLatest_event_time(latestEventTimeMs_.get());
 
     long eventsReceived = metrics_.getMeter(EVENTS_RECEIVED_METRIC).getCount();
     long eventsSkipped = metrics_.getCounter(EVENTS_SKIPPED_METRIC).getCount();
-    double avgFetchDuration =
-        metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).getMeanRate();
-    double avgProcessDuration =
-        metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).getMeanRate();
+    eventProcessorMetrics.setEvents_received(eventsReceived);
+    eventProcessorMetrics.setEvents_skipped(eventsSkipped);
+
+    Snapshot fetchDuration =
+        metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).getSnapshot();
+    double avgFetchDuration = fetchDuration.getMean() / SECOND_IN_NANOS;
+    double p75FetchDuration = fetchDuration.get75thPercentile() / SECOND_IN_NANOS;
+    double p95FetchDuration = fetchDuration.get95thPercentile() / SECOND_IN_NANOS;
+    double p99FetchDuration = fetchDuration.get99thPercentile() / SECOND_IN_NANOS;
+    eventProcessorMetrics.setEvents_fetch_duration_mean(avgFetchDuration);
+    eventProcessorMetrics.setEvents_fetch_duration_p75(p75FetchDuration);
+    eventProcessorMetrics.setEvents_fetch_duration_p95(p95FetchDuration);
+    eventProcessorMetrics.setEvents_fetch_duration_p99(p99FetchDuration);
+
+    Snapshot processDuration =
+        metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).getSnapshot();
+    double avgProcessDuration = processDuration.getMean() / SECOND_IN_NANOS;
+    double p75ProcessDuration = processDuration.get75thPercentile() / SECOND_IN_NANOS;
+    double p95ProcessDuration = processDuration.get95thPercentile() / SECOND_IN_NANOS;
+    double p99ProcessDuration = processDuration.get99thPercentile() / SECOND_IN_NANOS;
+    eventProcessorMetrics.setEvents_process_duration_mean(avgProcessDuration);
+    eventProcessorMetrics.setEvents_process_duration_p75(p75ProcessDuration);
+    eventProcessorMetrics.setEvents_process_duration_p95(p95ProcessDuration);
+    eventProcessorMetrics.setEvents_process_duration_p99(p99ProcessDuration);
+
+    double lastProcessDuration = lastEventProcessDurationNs_.get() /
+        (double) SECOND_IN_NANOS;
+    eventProcessorMetrics.setLast_events_process_duration(lastProcessDuration);
+
     double avgNumberOfEventsReceived1Min =
         metrics_.getMeter(EVENTS_RECEIVED_METRIC).getOneMinuteRate();
     double avgNumberOfEventsReceived5Min =
         metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFiveMinuteRate();
     double avgNumberOfEventsReceived15Min =
         metrics_.getMeter(EVENTS_RECEIVED_METRIC).getFifteenMinuteRate();
-
-
-    eventProcessorMetrics.setEvents_received(eventsReceived);
-    eventProcessorMetrics.setEvents_skipped(eventsSkipped);
-    eventProcessorMetrics.setEvents_fetch_duration_mean(avgFetchDuration);
-    eventProcessorMetrics.setEvents_process_duration_mean(avgProcessDuration);
     eventProcessorMetrics.setEvents_received_1min_rate(avgNumberOfEventsReceived1Min);
     eventProcessorMetrics.setEvents_received_5min_rate(avgNumberOfEventsReceived5Min);
     eventProcessorMetrics.setEvents_received_15min_rate(avgNumberOfEventsReceived15Min);
@@ -935,6 +1009,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
           event.processIfEnabled();
           deleteEventLog_.garbageCollect(event.getEventId());
           lastSyncedEventId_.set(event.getEventId());
+          lastSyncedEventTimeMs_.set(event.getEventTime());
         }
       }
     } catch (CatalogException e) {
@@ -942,7 +1017,10 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
           "Unable to process event %d of type %s. Event processing will be stopped.",
           lastProcessedEvent.getEventId(), lastProcessedEvent.getEventType()), e);
     } finally {
-      context.stop();
+      long elapsed_ns = context.stop();
+      lastEventProcessDurationNs_.set(elapsed_ns);
+      LOG.info("Time elapsed in processing event batch: {}",
+          PrintUtils.printTimeNs(elapsed_ns));
     }
   }