Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2022/01/17 14:34:05 UTC

[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r782811556



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       COMPACTOR_INITIATOR_ON could be false on the HS2 running this Worker, but metrics should still be collected.
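    A minimal sketch of the suggested change (hypothetical, just to illustrate) would keep only the first two conjuncts:
    ```
    // Keep collecting metrics even when this HS2 instance does not run the Initiator.
    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
    ```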

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception {
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
             COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       COMPACTOR_INITIATOR_ON also controls whether the Cleaner runs, so this check is redundant here... but if you want to leave it in for posterity/future-proofing, I understand.

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)

Review comment:
       If there's any chance the parameters might change, I recommend introducing a CompactionMetricsDataRequest object.
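    A hypothetical shape for it (field names are for illustration only):
    ```
    struct CompactionMetricsDataRequest {
      1: required string dbName,
      2: required string tblName,
      3: optional string partitionName,
      4: required CompactionMetricsMetricType type
    }

    // in service ThriftHiveMetastore:
    CompactionMetricsDataResponse get_compaction_metrics_data(1: CompactionMetricsDataRequest req) throws(1:MetaException o1)
    ```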

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
##########
@@ -39,199 +45,396 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
-import java.util.EnumMap;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_OBSOLETE_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_SMALL_DELTAS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class TestDeltaFilesMetrics extends CompactorTest  {
 
   private void setUpHiveConf() {
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE, 2);
-    HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_CACHE_DURATION, 7200, TimeUnit.SECONDS);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD, 100);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD, 100);
-    HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, 1, TimeUnit.SECONDS);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD, 1);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD, 1);
+    MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_REPORTING_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    MetastoreConf.setDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD, 0.15f);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
   }
 
-  private void initAndCollectFirstMetrics() throws Exception {
-    MetricsFactory.close();
-    MetricsFactory.init(conf);
+  @After
+  public void tearDown() throws Exception {
+    DeltaFilesMetricReporter.close();
+  }
 
-    DeltaFilesMetricReporter.init(conf);
 
-    TezCounters tezCounters = new TezCounters();
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=1").setValue(200);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=2").setValue(100);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=3").setValue(150);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid_v2").setValue(250);
+  static void verifyMetricsMatch(Map<String, String> expected, Map<String, String> actual) {
+    Assert.assertTrue("Actual metrics " + actual + " don't match expected: " + expected,
+        equivalent(expected, actual));
+  }
 
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=1").setValue(150);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=2").setValue(100);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=3").setValue(250);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid_v2").setValue(200);
+  private static boolean equivalent(Map<String, String> lhs, Map<String, String> rhs) {
+    return lhs.size() == rhs.size() && Maps.difference(lhs, rhs).areEqual();
+  }
 
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=1").setValue(250);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=2").setValue(200);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=3").setValue(150);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid_v2").setValue(100);
+  static Map<String, String> gaugeToMap(String metric) throws Exception {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName oname = new ObjectName(DeltaFilesMetricReporter.OBJECT_NAME_PREFIX + metric);
+    MBeanInfo mbeanInfo = mbs.getMBeanInfo(oname);
 
-    DeltaFilesMetricReporter.getInstance().submit(tezCounters, null);
-    Thread.sleep(1000);
+    Map<String, String> result = new HashMap<>();
+    for (MBeanAttributeInfo attr : mbeanInfo.getAttributes()) {
+      result.put(attr.getName(), String.valueOf(mbs.getAttribute(oname, attr.getName())));
+    }
+    return result;
   }
 
-  @After
-  public void tearDown() {
-    DeltaFilesMetricReporter.close();
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
   }
 
   @Test
-  public void testDeltaFilesMetric() throws Exception {
+  public void testDeltaFileMetricPartitionedTable() throws Exception {
     setUpHiveConf();
-    initAndCollectFirstMetrics();
+    String dbName = "default";
+    String tblName = "dp";
+    String partName = "ds=part1";
 
+    Table t = newTable(dbName, tblName, true);
+    List<LockComponent> components = new ArrayList<>();
+
+    Partition p = newPartition(t, "part1");
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 20);
+
+    components.add(createLockComponent(dbName, tblName, partName));
+
+    burnThroughTransactions(dbName, tblName, 23);
+    long txnid = openTxn();
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+    long writeid = allocateWriteId(dbName, tblName, txnid);
+    Assert.assertEquals(24, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator();
+
+    TimeUnit.SECONDS.sleep(1);

Review comment:
       Since the reporting interval is also set to 1 s, just to be on the safe side, maybe increase this sleep to 1.5 s?
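    Something like:
    ```
    TimeUnit.MILLISECONDS.sleep(1500);
    ```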

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));

Review comment:
       We should be really careful about throwing exceptions... if metric collection fails, the Initiator/Worker/Cleaner should continue with no issues.
   (Also, IMO logging a warning is enough, but it's up to you)
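    i.e. roughly this for the catch block (a warn-and-continue sketch, not exact code):
    ```
    } catch (SQLException e) {
      // Metric collection must never break the Initiator/Worker/Cleaner.
      LOG.warn("Unable to getCompactionMetricsData, skipping metrics update", e);
      return null;
    }
    ```
    (This also skips the retry path; whether that's acceptable is up to you.)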

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -308,7 +307,22 @@
       "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXN_COMPONENTS\" " +
           "INNER JOIN \"TXNS\" ON \"TC_TXNID\" = \"TXN_ID\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED +
       " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" HAVING COUNT(\"TXN_ID\") > ?";
-
+  private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY =
+      "SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " +
+      "WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? AND \"CMC_METRIC_TYPE\" = ?";
+  private static final String NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY =
+      "* FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_METRIC_TYPE\" = ? ORDER BY \"CMC_METRIC_VALUE\" DESC";

Review comment:
       Since the COMPACTION_METRICS_CACHE schema might change in the future, it's better to select each column by name instead of "select *"
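    For example, a hypothetical rewrite of the constant above (column names per this PR's schema):
    ```
    private static final String NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY =
        "\"CMC_DATABASE\", \"CMC_TABLE\", \"CMC_PARTITION\", \"CMC_METRIC_TYPE\", " +
        "\"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " +
        "WHERE \"CMC_METRIC_TYPE\" = ? ORDER BY \"CMC_METRIC_VALUE\" DESC";
    ```
    That way the result-set indexes in getTopCompactionMetricsDataPerType won't shift if the table gains columns later.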

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active, small, and obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,

Review comment:
       For example: the threshold is 5 deltas and the cache holds an entry with 7 deltas, but this AcidDir now contains only 2 deltas. Will the cache entry be removed?

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
##########
@@ -8831,6 +8833,34 @@ public void mark_failed(CompactionInfoStruct cr) throws MetaException {
     getTxnHandler().markFailed(CompactionInfo.compactionStructToInfo(cr));
   }
 
+  @Override
+  public CompactionMetricsDataResponse get_compaction_metrics_data(String dbName, String tblName, String partitionName, CompactionMetricsMetricType type) throws MetaException {
+    CompactionMetricsData metricsData =
+        getTxnHandler().getCompactionMetricsData(dbName, tblName, partitionName,
+            CompactionMetricsDataConverter.thriftCompactionMetricType2DbType(type));
+    CompactionMetricsDataResponse response = new CompactionMetricsDataResponse();
+    if (metricsData != null) {
+      response.setData(CompactionMetricsDataConverter.dataToStruct(metricsData));
+    }
+    return response;
+  }
+
+  @Override
+  public boolean update_compaction_metrics_data(CompactionMetricsDataStruct struct, int version) throws MetaException {
+      return getTxnHandler().updateCompactionMetricsData(CompactionMetricsDataConverter.structToData(struct), version);

Review comment:
       Could `struct` be null?
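    If it can, a simple guard might be enough (hypothetical):
    ```
    if (struct == null) {
      throw new MetaException("CompactionMetricsDataStruct is null");
    }
    ```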

##########
File path: standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
##########
@@ -661,6 +661,16 @@ CREATE TABLE COMPLETED_COMPACTIONS (
 
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
 
+-- HIVE-25842
+CREATE TABLE COMPACTION_METRICS_CACHE (
+  CMC_DATABASE varchar(128) NOT NULL,
+  CMC_TABLE varchar(128) NOT NULL,
+  CMC_PARTITION varchar(767),
+  CMC_METRIC_TYPE varchar(128) NOT NULL,
+  CMC_METRIC_VALUE integer NOT NULL,

Review comment:
       Is it possible that in the future, for whatever reason, we'll want this value to be NULL?

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
##########
@@ -81,6 +81,7 @@
   public void setUp() throws Exception {
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);

Review comment:
       Does this do anything?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -22,7 +22,24 @@
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;

Review comment:
       +1, thanks!

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+  bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)
+  void add_compaction_metrics_data(1: CompactionMetricsDataStruct data) throws(1:MetaException o1)
+  void remove_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)

Review comment:
       Same as above re: CompactionMetricsDataRequest

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&

Review comment:
       Maybe add a short comment highlighting that the **HMS** metrics need to be on.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active, small, and obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {

Review comment:
       Should be >=, since the threshold definition is "The minimum number of active delta files a table/partition must have..."
   
   Same for the others.
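    i.e. (a sketch, folding in the cache-entry removal question from above):
    ```
    if (numDeltas >= deltasThreshold) {
      updateMetrics(dbName, tableName, partitionName,
          CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas, txnHandler);
    } else {
      // arguably also drop any stale cache entry once the count falls below the threshold
      txnHandler.removeCompactionMetricsData(dbName, tableName, partitionName,
          CompactionMetricsData.MetricType.NUM_DELTAS);
    }
    ```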

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -671,6 +679,13 @@ private String getWorkerId() {
     return name.toString();
   }
 
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,
+      CompactionType type) {
+    if (metricsEnabled) {
+      DeltaFilesMetricReporter.updateMetricsFromWorker(directory, dbName, tableName, partName, type, conf, msc);

Review comment:
       The DeltaFilesMetricReporter instance lives in the HMS's memory, but this Worker thread is probably running on an HS2... how does this work?
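    For reference, the signatures in this diff suggest the two roles take different routes (sketch based on this PR's methods); is that the intent?
    ```
    // Initiator/Cleaner run inside the HMS and hit the TxnStore directly:
    DeltaFilesMetricReporter.updateMetricsFromInitiator(dir, dbName, tableName,
        partitionName, conf, txnHandler);

    // The Worker may run in an HS2, so it goes through the metastore client:
    DeltaFilesMetricReporter.updateMetricsFromWorker(directory, dbName, tableName,
        partName, type, conf, msc);
    ```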

##########
File path: service/src/java/org/apache/hive/service/server/HiveServer2.java
##########
@@ -214,9 +214,6 @@ public synchronized void init(HiveConf hiveConf) {
     try {
       if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
         MetricsFactory.init(hiveConf);
-        if (MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-          DeltaFilesMetricReporter.init(hiveConf);

Review comment:
       There is also an unnecessary DeltaFilesMetricReporter.close() in this file.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -310,81 +143,6 @@ private static String getDeltaCountKey(String dbName, String tableName, String p
     return key.toString();
   }
 
-  private static void logDeltaDirMetrics(AcidDirectory dir, Configuration conf, int numObsoleteDeltas, int numDeltas,
-      int numSmallDeltas) {
-    long loggerFrequency = HiveConf
-        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY, TimeUnit.MILLISECONDS);
-    if (loggerFrequency <= 0) {
-      return;
-    }
-    long currentTime = System.currentTimeMillis();
-    if (lastSuccessfulLoggingTime == 0 || currentTime >= lastSuccessfulLoggingTime + loggerFrequency) {
-      lastSuccessfulLoggingTime = currentTime;
-      if (numDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACTIVE_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " active delta directories. This can " +
-            "cause performance degradation.");
-      }
-
-      if (numObsoleteDeltas >=
-          HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_OBSOLETE_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " obsolete delta directories. This can " +
-            "indicate compaction cleaner issues.");
-      }
-
-      if (numSmallDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_SMALL_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " small delta directories. This can " +
-            "indicate performance degradation and there might be a problem with your streaming setup.");
-      }
-    }
-  }
-
-  private static int getNumObsoleteDeltas(AcidDirectory dir, long checkThresholdInSec) throws IOException {
-    int numObsoleteDeltas = 0;
-    for (Path obsolete : dir.getObsolete()) {
-      FileStatus stat = dir.getFs().getFileStatus(obsolete);
-      if (System.currentTimeMillis() - stat.getModificationTime() >= checkThresholdInSec * 1000) {
-        numObsoleteDeltas++;
-      }
-    }
-    return numObsoleteDeltas;
-  }
-
-  public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) {
-    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-      MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-
-      Arrays.stream(DeltaFilesMetricType.values())
-        .filter(type -> jobConf.get(type.name()) != null)
-        .forEach(type ->
-            Splitter.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).split(jobConf.get(type.name())).forEach(
-              (path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt))
-            )
-        );
-    }
-  }
-
-  public static void addAcidMetricsToConfObj(EnumMap<DeltaFilesMetricType,
-      Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) {
-    try {
-      deltaFilesStats.forEach((type, value) -> conf
-          .set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value)));
-
-    } catch (Exception e) {
-      LOG.warn("Couldn't add Delta metrics to conf object", e);
-    }
-  }
-
-  public static void backPropagateAcidMetrics(JobConf jobConf, Configuration conf) {
-    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-      MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-      try {
-        Arrays.stream(DeltaFilesMetricType.values()).filter(type -> conf.get(type.name()) != null)
-            .forEach(type -> jobConf.set(type.name(), conf.get(type.name())));
-      } catch (Exception e) {
-        LOG.warn("Couldn't back propagate Delta metrics to jobConf object", e);
-      }
-    }
-  }
 
   private static long getBaseSize(AcidDirectory dir) throws IOException {
     long baseSize = 0;

Review comment:
       What do you think about getting rid of this bit?
   ```
   for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
        baseSize += origStat.getFileStatus().getLen();
    }
   ```

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active, small, and obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }
+
+      // We don't know the size of the newly created delta directories; that would require a fresh AcidDirectory.
+      // Clear the small delta num counter from the cache for this key
+      client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS);
+
+      // The new number of active delta dirs is either 0, 1 or 2.
+      // If we ran MAJOR compaction, no new delta is created, just a new base dir.
+      // If we ran MINOR compaction, we can have 1 or 2 new delta dirs, depending on whether we had insert deltas,
+      // delete deltas, or both.
+      if (type == CompactionType.MAJOR) {
+        client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+      } else {
+        int numNewDeltas = 0;
+        // check whether we had delete deltas
+        if (directory.getDeleteDeltas().size() > 0) {
+          numNewDeltas++;
+        }
+
+        // if there are more current dirs than delete deltas, it means we also had active (insert) deltas
+        if (directory.getCurrentDirectories().size() > directory.getDeleteDeltas().size()) {
+          numNewDeltas++;
+        }
+
+        // recalculate the delta count
+        CompactionMetricsDataStruct prevDelta =
+            client.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS)
+                .getData();
+        int deltaNum = numNewDeltas;
+        if (prevDelta != null) {
+          deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();
+        }
+        if (deltaNum > deltasThreshold) {
+          updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS, deltaNum, client);
+        } else {
+          client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+        }
+      }
+
+      LOG.debug("Finished updating delta file metrics from worker.\n deltasThreshold = {}, "
+              + "obsoleteDeltasThreshold = {}, numObsoleteDeltas = {}",
+          deltasThreshold, obsoleteDeltasThreshold, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromCleaner(String dbName, String tableName, String partitionName,
+      int deletedFilesCount, Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from cleaner");
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      CompactionMetricsData prevObsoleteDelta =
+          txnHandler.getCompactionMetricsData(dbName, tableName, partitionName,
+              CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS);
+      int numObsoleteDeltas = 0;
+      if (prevObsoleteDelta != null) {
+        numObsoleteDeltas = prevObsoleteDelta.getMetricValue() - deletedFilesCount;
+        if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+          updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+              numObsoleteDeltas, txnHandler);
+        } else {
+          txnHandler.removeCompactionMetricsData(dbName, tableName, partitionName,
+              CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS);
+        }
+      }
+
+      LOG.debug("Finished updating delta file metrics from cleaner.\n obsoleteDeltasThreshold = {}, "
+              + "numObsoleteDeltas = {}", obsoleteDeltasThreshold, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  private static void updateMetrics(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type, int numDeltas, TxnStore txnHandler) throws MetaException {
+    CompactionMetricsData delta = txnHandler.getCompactionMetricsData(dbName, tblName, partitionName, type);

Review comment:
       Nit: the name `delta` is a bit confusing, since the variable holds a CompactionMetricsData rather than a delta.

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+  bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)

Review comment:
       Just a thought: we insert (rather than update) iff version == 1; otherwise it's an update. Do you think it would make sense to combine update_compaction_metrics_data and add_compaction_metrics_data?
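    Combined, it could look something like this (hypothetical helper; the name is for illustration only):
    ```
    public boolean upsertCompactionMetricsData(CompactionMetricsData data, int version)
        throws MetaException {
      if (version == 1) {
        // first observation for this (db, table, partition, type) key: insert
        addCompactionMetricsData(data);
        return true;
      }
      return updateCompactionMetricsData(data, version);
    }
    ```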

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));

Review comment:
       Same as above (re: throwing exceptions)

##########
File path: common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
##########
@@ -34,10 +34,10 @@
   /**
    * Initializes static Metrics instance.
    */
-  public synchronized static void init(HiveConf conf) throws Exception {
+  public synchronized static void init(Configuration conf) throws Exception {

Review comment:
       Why are the changes in this file necessary?

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return updateCompactionMetricsData(data, version);
+    }
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());

Review comment:
       Might be setting partition name to the string "null" – is this what we want?
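    If SQL NULL is what's intended for unpartitioned tables, an explicit guard would make that clear (sketch):
    ```
    if (data.getPartitionName() != null) {
      pstmt.setString(3, data.getPartitionName());
    } else {
      pstmt.setNull(3, java.sql.Types.VARCHAR);
    }
    ```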

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return updateCompactionMetricsData(data, version);
+    }
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());
+          pstmt.setString(4, data.getMetricType().toString());
+          pstmt.setInt(5, data.getMetricValue());
+          pstmt.setInt(6, data.getVersion());
+          pstmt.executeUpdate();
+          dbConn.commit();
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "addCompactionMetricsData(" + data + ")");
+        throw new MetaException("Unable to execute addCompactionMetricsData()" + stringifyException(e));

Review comment:
       Same as above (re: throwing exceptions)

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return updateCompactionMetricsData(data, version);
+    }
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());
+          pstmt.setString(4, data.getMetricType().toString());
+          pstmt.setInt(5, data.getMetricValue());
+          pstmt.setInt(6, data.getVersion());
+          pstmt.executeUpdate();
+          dbConn.commit();
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "addCompactionMetricsData(" + data + ")");
+        throw new MetaException("Unable to execute addCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      addCompactionMetricsData(data);
+    }
+  }
+
+  public void removeCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = DELETE_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          pstmt.executeUpdate();
+          dbConn.commit();
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "removeCompactionMetricsData(" + dbName + ", " +  tblName + ", " + partitionName + ", " +
+            type + ")");
+        throw new MetaException("Unable to execute removeCompactionMetricsData()" + stringifyException(e));

Review comment:
       Same as above (re: throwing exceptions)

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -398,13 +156,6 @@ private static long getBaseSize(AcidDirectory dir) throws IOException {
     return baseSize;
   }
 
-  private static long getModificationTime(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
-    return dir.getFiles(fs, Ref.from(false)).stream()
-      .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
-      .mapToLong(FileStatus::getModificationTime)
-      .max()
-      .orElse(new Date().getTime());
-  }
 
   private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {

Review comment:
       I'm a bit concerned about this method. Do you think it might slow down the Initiator/Worker/Cleaner?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -34,99 +26,60 @@
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionMetricsDataStruct;
+import org.apache.hadoop.hive.metastore.api.CompactionMetricsMetricType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-
 import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.common.util.Ref;
-import org.apache.tez.common.counters.CounterGroup;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
+import org.apache.thrift.TException;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.io.IOException;
-import java.io.Serializable;
 import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.EnumMap;
-import java.util.HashMap;
 import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.Executors;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_DELTAS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_OBSOLETE_DELTAS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_SMALL_DELTAS;
 
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_OBSOLETE_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_SMALL_DELTAS;
-
 /**
  * Collects and publishes ACID compaction related metrics.
- * Everything should be behind 2 feature flags: {@link HiveConf.ConfVars#HIVE_SERVER2_METRICS_ENABLED} and
+ * Everything should be behind 2 feature flags: {@link MetastoreConf.ConfVars#METRICS_ENABLED} and
  * {@link MetastoreConf.ConfVars#METASTORE_ACIDMETRICS_EXT_ON}.
- * First we store the information in the jobConf, then in Tez Counters, then in a cache stored here, then in a custom
- * MBean.
+ * First we store the information in the HMS backend DB COMPACTION_METRICS_CACHE table, then in a custom MBean.

Review comment:
       Maybe also mention that these values are logged...

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDirectory from the Initiator, so we can use it to calculate the active, small and
+      // obsolete delta counts
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }
+
+      // We don't know the size of the newly created delta directories; that would require a fresh AcidDirectory.
+      // Clear the small delta num counter from the cache for this key.
+      client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS);
+
+      // The new number of active delta dirs is either 0, 1 or 2.
+      // If we ran MAJOR compaction, no new delta is created, just a base dir.
+      // If we ran MINOR compaction, we can have 1 or 2 new delta dirs, depending on whether we had deltas or
+      // delete deltas.
+      if (type == CompactionType.MAJOR) {
+        client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+      } else {
+        int numNewDeltas = 0;
+        // check whether we had delete deltas
+        if (directory.getDeleteDeltas().size() > 0) {
+          numNewDeltas++;
+        }
+
+        // if there are more current dirs than delete deltas, we also have active deltas
+        if (directory.getCurrentDirectories().size() > directory.getDeleteDeltas().size()) {
+          numNewDeltas++;
+        }
+
+        // recalculate the delta count
+        CompactionMetricsDataStruct prevDelta =
+            client.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS)
+                .getData();
+        int deltaNum = numNewDeltas;
+        if (prevDelta != null) {
+          deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();

Review comment:
       There is a high chance that directory.getCurrentDirectories().size() > prevDelta.getMetricvalue(), i.e. more deltas were inserted since the Initiator queued this compaction. In that case deltaNum shouldn't be changed; see the sketch below.
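   A minimal sketch of that guard, reusing the names from this hunk (a suggestion
   only, not the final staleness handling):
   
       int deltaNum = numNewDeltas;
       if (prevDelta != null
           && directory.getCurrentDirectories().size() <= prevDelta.getMetricvalue()) {
         // only carry over the difference if no new deltas landed since the
         // Initiator queued this compaction; otherwise leave deltaNum as-is
         deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();
       }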

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();

Review comment:
       Should probably be:
   int numObsoleteDeltas = dir.getObsolete().size() + dir.getAbortedDirectories().size();
   
   And the Cleaner's update should also subtract aborted directories; see the sketch below.
   
   I'm open to discussing this, though.
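   On the Cleaner side that could look something like this (a sketch only, based on
   the removeFiles() hunk below):
   
       int removedDeltas = dir.getObsolete().size() + dir.getAbortedDirectories().size();
       updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, removedDeltas);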

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
+    return success;

Review comment:
       - Base directories and original files may be in the list of obsolete files the Cleaner deletes. Do you think it would be worth filtering the obsolete/aborted files’ names for “delta”?
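   For example, something along these lines (a sketch; assuming the AcidUtils.DELTA_PREFIX
   and DELETE_DELTA_PREFIX constants cover the directory naming):
   
       long deletedDeltaCount = obsoleteDirs.stream()
           .map(Path::getName)
           .filter(name -> name.startsWith(AcidUtils.DELTA_PREFIX)
               || name.startsWith(AcidUtils.DELETE_DELTA_PREFIX))
           .count();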

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());

Review comment:
       This should probably only be updated if success == true (I believe that means at least 1 dir was removed). A combined sketch appears under the next comment.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -139,157 +92,37 @@ public static DeltaFilesMetricReporter getInstance() {
     return InstanceHolder.instance;
   }
 
-  public static synchronized void init(HiveConf conf) throws Exception {
-    getInstance().configure(conf);
+  public static synchronized void init(Configuration conf, TxnStore txnHandler) throws Exception {
+    if (!initialized) {
+      getInstance().configure(conf, txnHandler);
+      initialized = true;
+    }
   }
 
-  private void configure(HiveConf conf) throws Exception {
+  private void configure(Configuration conf, TxnStore txnHandler) throws Exception {
     long reportingInterval =
-        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
-    hiveEntitySeparator = conf.getVar(HiveConf.ConfVars.HIVE_ENTITY_SEPARATOR);
+        MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
+
+    maxCacheSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_MAX_CACHE_SIZE);
 
-    initCachesForMetrics(conf);
     initObjectsForMetrics();
 
     ThreadFactory threadFactory =
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
-    executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+    reporterExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    reporterExecutorService.scheduleAtFixedRate(new ReportingTask(txnHandler), 0, reportingInterval, TimeUnit.SECONDS);
 
     LOG.info("Started DeltaFilesMetricReporter thread");

Review comment:
       This is a duplicate log line

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());

Review comment:
       A more readable solution: update only if dir.getObsolete().size() > 0, e.g.:
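   Combined with the success check above, a sketch of the call site:
   
       boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
       if (success && dir.getObsolete().size() > 0) {
         updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
       }
       return success;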




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


