Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2022/01/04 12:20:30 UTC

[GitHub] [hive] lcspinter opened a new pull request #2916: HIVE-25842: Reimplement delta file metric collection

lcspinter opened a new pull request #2916:
URL: https://github.com/apache/hive/pull/2916


   
   ### What changes were proposed in this pull request?
   Move delta file metric collection from the Tez side to the compaction side. All delta file metrics are now collected during the initiator, worker, and cleaner phases.
   
   
   ### Why are the changes needed?
   Currently, metrics are collected only when a Tez query reads a table (`select *` and `select count(*)` don't update the metrics).
   Metrics also aren't updated after compaction, or after the cleanup that follows compaction, so users will probably see "issues" with compaction (like many active, obsolete, or small deltas) that don't actually exist.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Manual tests and unit tests.
   


[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787884954



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate the active, small, and
+      // obsolete delta counts
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }
+
+      // We don't know the size of the newly created delta directories; that would require a fresh AcidDirectory
+      // Clear the small delta num counter from the cache for this key
+      client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS);
+
+      // The new number of active delta dirs is either 0, 1 or 2.
+      // If we ran MAJOR compaction, no new delta is created, just a base dir.
+      // If we ran MINOR compaction, we can have 1 or 2 new delta dirs, depending on whether we had deltas or
+      // delete deltas.
+      if (type == CompactionType.MAJOR) {
+        client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+      } else {
+        int numNewDeltas = 0;
+        // check whether we had delete deltas
+        if (directory.getDeleteDeltas().size() > 0) {
+          numNewDeltas++;
+        }
+
+        // if the number of current dirs is bigger than the number of delete deltas, it means we have active deltas
+        if (directory.getCurrentDirectories().size() > directory.getDeleteDeltas().size()) {
+          numNewDeltas++;
+        }
+
+        // recalculate the delta count
+        CompactionMetricsDataStruct prevDelta =
+            client.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS)
+                .getData();
+        int deltaNum = numNewDeltas;
+        if (prevDelta != null) {
+          deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();

Review comment:
       I corrected this part.
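
    To illustrate the recount above with hypothetical numbers (an example, not code from the PR):

        // suppose the cache previously recorded 10 active deltas,
        // the worker's AcidDirectory contained 7 current directories,
        // and a MINOR compaction produced 1 new delta:
        int deltaNum = 1 + (10 - 7); // numNewDeltas + (prevDelta.getMetricvalue() - currentDirs) == 4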




[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787033085



##########
File path: standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
##########
@@ -661,6 +661,16 @@ CREATE TABLE COMPLETED_COMPACTIONS (
 
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
 
+-- HIVE-25842
+CREATE TABLE COMPACTION_METRICS_CACHE (
+  CMC_DATABASE varchar(128) NOT NULL,
+  CMC_TABLE varchar(128) NOT NULL,
+  CMC_PARTITION varchar(767),
+  CMC_METRIC_TYPE varchar(128) NOT NULL,
+  CMC_METRIC_VALUE integer NOT NULL,

Review comment:
       Currently yes, but if the use of this table is expanded in the future, do you think there's any chance we'll want to allow nulls?




[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r789465573



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -412,7 +421,11 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    if (dir.getObsolete().size() > 0) {
+      updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, obsoleteDirs);

Review comment:
       I regret suggesting that we include aborted directories in the obsolete count. 
   1. There are other metrics about aborted directories.
   2. previouslyActiveDeltas - (obsolete + aborted) != currentlyActiveDeltas, so the active delta count would be off.
   
   My bad :/
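
    To make the arithmetic concrete with hypothetical numbers: if 10 deltas were active before
    cleaning and the cleaner removed 6 obsolete and 2 aborted directories, counting aborted dirs
    as obsolete gives 10 - (6 + 2) = 2 remaining, while the true active count is 10 - 6 = 4.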




[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787044907



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -435,55 +187,71 @@ private void initObjectsForMetrics() throws Exception {
         .getObjectName());
   }
 
-  private void initCachesForMetrics(HiveConf conf) {
-    int maxCacheSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE);
-    long duration = HiveConf.getTimeVar(conf,
-        HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_CACHE_DURATION, TimeUnit.SECONDS);
-
-    deltaTopN = new PriorityBlockingQueue<>(maxCacheSize, getComparator());
-    smallDeltaTopN = new PriorityBlockingQueue<>(maxCacheSize, getComparator());
-    obsoleteDeltaTopN = new PriorityBlockingQueue<>(maxCacheSize, getComparator());
-
-    deltaCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(duration, TimeUnit.SECONDS)
-      .removalListener(notification -> removalPredicate(deltaTopN, notification))
-      .softValues()
-      .build();
-
-    smallDeltaCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(duration, TimeUnit.SECONDS)
-      .removalListener(notification -> removalPredicate(smallDeltaTopN, notification))
-      .softValues()
-      .build();
-
-    obsoleteDeltaCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(duration, TimeUnit.SECONDS)
-      .removalListener(notification -> removalPredicate(obsoleteDeltaTopN, notification))
-      .softValues()
-      .build();
-  }
-
-  private static Comparator<Pair<String, Integer>> getComparator() {
-    return Comparator.comparing(Pair::getValue);
-  }
+  private final class ReportingTask implements Runnable {
 
-  private void removalPredicate(BlockingQueue<Pair<String, Integer>> topN, RemovalNotification notification) {
-    topN.removeIf(item -> item.getKey().equals(notification.getKey()));
-  }
+    private final TxnStore txnHandler;
 
-  private final class ReportingTask implements Runnable {
+    private ReportingTask(TxnStore txnHandler) {
+      this.txnHandler = txnHandler;
+    }
     @Override
     public void run() {
       Metrics metrics = MetricsFactory.getInstance();
       if (metrics != null) {
-        obsoleteDeltaCache.cleanUp();
-        obsoleteDeltaObject.updateAll(obsoleteDeltaCache.asMap());
+        try {
+          LOG.debug("Called reporting task.");
+          List<CompactionMetricsData> deltas = txnHandler.getTopCompactionMetricsDataPerType(maxCacheSize);
+          Map<String, Integer> deltasMap = deltas.stream()
+              .filter(d -> d.getMetricType() == CompactionMetricsData.MetricType.NUM_DELTAS).collect(
+              Collectors.toMap(item -> getDeltaCountKey(item.getDbName(), item.getTblName(), item.getPartitionName()),
+                  CompactionMetricsData::getMetricValue));
+          deltaObject.updateAll(deltasMap);
+
+          Map<String, Integer> smallDeltasMap = deltas.stream()
+              .filter(d -> d.getMetricType() == CompactionMetricsData.MetricType.NUM_SMALL_DELTAS).collect(
+              Collectors.toMap(item -> getDeltaCountKey(item.getDbName(), item.getTblName(), item.getPartitionName()),
+                  CompactionMetricsData::getMetricValue));
+          smallDeltaObject.updateAll(smallDeltasMap);
+
+          Map<String, Integer> obsoleteDeltasMap = deltas.stream()
+              .filter(d -> d.getMetricType() == CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS).collect(
+              Collectors.toMap(item -> getDeltaCountKey(item.getDbName(), item.getTblName(), item.getPartitionName()),
+                  CompactionMetricsData::getMetricValue));
+          obsoleteDeltaObject.updateAll(obsoleteDeltasMap);
+        } catch (MetaException e) {

Review comment:
       Maybe catch all Throwables here just in case? (and also in run())
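
    A minimal sketch of that suggestion (it reuses the LOG field already present in the class;
    the log message itself is illustrative):

        @Override
        public void run() {
          try {
            // ... the reporting logic above ...
          } catch (Throwable t) {
            // an uncaught exception would cancel all further runs of the
            // scheduled task, so catch everything and only log it
            LOG.warn("Caught exception while reporting delta file metrics", t);
          }
        }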




[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787544691



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -435,55 +187,71 @@ private void initObjectsForMetrics() throws Exception {
         .getObjectName());
   }
 
-  private void initCachesForMetrics(HiveConf conf) {
-    int maxCacheSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE);
-    long duration = HiveConf.getTimeVar(conf,
-        HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_CACHE_DURATION, TimeUnit.SECONDS);
-
-    deltaTopN = new PriorityBlockingQueue<>(maxCacheSize, getComparator());
-    smallDeltaTopN = new PriorityBlockingQueue<>(maxCacheSize, getComparator());
-    obsoleteDeltaTopN = new PriorityBlockingQueue<>(maxCacheSize, getComparator());
-
-    deltaCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(duration, TimeUnit.SECONDS)
-      .removalListener(notification -> removalPredicate(deltaTopN, notification))
-      .softValues()
-      .build();
-
-    smallDeltaCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(duration, TimeUnit.SECONDS)
-      .removalListener(notification -> removalPredicate(smallDeltaTopN, notification))
-      .softValues()
-      .build();
-
-    obsoleteDeltaCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(duration, TimeUnit.SECONDS)
-      .removalListener(notification -> removalPredicate(obsoleteDeltaTopN, notification))
-      .softValues()
-      .build();
-  }
-
-  private static Comparator<Pair<String, Integer>> getComparator() {
-    return Comparator.comparing(Pair::getValue);
-  }
+  private final class ReportingTask implements Runnable {
 
-  private void removalPredicate(BlockingQueue<Pair<String, Integer>> topN, RemovalNotification notification) {
-    topN.removeIf(item -> item.getKey().equals(notification.getKey()));
-  }
+    private final TxnStore txnHandler;
 
-  private final class ReportingTask implements Runnable {
+    private ReportingTask(TxnStore txnHandler) {
+      this.txnHandler = txnHandler;
+    }
     @Override
     public void run() {
       Metrics metrics = MetricsFactory.getInstance();
       if (metrics != null) {
-        obsoleteDeltaCache.cleanUp();
-        obsoleteDeltaObject.updateAll(obsoleteDeltaCache.asMap());
+        try {
+          LOG.debug("Called reporting task.");
+          List<CompactionMetricsData> deltas = txnHandler.getTopCompactionMetricsDataPerType(maxCacheSize);
+          Map<String, Integer> deltasMap = deltas.stream()
+              .filter(d -> d.getMetricType() == CompactionMetricsData.MetricType.NUM_DELTAS).collect(
+              Collectors.toMap(item -> getDeltaCountKey(item.getDbName(), item.getTblName(), item.getPartitionName()),
+                  CompactionMetricsData::getMetricValue));
+          deltaObject.updateAll(deltasMap);
+
+          Map<String, Integer> smallDeltasMap = deltas.stream()
+              .filter(d -> d.getMetricType() == CompactionMetricsData.MetricType.NUM_SMALL_DELTAS).collect(
+              Collectors.toMap(item -> getDeltaCountKey(item.getDbName(), item.getTblName(), item.getPartitionName()),
+                  CompactionMetricsData::getMetricValue));
+          smallDeltaObject.updateAll(smallDeltasMap);
+
+          Map<String, Integer> obsoleteDeltasMap = deltas.stream()
+              .filter(d -> d.getMetricType() == CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS).collect(
+              Collectors.toMap(item -> getDeltaCountKey(item.getDbName(), item.getTblName(), item.getPartitionName()),
+                  CompactionMetricsData::getMetricValue));
+          obsoleteDeltaObject.updateAll(obsoleteDeltasMap);
+        } catch (MetaException e) {

Review comment:
       Right, I will do that. 




[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r788957639



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -139,157 +92,37 @@ public static DeltaFilesMetricReporter getInstance() {
     return InstanceHolder.instance;
   }
 
-  public static synchronized void init(HiveConf conf) throws Exception {
-    getInstance().configure(conf);
+  public static synchronized void init(Configuration conf, TxnStore txnHandler) throws Exception {
+    if (!initialized) {
+      getInstance().configure(conf, txnHandler);
+      initialized = true;
+    }
   }
 
-  private void configure(HiveConf conf) throws Exception {
+  private void configure(Configuration conf, TxnStore txnHandler) throws Exception {
     long reportingInterval =
-        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
-    hiveEntitySeparator = conf.getVar(HiveConf.ConfVars.HIVE_ENTITY_SEPARATOR);
+        MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
+
+    maxCacheSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_MAX_CACHE_SIZE);
 
-    initCachesForMetrics(conf);
     initObjectsForMetrics();
 
     ThreadFactory threadFactory =
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
-    executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+    reporterExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    reporterExecutorService.scheduleAtFixedRate(new ReportingTask(txnHandler), 0, reportingInterval, TimeUnit.SECONDS);
 
     LOG.info("Started DeltaFilesMetricReporter thread");

Review comment:
       Could you please point me to the original one? I cannot find it. 




[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r788953657



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate the active, small, and
+      // obsolete delta counts
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {

Review comment:
       fixed




[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786892581



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -398,13 +156,6 @@ private static long getBaseSize(AcidDirectory dir) throws IOException {
     return baseSize;
   }
 
-  private static long getModificationTime(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
-    return dir.getFiles(fs, Ref.from(false)).stream()
-      .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
-      .mapToLong(FileStatus::getModificationTime)
-      .max()
-      .orElse(new Date().getTime());
-  }
 
   private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {

Review comment:
       It will slow it down, but there is no other way to calculate the directory size.
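
    For reference, a sketch of what such a size calculation might look like, mirroring the
    removed getModificationTime above (an assumption, not necessarily the exact implementation):

        private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
          // listing every file in the directory to sum the lengths is what makes this slow
          return dir.getFiles(fs, Ref.from(false)).stream()
              .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
              .mapToLong(FileStatus::getLen)
              .sum();
        }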




[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r788965590



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -34,99 +26,60 @@
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionMetricsDataStruct;
+import org.apache.hadoop.hive.metastore.api.CompactionMetricsMetricType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-
 import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.common.util.Ref;
-import org.apache.tez.common.counters.CounterGroup;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
+import org.apache.thrift.TException;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.io.IOException;
-import java.io.Serializable;
 import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.EnumMap;
-import java.util.HashMap;
 import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.Executors;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_DELTAS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_OBSOLETE_DELTAS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_SMALL_DELTAS;
 
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_OBSOLETE_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_SMALL_DELTAS;
-
 /**
  * Collects and publishes ACID compaction related metrics.
- * Everything should be behind 2 feature flags: {@link HiveConf.ConfVars#HIVE_SERVER2_METRICS_ENABLED} and
+ * Everything should be behind 2 feature flags: {@link MetastoreConf.ConfVars#METRICS_ENABLED} and
  * {@link MetastoreConf.ConfVars#METASTORE_ACIDMETRICS_EXT_ON}.
- * First we store the information in the jobConf, then in Tez Counters, then in a cache stored here, then in a custom
- * MBean.
+ * First we store the information in the HMS backend DB COMPACTION_METRICS_CACHE table, then in a custom MBean.

Review comment:
       Done




[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r788866328



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +260,168 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler, long baseSize, Map<Path, Long> deltaSizes) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate the active, small, and
+      // obsolete delta counts
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        if (deltaSizes.containsKey(delta.getPath())) {
+          long deltaSize = deltaSizes.get(delta.getPath());
+          if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+            numSmallDeltas++;
+          }
+        }
+      }
+
+      int numObsoleteDeltas = filterOutBaseAndOriginalFiles(dir.getObsolete()).size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }

Review comment:
       Sorry, one more thing:
   In case of the nightmare scenario where only the worker is reporting metrics, I vote for an

       else {
         client.removeCompactionMetricsData(...)
       }

   here.




[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r788981616



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +260,168 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler, long baseSize, Map<Path, Long> deltaSizes) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate the active, small, and
+      // obsolete delta counts
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        if (deltaSizes.containsKey(delta.getPath())) {
+          long deltaSize = deltaSizes.get(delta.getPath());
+          if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+            numSmallDeltas++;
+          }
+        }
+      }
+
+      int numObsoleteDeltas = filterOutBaseAndOriginalFiles(dir.getObsolete()).size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }

Review comment:
       Corrected. 




[GitHub] [hive] deniskuzZ commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

deniskuzZ commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786506022



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -381,17 +397,19 @@ public CompactionType run() throws Exception {
     }
   }
 
-  private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdList writeIds,
-                                                 StorageDescriptor sd, Map<String, String> tblproperties)
+  private AcidDirectory getAcidDirectory(StorageDescriptor sd,ValidWriteIdList writeIds) throws IOException {

Review comment:
       space




[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787027526



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       There is a possibility that COMPACTOR_INITIATOR_ON==false on a given HS2 instance (even if the Initiator / Cleaner are running in some HMS somewhere else).
   
   There's already a risk that MetastoreConf.ConfVars.METRICS_ENABLED == false on any given HS2, in which case only the Initiator and Cleaner update the metrics and the metric values are incorrect. Adding another config (COMPACTOR_INITIATOR_ON) just increases this risk.




[GitHub] [hive] deniskuzZ commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

deniskuzZ commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786500771



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());

Review comment:
       There are a few remove methods; see line #338 (soft-drop partition). updateDeltaFilesMetrics should be called there as well, or, if possible, we should move it inside remove at line #431.




[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786756287



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception {
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
             COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       It doesn't hurt if we double-check :)

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       Do we want to update metrics when the Initiator/Cleaner is not running? Can that be a valid use case?

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
##########
@@ -81,6 +81,7 @@
   public void setUp() throws Exception {
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);

Review comment:
       We need this flag set to `true`, otherwise the metrics are not collected.

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
##########
@@ -8831,6 +8833,34 @@ public void mark_failed(CompactionInfoStruct cr) throws MetaException {
     getTxnHandler().markFailed(CompactionInfo.compactionStructToInfo(cr));
   }
 
+  @Override
+  public CompactionMetricsDataResponse get_compaction_metrics_data(String dbName, String tblName, String partitionName, CompactionMetricsMetricType type) throws MetaException {
+    CompactionMetricsData metricsData =
+        getTxnHandler().getCompactionMetricsData(dbName, tblName, partitionName,
+            CompactionMetricsDataConverter.thriftCompactionMetricType2DbType(type));
+    CompactionMetricsDataResponse response = new CompactionMetricsDataResponse();
+    if (metricsData != null) {
+      response.setData(CompactionMetricsDataConverter.dataToStruct(metricsData));
+    }
+    return response;
+  }
+
+  @Override
+  public boolean update_compaction_metrics_data(CompactionMetricsDataStruct struct, int version) throws MetaException {
+      return getTxnHandler().updateCompactionMetricsData(CompactionMetricsDataConverter.structToData(struct), version);

Review comment:
       Per the Javadoc, the object must always be non-null.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -671,6 +679,13 @@ private String getWorkerId() {
     return name.toString();
   }
 
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,
+      CompactionType type) {
+    if (metricsEnabled) {
+      DeltaFilesMetricReporter.updateMetricsFromWorker(directory, dbName, tableName, partName, type, conf, msc);

Review comment:
       All the `updateMetricsFrom*` methods are static. They are completely stateless, and the outcome of the metrics computation is stored in the backend DB, which is accessible to all the compaction threads regardless of which process is hosting them.

##########
File path: service/src/java/org/apache/hive/service/server/HiveServer2.java
##########
@@ -214,9 +214,6 @@ public synchronized void init(HiveConf hiveConf) {
     try {
       if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
         MetricsFactory.init(hiveConf);
-        if (MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-          DeltaFilesMetricReporter.init(hiveConf);

Review comment:
       Good catch!

##########
File path: standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
##########
@@ -661,6 +661,16 @@ CREATE TABLE COMPLETED_COMPACTIONS (
 
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
 
+-- HIVE-25842
+CREATE TABLE COMPACTION_METRICS_CACHE (
+  CMC_DATABASE varchar(128) NOT NULL,
+  CMC_TABLE varchar(128) NOT NULL,
+  CMC_PARTITION varchar(767),
+  CMC_METRIC_TYPE varchar(128) NOT NULL,
+  CMC_METRIC_VALUE integer NOT NULL,

Review comment:
       I think if for some reason the value is null, that row shouldn't be in the table.

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)

Review comment:
       Yes, it makes sense to change the param to a request object. 

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));

Review comment:
       The whole logic in `DeltaFilesMetricReporter` is wrapped in a huge try-catch block that catches every `Throwable`, so this shouldn't take down the compaction threads.

##########
File path: common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
##########
@@ -34,10 +34,10 @@
   /**
    * Initializes static Metrics instance.
    */
-  public synchronized static void init(HiveConf conf) throws Exception {
+  public synchronized static void init(Configuration conf) throws Exception {

Review comment:
       Two reasons:
   1. It's always better to code to the interface.
   2. The initiator, where this method is called, has a metastore `Configuration` instance, not a `HiveConf`.

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -308,7 +307,22 @@
       "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXN_COMPONENTS\" " +
           "INNER JOIN \"TXNS\" ON \"TC_TXNID\" = \"TXN_ID\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED +
       " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" HAVING COUNT(\"TXN_ID\") > ?";
-
+  private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY =
+      "SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " +
+      "WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? AND \"CMC_METRIC_TYPE\" = ?";
+  private static final String NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY =
+      "* FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_METRIC_TYPE\" = ? ORDER BY \"CMC_METRIC_VALUE\" DESC";

Review comment:
       Correct. Changed it to select column names.
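   For illustration, a minimal sketch of such a constant (not the PR's exact code; it assumes `sqlGenerator.addLimitClause()` supplies the `SELECT` keyword, as the `NO_SELECT_` prefix suggests):
   ```
   private static final String NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY =
       // Naming every column keeps the result-set indexes stable even if
       // COMPACTION_METRICS_CACHE gains new columns later.
       "\"CMC_DATABASE\", \"CMC_TABLE\", \"CMC_PARTITION\", \"CMC_METRIC_VALUE\", \"CMC_VERSION\" " +
       "FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_METRIC_TYPE\" = ? ORDER BY \"CMC_METRIC_VALUE\" DESC";
   ```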

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+  bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)
+  void add_compaction_metrics_data(1: CompactionMetricsDataStruct data) throws(1:MetaException o1)
+  void remove_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)

Review comment:
       Done

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      updateCompactionMetricsData(data, version);
+    }
+    return true;
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());

Review comment:
       By definition, the `CMC_PARTITION` column can accept null values, and `PreparedStatement` converts Java null values to DB-specific null values.
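   For illustration, a minimal sketch of that behavior (not from the PR; the column list follows the PR's schema, the values are just examples):
   ```
   // JDBC maps a Java null passed to setString() to a database NULL,
   // so the unpartitioned case needs no special handling on insert.
   try (PreparedStatement pstmt = dbConn.prepareStatement(
       "INSERT INTO \"COMPACTION_METRICS_CACHE\" (\"CMC_DATABASE\", \"CMC_TABLE\", \"CMC_PARTITION\", " +
       "\"CMC_METRIC_TYPE\", \"CMC_METRIC_VALUE\", \"CMC_VERSION\") VALUES (?, ?, ?, ?, ?, ?)")) {
     pstmt.setString(1, "default");
     pstmt.setString(2, "acid_tbl");
     pstmt.setString(3, null); // stored as SQL NULL for an unpartitioned table
     pstmt.setString(4, "NUM_DELTAS");
     pstmt.setInt(5, 7);
     pstmt.setInt(6, 1);
     pstmt.executeUpdate();
   }
   ```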

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+  bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)

Review comment:
       I moved the update and add together.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
+    return success;

Review comment:
       `dir.getObsolete()` doesn't include the aborted files. We have a different function for that: `dir.getAbortedDirectories()`.
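   For illustration, a sketch of counting both (not the PR's exact code; it assumes both getters return collections of paths):
   ```
   // Obsolete and aborted directories are tracked separately on the
   // AcidDirectory, so the cleaner metric should account for both.
   int removedDirs = dir.getObsolete().size() + dir.getAbortedDirectories().size();
   updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, removedDirs);
   ```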

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception {
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
             COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&

Review comment:
       Yes, this was intentional

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());

Review comment:
       Good idea! 

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());

Review comment:
       The soft-drop partition path is taken when the partition was dropped before the cleaner could clean it. Since the partition was already dropped, `TxnHandler.cleanupRecords` must have been called, which removes all the records that belong to that particular partition from the table.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -381,17 +397,19 @@ public CompactionType run() throws Exception {
     }
   }
 
-  private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdList writeIds,
-                                                 StorageDescriptor sd, Map<String, String> tblproperties)
+  private AcidDirectory getAcidDirectory(StorageDescriptor sd,ValidWriteIdList writeIds) throws IOException {

Review comment:
       fixed

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -331,6 +339,12 @@ private boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions,
     }
     return false;
   }
+  
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName) {

Review comment:
       The two methods have different signatures. 

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -671,6 +679,13 @@ private String getWorkerId() {
     return name.toString();
   }
 
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,

Review comment:
       I will create a connection pool in a follow-up PR.





[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r789454143



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -139,157 +92,37 @@ public static DeltaFilesMetricReporter getInstance() {
     return InstanceHolder.instance;
   }
 
-  public static synchronized void init(HiveConf conf) throws Exception {
-    getInstance().configure(conf);
+  public static synchronized void init(Configuration conf, TxnStore txnHandler) throws Exception {
+    if (!initialized) {
+      getInstance().configure(conf, txnHandler);
+      initialized = true;
+    }
   }
 
-  private void configure(HiveConf conf) throws Exception {
+  private void configure(Configuration conf, TxnStore txnHandler) throws Exception {
     long reportingInterval =
-        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
-    hiveEntitySeparator = conf.getVar(HiveConf.ConfVars.HIVE_ENTITY_SEPARATOR);
+        MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
+
+    maxCacheSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_MAX_CACHE_SIZE);
 
-    initCachesForMetrics(conf);
     initObjectsForMetrics();
 
     ThreadFactory threadFactory =
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
-    executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+    reporterExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    reporterExecutorService.scheduleAtFixedRate(new ReportingTask(txnHandler), 0, reportingInterval, TimeUnit.SECONDS);
 
     LOG.info("Started DeltaFilesMetricReporter thread");

Review comment:
       Never mind, I had reading problems :D 





[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r782811556



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       COMPACTOR_INITIATOR_ON could be false on the HS2 running this Worker, but metrics should still be collected.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception {
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
             COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       COMPACTOR_INITIATOR_ON also controls whether the Cleaner runs, so this line is unnecessary... but if you want to leave it in for posterity/the future, I understand.

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)

Review comment:
       If there's any doubt that the parameters might be changed, I recommend introducing a CompactionMetricsDataRequest object

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
##########
@@ -39,199 +45,396 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
-import java.util.EnumMap;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_OBSOLETE_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_SMALL_DELTAS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class TestDeltaFilesMetrics extends CompactorTest  {
 
   private void setUpHiveConf() {
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE, 2);
-    HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_CACHE_DURATION, 7200, TimeUnit.SECONDS);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD, 100);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD, 100);
-    HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, 1, TimeUnit.SECONDS);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD, 1);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD, 1);
+    MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_REPORTING_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    MetastoreConf.setDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD, 0.15f);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
   }
 
-  private void initAndCollectFirstMetrics() throws Exception {
-    MetricsFactory.close();
-    MetricsFactory.init(conf);
+  @After
+  public void tearDown() throws Exception {
+    DeltaFilesMetricReporter.close();
+  }
 
-    DeltaFilesMetricReporter.init(conf);
 
-    TezCounters tezCounters = new TezCounters();
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=1").setValue(200);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=2").setValue(100);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=3").setValue(150);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid_v2").setValue(250);
+  static void verifyMetricsMatch(Map<String, String> expected, Map<String, String> actual) {
+    Assert.assertTrue("Actual metrics " + actual + " don't match expected: " + expected,
+        equivalent(expected, actual));
+  }
 
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=1").setValue(150);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=2").setValue(100);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=3").setValue(250);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid_v2").setValue(200);
+  private static boolean equivalent(Map<String, String> lhs, Map<String, String> rhs) {
+    return lhs.size() == rhs.size() && Maps.difference(lhs, rhs).areEqual();
+  }
 
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=1").setValue(250);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=2").setValue(200);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=3").setValue(150);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid_v2").setValue(100);
+  static Map<String, String> gaugeToMap(String metric) throws Exception {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName oname = new ObjectName(DeltaFilesMetricReporter.OBJECT_NAME_PREFIX + metric);
+    MBeanInfo mbeanInfo = mbs.getMBeanInfo(oname);
 
-    DeltaFilesMetricReporter.getInstance().submit(tezCounters, null);
-    Thread.sleep(1000);
+    Map<String, String> result = new HashMap<>();
+    for (MBeanAttributeInfo attr : mbeanInfo.getAttributes()) {
+      result.put(attr.getName(), String.valueOf(mbs.getAttribute(oname, attr.getName())));
+    }
+    return result;
   }
 
-  @After
-  public void tearDown() {
-    DeltaFilesMetricReporter.close();
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
   }
 
   @Test
-  public void testDeltaFilesMetric() throws Exception {
+  public void testDeltaFileMetricPartitionedTable() throws Exception {
     setUpHiveConf();
-    initAndCollectFirstMetrics();
+    String dbName = "default";
+    String tblName = "dp";
+    String partName = "ds=part1";
 
+    Table t = newTable(dbName, tblName, true);
+    List<LockComponent> components = new ArrayList<>();
+
+    Partition p = newPartition(t, "part1");
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 20);
+
+    components.add(createLockComponent(dbName, tblName, partName));
+
+    burnThroughTransactions(dbName, tblName, 23);
+    long txnid = openTxn();
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+    long writeid = allocateWriteId(dbName, tblName, txnid);
+    Assert.assertEquals(24, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator();
+
+    TimeUnit.SECONDS.sleep(1);

Review comment:
       Since the reporting interval is also set to 1 s, just to be on the safe side, maybe increase this sleep to 1.5 s?

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));

Review comment:
       We should be really careful about throwing exceptions... if metric collection fails, the Initiator/Worker/Cleaner should continue with no issues.
   (Also, IMO logging a warning is enough, but it's up to you)
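   For illustration, a sketch of the best-effort pattern (`fetchCompactionMetricsData` is a hypothetical name, not from the PR):
   ```
   // Metric collection is best-effort: swallow failures so the
   // Initiator/Worker/Cleaner threads are never broken by them.
   try {
     return fetchCompactionMetricsData(dbName, tblName, partitionName, type);
   } catch (Exception e) {
     LOG.warn("Delta metric collection failed, skipping metrics update", e);
     return null;
   }
   ```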

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -308,7 +307,22 @@
       "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXN_COMPONENTS\" " +
           "INNER JOIN \"TXNS\" ON \"TC_TXNID\" = \"TXN_ID\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED +
       " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" HAVING COUNT(\"TXN_ID\") > ?";
-
+  private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY =
+      "SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " +
+      "WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? AND \"CMC_METRIC_TYPE\" = ?";
+  private static final String NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY =
+      "* FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_METRIC_TYPE\" = ? ORDER BY \"CMC_METRIC_VALUE\" DESC";

Review comment:
       Since the COMPACTION_METRICS_CACHE schema might change in the future, it's better to select each column by name instead of "select *"

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,

Review comment:
       For example, if the threshold is 5 deltas and the cache holds an entry with 7 deltas, but this AcidDir now contains only 2 deltas: will the cache entry be removed?

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
##########
@@ -8831,6 +8833,34 @@ public void mark_failed(CompactionInfoStruct cr) throws MetaException {
     getTxnHandler().markFailed(CompactionInfo.compactionStructToInfo(cr));
   }
 
+  @Override
+  public CompactionMetricsDataResponse get_compaction_metrics_data(String dbName, String tblName, String partitionName, CompactionMetricsMetricType type) throws MetaException {
+    CompactionMetricsData metricsData =
+        getTxnHandler().getCompactionMetricsData(dbName, tblName, partitionName,
+            CompactionMetricsDataConverter.thriftCompactionMetricType2DbType(type));
+    CompactionMetricsDataResponse response = new CompactionMetricsDataResponse();
+    if (metricsData != null) {
+      response.setData(CompactionMetricsDataConverter.dataToStruct(metricsData));
+    }
+    return response;
+  }
+
+  @Override
+  public boolean update_compaction_metrics_data(CompactionMetricsDataStruct struct, int version) throws MetaException {
+      return getTxnHandler().updateCompactionMetricsData(CompactionMetricsDataConverter.structToData(struct), version);

Review comment:
       Could `struct` be null?

##########
File path: standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
##########
@@ -661,6 +661,16 @@ CREATE TABLE COMPLETED_COMPACTIONS (
 
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
 
+-- HIVE-25842
+CREATE TABLE COMPACTION_METRICS_CACHE (
+  CMC_DATABASE varchar(128) NOT NULL,
+  CMC_TABLE varchar(128) NOT NULL,
+  CMC_PARTITION varchar(767),
+  CMC_METRIC_TYPE varchar(128) NOT NULL,
+  CMC_METRIC_VALUE integer NOT NULL,

Review comment:
       Is it a possibility that in the future, for whatever reason, we'll want this value to be NULL?

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
##########
@@ -81,6 +81,7 @@
   public void setUp() throws Exception {
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);

Review comment:
       Does this do anything?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -22,7 +22,24 @@
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;

Review comment:
       +1, thanks!

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+  bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)
+  void add_compaction_metrics_data(1: CompactionMetricsDataStruct data) throws(1:MetaException o1)
+  void remove_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)

Review comment:
       Same as above re: CompactionMetricsDataRequest

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&

Review comment:
       Maybe add a short comment highlighting that the **HMS** metrics need to be on.
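   For illustration, something like this (a sketch, not the PR's final wording; it also drops COMPACTOR_INITIATOR_ON per the earlier comment):
   ```
   // These are HMS (metastore) metrics: METRICS_ENABLED and
   // METASTORE_ACIDMETRICS_EXT_ON must both be on for collection,
   // even though this Worker may be running inside HS2.
   metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
       MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
   ```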

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {

Review comment:
       Should be >=, since the threshold definition is "The minimum number of active delta files a table/partition must have..."
   
   Same for the others.
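   For illustration, a sketch of the suggested comparison, using the names from this hunk:
   ```
   // ">=": the threshold is the minimum count at which the metric is reported.
   if (numDeltas >= deltasThreshold) {
     updateMetrics(dbName, tableName, partitionName,
         CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas, txnHandler);
   }
   ```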

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -671,6 +679,13 @@ private String getWorkerId() {
     return name.toString();
   }
 
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,
+      CompactionType type) {
+    if (metricsEnabled) {
+      DeltaFilesMetricReporter.updateMetricsFromWorker(directory, dbName, tableName, partName, type, conf, msc);

Review comment:
       The DeltaFilesMetricReporter instance lives in the memory of the HMS, but this Worker thread is probably running on an HS2... how does this work?

##########
File path: service/src/java/org/apache/hive/service/server/HiveServer2.java
##########
@@ -214,9 +214,6 @@ public synchronized void init(HiveConf hiveConf) {
     try {
       if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
         MetricsFactory.init(hiveConf);
-        if (MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-          DeltaFilesMetricReporter.init(hiveConf);

Review comment:
       There is also an unnecessary DeltaFilesMetricReporter.close() in this file.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -310,81 +143,6 @@ private static String getDeltaCountKey(String dbName, String tableName, String p
     return key.toString();
   }
 
-  private static void logDeltaDirMetrics(AcidDirectory dir, Configuration conf, int numObsoleteDeltas, int numDeltas,
-      int numSmallDeltas) {
-    long loggerFrequency = HiveConf
-        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY, TimeUnit.MILLISECONDS);
-    if (loggerFrequency <= 0) {
-      return;
-    }
-    long currentTime = System.currentTimeMillis();
-    if (lastSuccessfulLoggingTime == 0 || currentTime >= lastSuccessfulLoggingTime + loggerFrequency) {
-      lastSuccessfulLoggingTime = currentTime;
-      if (numDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACTIVE_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " active delta directories. This can " +
-            "cause performance degradation.");
-      }
-
-      if (numObsoleteDeltas >=
-          HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_OBSOLETE_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " obsolete delta directories. This can " +
-            "indicate compaction cleaner issues.");
-      }
-
-      if (numSmallDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_SMALL_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " small delta directories. This can " +
-            "indicate performance degradation and there might be a problem with your streaming setup.");
-      }
-    }
-  }
-
-  private static int getNumObsoleteDeltas(AcidDirectory dir, long checkThresholdInSec) throws IOException {
-    int numObsoleteDeltas = 0;
-    for (Path obsolete : dir.getObsolete()) {
-      FileStatus stat = dir.getFs().getFileStatus(obsolete);
-      if (System.currentTimeMillis() - stat.getModificationTime() >= checkThresholdInSec * 1000) {
-        numObsoleteDeltas++;
-      }
-    }
-    return numObsoleteDeltas;
-  }
-
-  public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) {
-    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-      MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-
-      Arrays.stream(DeltaFilesMetricType.values())
-        .filter(type -> jobConf.get(type.name()) != null)
-        .forEach(type ->
-            Splitter.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).split(jobConf.get(type.name())).forEach(
-              (path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt))
-            )
-        );
-    }
-  }
-
-  public static void addAcidMetricsToConfObj(EnumMap<DeltaFilesMetricType,
-      Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) {
-    try {
-      deltaFilesStats.forEach((type, value) -> conf
-          .set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value)));
-
-    } catch (Exception e) {
-      LOG.warn("Couldn't add Delta metrics to conf object", e);
-    }
-  }
-
-  public static void backPropagateAcidMetrics(JobConf jobConf, Configuration conf) {
-    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-      MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-      try {
-        Arrays.stream(DeltaFilesMetricType.values()).filter(type -> conf.get(type.name()) != null)
-            .forEach(type -> jobConf.set(type.name(), conf.get(type.name())));
-      } catch (Exception e) {
-        LOG.warn("Couldn't back propagate Delta metrics to jobConf object", e);
-      }
-    }
-  }
 
   private static long getBaseSize(AcidDirectory dir) throws IOException {
     long baseSize = 0;

Review comment:
       What do you think about getting rid of this bit?
   ```
   for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
           baseSize += origStat.getFileStatus().getLen();
   ```

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }
+
+      // We don't know the size of the newly create delta directories, that would require a fresh AcidDirectory
+      // Clear the small delta num counter from the cache for this key
+      client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS);
+
+      // The new number of active delta dirs are either 0, 1 or 2.
+      // If we ran MAJOR compaction, no new delta is created, just base dir
+      // If we ran MINOR compaction, we can have 1 or 2 new delta dirs, depending on whether we had deltas or
+      // delete deltas.
+      if (type == CompactionType.MAJOR) {
+        client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+      } else {
+        int numNewDeltas = 0;
+        // check whether we had deltas
+        if (directory.getDeleteDeltas().size() > 0) {
+          numNewDeltas++;
+        }
+
+        // if the size of the current dirs is bigger than the size of delete deltas, it means we have active deltas
+        if (directory.getCurrentDirectories().size() > directory.getDeleteDeltas().size()) {
+          numNewDeltas++;
+        }
+
+        // recalculate the delta count
+        CompactionMetricsDataStruct prevDelta =
+            client.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS)
+                .getData();
+        int deltaNum = numNewDeltas;
+        if (prevDelta != null) {
+          deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();
+        }
+        if (deltaNum > deltasThreshold) {
+          updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS, deltaNum, client);
+        } else {
+          client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+        }
+      }
+
+      LOG.debug("Finished updating delta file metrics from worker.\n deltasThreshold = {}, "
+              + "obsoleteDeltasThreshold = {}, numObsoleteDeltas = {}",
+          deltasThreshold, obsoleteDeltasThreshold, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromCleaner(String dbName, String tableName, String partitionName,
+      int deletedFilesCount, Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from cleaner");
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    try {
+      CompactionMetricsData prevObsoleteDelta =
+          txnHandler.getCompactionMetricsData(dbName, tableName, partitionName,
+              CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS);
+      int numObsoleteDeltas = 0;
+      if (prevObsoleteDelta != null) {
+        numObsoleteDeltas = prevObsoleteDelta.getMetricValue() - deletedFilesCount;
+        if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+          updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+              numObsoleteDeltas, txnHandler);
+        } else {
+          txnHandler.removeCompactionMetricsData(dbName, tableName, partitionName,
+              CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS);
+        }
+      }
+
+      LOG.debug("Finished updating delta file metrics from cleaner.\n obsoleteDeltasThreshold = {}, "
+              + "numObsoleteDeltas = {}", obsoleteDeltasThreshold, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  private static void updateMetrics(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type, int numDeltas, TxnStore txnHandler) throws MetaException {
+    CompactionMetricsData delta = txnHandler.getCompactionMetricsData(dbName, tblName, partitionName, type);

Review comment:
       Nit: The name `delta` is a bit confusing

##########
File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
##########
@@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req)
   void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1)
   void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1)
+  CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1)
+  bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1)

Review comment:
       Just a thought - we're inserting data (not updating) iff version == 1, otherwise it's an update. Do you think it would make sense to combine update_compaction_metrics_data and add_compaction_metrics_data?
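   For illustration, a sketch of the combined operation (hypothetical name, not from the PR):
   ```
   // version == 1 means the row doesn't exist yet, so insert it;
   // otherwise do an optimistic-locking update against that version.
   boolean upsertCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
     if (version == 1) {
       addCompactionMetricsData(data);
       return true;
     }
     return updateCompactionMetricsData(data, version);
   }
   ```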

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));

Review comment:
       Same as above (re: throwing exceptions)

##########
File path: common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java
##########
@@ -34,10 +34,10 @@
   /**
    * Initializes static Metrics instance.
    */
-  public synchronized static void init(HiveConf conf) throws Exception {
+  public synchronized static void init(Configuration conf) throws Exception {

Review comment:
       Why are the changes in this file necessary?

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      updateCompactionMetricsData(data, version);
+    }
+    return true;
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());

Review comment:
       Might be setting partition name to the string "null" – is this what we want?
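   
   FWIW, JDBC's setString with a null argument maps to SQL NULL with most drivers, but an explicit setNull avoids any ambiguity. A minimal sketch (reusing the PR's names; whether NULL is the desired value here is still the open question):
   
       // Bind SQL NULL explicitly instead of relying on driver handling of a null String
       if (data.getPartitionName() != null) {
         pstmt.setString(3, data.getPartitionName());
       } else {
         pstmt.setNull(3, java.sql.Types.VARCHAR);
       }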

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      updateCompactionMetricsData(data, version);
+    }
+    return true;
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());
+          pstmt.setString(4, data.getMetricType().toString());
+          pstmt.setInt(5, data.getMetricValue());
+          pstmt.setInt(6, data.getVersion());
+          pstmt.executeUpdate();
+          dbConn.commit();
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "addCompactionMetricsData(" + data + ")");
+        throw new MetaException("Unable to execute addCompactionMetricsData()" + stringifyException(e));

Review comment:
       Same as above (re: throwing exceptions)

##########
File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
##########
@@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException {
     }
   }
 
+  @Override
+  public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = SELECT_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          ResultSet resultSet = pstmt.executeQuery();
+          CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+          if (resultSet.next()) {
+            return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type)
+                .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build();
+          } else {
+            return null;
+          }
+        }
+
+      } catch (SQLException e) {
+        LOG.error("Unable to getDeltaMetricsInfo");
+        checkRetryable(e, "getDeltaMetricsInfo");
+        throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getCompactionMetricsData(dbName, tblName, partitionName, type);
+    }
+  }
+
+  @Override
+  public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit)
+      throws MetaException {
+    Connection dbConn = null;
+    List<CompactionMetricsData> metricsDataList = new ArrayList<>();
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
+          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
+          try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+            pstmt.setString(1, type.toString());
+            ResultSet resultSet = pstmt.executeQuery();
+            while (resultSet.next()) {
+              CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder();
+              metricsDataList.add(builder
+                  .dbName(resultSet.getString(1))
+                  .tblName(resultSet.getString(2))
+                  .partitionName(resultSet.getString(3))
+                  .metricType(type)
+                  .metricValue(resultSet.getInt(5))
+                  .version(resultSet.getInt(6))
+                  .build());
+            }
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to getCompactionMetricsDataForType");
+        checkRetryable(e, "getCompactionMetricsDataForType");
+        throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getTopCompactionMetricsDataPerType(limit);
+    }
+    return metricsDataList;
+  }
+
+  @Override
+  public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY;
+        if (data.getPartitionName() != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setInt(1, data.getMetricValue());
+          pstmt.setInt(2, data.getVersion());
+          pstmt.setString(3, data.getDbName());
+          pstmt.setString(4, data.getTblName());
+          pstmt.setString(5, data.getMetricType().toString());
+          pstmt.setInt(6, version);
+          if (data.getPartitionName() != null) {
+            pstmt.setString(7, data.getPartitionName());
+          }
+          boolean updateRes = pstmt.executeUpdate() > 0;
+          dbConn.commit();
+          return updateRes;
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")");
+        throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      updateCompactionMetricsData(data, version);
+    }
+    return true;
+  }
+
+  @Override
+  public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) {
+          pstmt.setString(1, data.getDbName());
+          pstmt.setString(2, data.getTblName());
+          pstmt.setString(3, data.getPartitionName());
+          pstmt.setString(4, data.getMetricType().toString());
+          pstmt.setInt(5, data.getMetricValue());
+          pstmt.setInt(6, data.getVersion());
+          pstmt.executeUpdate();
+          dbConn.commit();
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "addCompactionMetricsData(" + data + ")");
+        throw new MetaException("Unable to execute addCompactionMetricsData()" + stringifyException(e));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      addCompactionMetricsData(data);
+    }
+  }
+
+  public void removeCompactionMetricsData(String dbName, String tblName, String partitionName,
+      CompactionMetricsData.MetricType type) throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String query = DELETE_COMPACTION_METRICS_CACHE_QUERY;
+        if (partitionName != null) {
+          query += " AND \"CMC_PARTITION\" = ?";
+        } else {
+          query += " AND \"CMC_PARTITION\" IS NULL";
+        }
+        try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
+          pstmt.setString(1, dbName);
+          pstmt.setString(2, tblName);
+          pstmt.setString(3, type.toString());
+          if (partitionName != null) {
+            pstmt.setString(4, partitionName);
+          }
+          pstmt.executeUpdate();
+          dbConn.commit();
+        }
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(e, "removeCompactionMetricsData(" + dbName + ", " +  tblName + ", " + partitionName + ", " +
+            type + ")");
+        throw new MetaException("Unable to execute removeCompactionMetricsData()" + stringifyException(e));

Review comment:
       Same as above (re: throwing exceptions)

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -398,13 +156,6 @@ private static long getBaseSize(AcidDirectory dir) throws IOException {
     return baseSize;
   }
 
-  private static long getModificationTime(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
-    return dir.getFiles(fs, Ref.from(false)).stream()
-      .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
-      .mapToLong(FileStatus::getModificationTime)
-      .max()
-      .orElse(new Date().getTime());
-  }
 
   private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {

Review comment:
       I'm a bit concerned about this method. Do you think it might slow down the Initiator/Worker/Cleaner?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -34,99 +26,60 @@
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionMetricsDataStruct;
+import org.apache.hadoop.hive.metastore.api.CompactionMetricsMetricType;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.metastore.txn.CompactionMetricsData;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-
 import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.common.util.Ref;
-import org.apache.tez.common.counters.CounterGroup;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
+import org.apache.thrift.TException;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.io.IOException;
-import java.io.Serializable;
 import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.EnumMap;
-import java.util.HashMap;
 import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.Executors;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_DELTAS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_OBSOLETE_DELTAS;
 import static org.apache.hadoop.hive.metastore.metrics.MetricsConstants.COMPACTION_NUM_SMALL_DELTAS;
 
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_OBSOLETE_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_SMALL_DELTAS;
-
 /**
  * Collects and publishes ACID compaction related metrics.
- * Everything should be behind 2 feature flags: {@link HiveConf.ConfVars#HIVE_SERVER2_METRICS_ENABLED} and
+ * Everything should be behind 2 feature flags: {@link MetastoreConf.ConfVars#METRICS_ENABLED} and
  * {@link MetastoreConf.ConfVars#METASTORE_ACIDMETRICS_EXT_ON}.
- * First we store the information in the jobConf, then in Tez Counters, then in a cache stored here, then in a custom
- * MBean.
+ * First we store the information in the HMS backend DB COMPACTION_METRICS_CACHE table, then in a custom MBean.

Review comment:
       Maybe also mention that these values are logged...

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }
+
+      // We don't know the size of the newly create delta directories, that would require a fresh AcidDirectory
+      // Clear the small delta num counter from the cache for this key
+      client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS);
+
+      // The new number of active delta dirs are either 0, 1 or 2.
+      // If we ran MAJOR compaction, no new delta is created, just base dir
+      // If we ran MINOR compaction, we can have 1 or 2 new delta dirs, depending on whether we had deltas or
+      // delete deltas.
+      if (type == CompactionType.MAJOR) {
+        client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+      } else {
+        int numNewDeltas = 0;
+        // check whether we had deltas
+        if (directory.getDeleteDeltas().size() > 0) {
+          numNewDeltas++;
+        }
+
+        // if the size of the current dirs is bigger than the size of delete deltas, it means we have active deltas
+        if (directory.getCurrentDirectories().size() > directory.getDeleteDeltas().size()) {
+          numNewDeltas++;
+        }
+
+        // recalculate the delta count
+        CompactionMetricsDataStruct prevDelta =
+            client.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS)
+                .getData();
+        int deltaNum = numNewDeltas;
+        if (prevDelta != null) {
+          deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();

Review comment:
       There is a high chance that directory.getCurrentDirectories().size() > prevDelta.getMetricvalue(), i.e. more deltas were inserted since the initiator queued this compaction... in this case deltaNum shouldn't be changed
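   
   A guarded recalculation could look something like this (a sketch only, with the PR's names):
   
       int deltaNum = numNewDeltas;
       if (prevDelta != null
           && prevDelta.getMetricvalue() >= directory.getCurrentDirectories().size()) {
         // Only subtract when the cached count still covers the deltas this compaction
         // consumed; otherwise new deltas landed after the initiator queued it.
         deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();
       }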

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();

Review comment:
       Should probably be:
   int numObsoleteDeltas = dir.getObsolete().size() + dir.getAbortedDirectories().size();
   
   And the Cleaner's update should also subtract aborted directories.
   
   I'm open to discussing this, though.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
+    return success;

Review comment:
       - Base directories and original files may be in the list of obsolete files the Cleaner deletes. Do you think it would be worth filtering the obsolete/aborted files’ names for “delta”?
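   
   For example (a sketch, assuming getObsolete() yields Path objects and using the AcidUtils name prefixes):
   
       // Count only directories whose names mark them as deltas or delete deltas
       long obsoleteDeltaCount = dir.getObsolete().stream()
           .map(Path::getName)
           .filter(n -> n.startsWith(AcidUtils.DELTA_PREFIX)
               || n.startsWith(AcidUtils.DELETE_DELTA_PREFIX))
           .count();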

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());

Review comment:
       This should probably be updated only if success == true (I believe that means at least 1 dir was removed)
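   
   i.e. roughly (a sketch against the diff above):
   
       boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
       if (success) {
         updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
       }
       return success;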

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -139,157 +92,37 @@ public static DeltaFilesMetricReporter getInstance() {
     return InstanceHolder.instance;
   }
 
-  public static synchronized void init(HiveConf conf) throws Exception {
-    getInstance().configure(conf);
+  public static synchronized void init(Configuration conf, TxnStore txnHandler) throws Exception {
+    if (!initialized) {
+      getInstance().configure(conf, txnHandler);
+      initialized = true;
+    }
   }
 
-  private void configure(HiveConf conf) throws Exception {
+  private void configure(Configuration conf, TxnStore txnHandler) throws Exception {
     long reportingInterval =
-        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
-    hiveEntitySeparator = conf.getVar(HiveConf.ConfVars.HIVE_ENTITY_SEPARATOR);
+        MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
+
+    maxCacheSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_MAX_CACHE_SIZE);
 
-    initCachesForMetrics(conf);
     initObjectsForMetrics();
 
     ThreadFactory threadFactory =
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
-    executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
-    executorService.scheduleAtFixedRate(new ReportingTask(), 0, reportingInterval, TimeUnit.SECONDS);
+    reporterExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    reporterExecutorService.scheduleAtFixedRate(new ReportingTask(txnHandler), 0, reportingInterval, TimeUnit.SECONDS);
 
     LOG.info("Started DeltaFilesMetricReporter thread");

Review comment:
       This is a duplicate log line

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());

Review comment:
       More readable solution: update only if dir.getObsolete().size() > 0






[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787032118



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();

Review comment:
       I'm seriously wondering if we should.
   Cons:
   - the metric name is "obsolete deltas", but the value would == obsolete + aborted deltas
   - We already have metrics about the amount of aborts in the system (gotten from metadata)
   
   Pros:
   - The Cleaner's job is to remove aborted directories as well; so including metrics about aborted directories would help with observability of Cleaner health
   - Aborted directories can clog up the file system just as much as obsolete directories






[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786901861



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();

Review comment:
       Do we want to include aborted directories in the obsolete delta count?






[GitHub] [hive] deniskuzZ commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786502218



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -275,6 +276,13 @@ public void init(AtomicBoolean stop) throws Exception {
     compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
             COMPACTOR_INTIATOR_THREAD_NAME_FORMAT);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&

Review comment:
       Same as above; the question is whether we need to support runtime changes to the metrics configs.






[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787541067



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -398,13 +156,6 @@ private static long getBaseSize(AcidDirectory dir) throws IOException {
     return baseSize;
   }
 
-  private static long getModificationTime(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
-    return dir.getFiles(fs, Ref.from(false)).stream()
-      .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
-      .mapToLong(FileStatus::getModificationTime)
-      .max()
-      .orElse(new Date().getTime());
-  }
 
   private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {

Review comment:
       I think we should collect the small delta metrics because they can indicate issues with streaming (I wish I had such a metric when I investigated the Northfolk escalation :) )
   Also, we are doing the same calculation in the `Initiator` code (`Initiator#sumDirSize`) to determine the compaction type.  
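   
   For reference, that per-directory size calculation boils down to a listing plus a sum, roughly (a sketch based on the getFiles pattern used elsewhere in this class):
   
       private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
         return dir.getFiles(fs, Ref.from(false)).stream()
             .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
             .mapToLong(FileStatus::getLen)
             .sum();
       }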






[GitHub] [hive] deniskuzZ commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786495219



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception {
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
             COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&

Review comment:
       With that change you won't be able to enable/disable metrics collection at runtime; is that intentional?
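   
   If runtime toggling is needed, the flags could be re-read on each loop iteration instead of being cached in init(), e.g. (a sketch with the config vars from this PR):
   
       boolean metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED)
           && MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);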






[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786900655



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }
+
+      // We don't know the size of the newly create delta directories, that would require a fresh AcidDirectory
+      // Clear the small delta num counter from the cache for this key
+      client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS);
+
+      // The new number of active delta dirs are either 0, 1 or 2.
+      // If we ran MAJOR compaction, no new delta is created, just base dir
+      // If we ran MINOR compaction, we can have 1 or 2 new delta dirs, depending on whether we had deltas or
+      // delete deltas.
+      if (type == CompactionType.MAJOR) {
+        client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+      } else {
+        int numNewDeltas = 0;
+        // check whether we had deltas
+        if (directory.getDeleteDeltas().size() > 0) {
+          numNewDeltas++;
+        }
+
+        // if the size of the current dirs is bigger than the size of delete deltas, it means we have active deltas
+        if (directory.getCurrentDirectories().size() > directory.getDeleteDeltas().size()) {
+          numNewDeltas++;
+        }
+
+        // recalculate the delta count
+        CompactionMetricsDataStruct prevDelta =
+            client.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS)
+                .getData();
+        int deltaNum = numNewDeltas;
+        if (prevDelta != null) {
+          deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();

Review comment:
       That is true, but in that case shouldn't we clean up the active delta metrics if the count falls below the threshold?
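   
   For example (a sketch with the PR's names; the else branch is the proposed cleanup):
   
       if (deltaNum > deltasThreshold) {
         updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS,
             deltaNum, client);
       } else {
         // Drop the stale entry so the gauge stops reporting this table/partition
         client.removeCompactionMetricsData(dbName, tableName, partitionName,
             CompactionMetricsMetricType.NUM_DELTAS);
       }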







[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r788944194



##########
File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestDeltaFilesMetrics.java
##########
@@ -39,199 +45,396 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
-import java.util.EnumMap;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_OBSOLETE_DELTAS;
-import static org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter.DeltaFilesMetricType.NUM_SMALL_DELTAS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class TestDeltaFilesMetrics extends CompactorTest  {
 
   private void setUpHiveConf() {
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE, 2);
-    HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_CACHE_DURATION, 7200, TimeUnit.SECONDS);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_OBSOLETE_DELTA_NUM_THRESHOLD, 100);
-    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_DELTA_NUM_THRESHOLD, 100);
-    HiveConf.setTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, 1, TimeUnit.SECONDS);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD, 1);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD, 1);
+    MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_REPORTING_INTERVAL, 1,
+        TimeUnit.SECONDS);
+    MetastoreConf.setDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD, 0.15f);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
   }
 
-  private void initAndCollectFirstMetrics() throws Exception {
-    MetricsFactory.close();
-    MetricsFactory.init(conf);
+  @After
+  public void tearDown() throws Exception {
+    DeltaFilesMetricReporter.close();
+  }
 
-    DeltaFilesMetricReporter.init(conf);
 
-    TezCounters tezCounters = new TezCounters();
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=1").setValue(200);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=2").setValue(100);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid/p=3").setValue(150);
-    tezCounters.findCounter(NUM_OBSOLETE_DELTAS + "", "default.acid_v2").setValue(250);
+  static void verifyMetricsMatch(Map<String, String> expected, Map<String, String> actual) {
+    Assert.assertTrue("Actual metrics " + actual + " don't match expected: " + expected,
+        equivalent(expected, actual));
+  }
 
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=1").setValue(150);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=2").setValue(100);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid/p=3").setValue(250);
-    tezCounters.findCounter(NUM_DELTAS + "", "default.acid_v2").setValue(200);
+  private static boolean equivalent(Map<String, String> lhs, Map<String, String> rhs) {
+    return lhs.size() == rhs.size() && Maps.difference(lhs, rhs).areEqual();
+  }
 
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=1").setValue(250);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=2").setValue(200);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid/p=3").setValue(150);
-    tezCounters.findCounter(NUM_SMALL_DELTAS + "", "default.acid_v2").setValue(100);
+  static Map<String, String> gaugeToMap(String metric) throws Exception {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName oname = new ObjectName(DeltaFilesMetricReporter.OBJECT_NAME_PREFIX + metric);
+    MBeanInfo mbeanInfo = mbs.getMBeanInfo(oname);
 
-    DeltaFilesMetricReporter.getInstance().submit(tezCounters, null);
-    Thread.sleep(1000);
+    Map<String, String> result = new HashMap<>();
+    for (MBeanAttributeInfo attr : mbeanInfo.getAttributes()) {
+      result.put(attr.getName(), String.valueOf(mbs.getAttribute(oname, attr.getName())));
+    }
+    return result;
   }
 
-  @After
-  public void tearDown() {
-    DeltaFilesMetricReporter.close();
+  @Override
+  boolean useHive130DeltaDirName() {
+    return false;
   }
 
   @Test
-  public void testDeltaFilesMetric() throws Exception {
+  public void testDeltaFileMetricPartitionedTable() throws Exception {
     setUpHiveConf();
-    initAndCollectFirstMetrics();
+    String dbName = "default";
+    String tblName = "dp";
+    String partName = "ds=part1";
 
+    Table t = newTable(dbName, tblName, true);
+    List<LockComponent> components = new ArrayList<>();
+
+    Partition p = newPartition(t, "part1");
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 20);
+
+    components.add(createLockComponent(dbName, tblName, partName));
+
+    burnThroughTransactions(dbName, tblName, 23);
+    long txnid = openTxn();
+
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+    long writeid = allocateWriteId(dbName, tblName, txnid);
+    Assert.assertEquals(24, writeid);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+
+    startInitiator();
+
+    TimeUnit.SECONDS.sleep(1);

Review comment:
       Done.






[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r788980468



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,

Review comment:
       You are right, it won't be removed. I added an else branch to the condition to resolve it; a sketch of the resulting shape follows.
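
   A rough sketch of that shape, assuming a remove-style counterpart to updateMetrics exists (the removeMetrics name is illustrative, not the PR's exact API):

       // Sketch: when the count falls to or below the threshold, drop the
       // stale cache entry instead of leaving it at its old value.
       if (numDeltas > deltasThreshold) {
         updateMetrics(dbName, tableName, partitionName,
             CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas, txnHandler);
       } else {
         removeMetrics(dbName, tableName, partitionName,   // hypothetical helper
             CompactionMetricsData.MetricType.NUM_DELTAS, txnHandler);
       }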






[GitHub] [hive] deniskuzZ commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786508087



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -671,6 +679,13 @@ private String getWorkerId() {
     return name.toString();
   }
 
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,

Review comment:
       Move this to a common place if possible. updateDeltaFilesMetrics could be triggered by multiple threads, leading to resource starvation; think about introducing a connection pool (one possible shape is sketched below).
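
   One possible shape for such a shared helper, sketched under the assumption that metastore access can be backed by a pooled javax.sql.DataSource; all class and method names here are illustrative:

       import java.sql.Connection;
       import java.sql.SQLException;
       import javax.sql.DataSource;

       // Sketch: a single entry point shared by Initiator, Worker and
       // Cleaner, so concurrent callers draw from one connection pool
       // instead of each opening their own metastore connection.
       public final class DeltaMetricsUpdater {
         private final DataSource pool;

         public DeltaMetricsUpdater(DataSource pool) {
           this.pool = pool;
         }

         public void update(String db, String table, String part, int value) {
           try (Connection c = pool.getConnection()) {
             // write or upsert the metric row for (db, table, part)
           } catch (SQLException e) {
             // metrics are best-effort: log and continue
           }
         }
       }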






[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787529159



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     this.workerName = getWorkerId();
     setName(workerName);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);

Review comment:
       I see your point now. I will remove this additional config check. 






[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787631271



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();
+
+      if (numDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_DELTAS, numDeltas,
+            txnHandler);
+      }
+
+      if (numSmallDeltas > deltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_SMALL_DELTAS,
+            numSmallDeltas, txnHandler);
+      }
+
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, txnHandler);
+      }
+
+      LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, "
+          + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {},  numObsoleteDeltas = {}",
+          deltaPctThreshold, deltasThreshold, obsoleteDeltasThreshold, numDeltas, numSmallDeltas, numObsoleteDeltas);
+
+    } catch (Throwable t) {
+      LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
+    }
+  }
+
+  public static void updateMetricsFromWorker(AcidDirectory directory, String dbName, String tableName, String partitionName,
+      CompactionType type, Configuration conf, IMetaStoreClient client) {
+    LOG.debug("Updating delta file metrics from worker");
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // we have an instance of the AcidDirectory before the compaction worker was started
+      // from this we can get how many delta directories existed
+      // the previously active delta directories are now moved to obsolete
+      int numObsoleteDeltas = directory.getCurrentDirectories().size();
+      if (numObsoleteDeltas > obsoleteDeltasThreshold) {
+        updateMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS,
+            numObsoleteDeltas, client);
+      }
+
+      // We don't know the size of the newly create delta directories, that would require a fresh AcidDirectory
+      // Clear the small delta num counter from the cache for this key
+      client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_SMALL_DELTAS);
+
+      // The new number of active delta dirs are either 0, 1 or 2.
+      // If we ran MAJOR compaction, no new delta is created, just base dir
+      // If we ran MINOR compaction, we can have 1 or 2 new delta dirs, depending on whether we had deltas or
+      // delete deltas.
+      if (type == CompactionType.MAJOR) {
+        client.removeCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS);
+      } else {
+        int numNewDeltas = 0;
+        // check whether we had deltas
+        if (directory.getDeleteDeltas().size() > 0) {
+          numNewDeltas++;
+        }
+
+        // if the size of the current dirs is bigger than the size of delete deltas, it means we have active deltas
+        if (directory.getCurrentDirectories().size() > directory.getDeleteDeltas().size()) {
+          numNewDeltas++;
+        }
+
+        // recalculate the delta count
+        CompactionMetricsDataStruct prevDelta =
+            client.getCompactionMetricsData(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS)
+                .getData();
+        int deltaNum = numNewDeltas;
+        if (prevDelta != null) {
+          deltaNum += prevDelta.getMetricvalue() - directory.getCurrentDirectories().size();

Review comment:
       At line 378, deltaNum == numNewDeltas, which is in {0, 1, 2}. If prevDelta.getMetricvalue() - directory.getCurrentDirectories().size() < 0 (because new deltas were inserted since the Initiator updated the metric), then deltaNum should still be in {0, 1, 2} at line 380.
   We'd still clean up the active delta metric if the threshold > {0, 1, 2}; a worked example follows.
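
   For concreteness, a worked example with assumed numbers (not taken from the PR):

       // Initiator recorded prevDelta.getMetricvalue() == 5, then 2 more
       // deltas landed before the Worker ran, so the Worker's snapshot has
       // directory.getCurrentDirectories().size() == 7. A minor compaction
       // then produces numNewDeltas == 2.
       int deltaNum = 2 + (5 - 7);   // == 0, still within {0, 1, 2}
       // With a threshold such as 100, deltaNum <= threshold, so the
       // NUM_DELTAS entry is cleaned up rather than updated.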






[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r789579851



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -412,7 +421,11 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    if (dir.getObsolete().size() > 0) {
+      updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, obsoleteDirs);

Review comment:
       Reverted.






[GitHub] [hive] lcspinter merged pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter merged pull request #2916:
URL: https://github.com/apache/hive/pull/2916


   




[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787029629



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
##########
@@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
     }
     StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
         .map(Path::getName).collect(Collectors.joining(",")));
-    return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+    updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size());
+    return success;

Review comment:
       Let me rephrase then (discussion of aborted directories below):
   Base directories and original files may be in the list of obsolete files the Cleaner deletes. Do you think it would be worth filtering the obsolete files’ names for “delta”?
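
   A minimal sketch of such a filter, assuming obsoleteDirs is the List<org.apache.hadoop.fs.Path> the Cleaner already builds, and using the standard ACID directory prefixes:

       // Sketch: count only delta directories among the obsolete paths,
       // skipping base directories and original files.
       long obsoleteDeltaCount = obsoleteDirs.stream()
           .map(Path::getName)
           .filter(name -> name.startsWith("delta_")
               || name.startsWith("delete_delta_"))
           .count();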






[GitHub] [hive] klcopp commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
klcopp commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787028622



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -398,13 +156,6 @@ private static long getBaseSize(AcidDirectory dir) throws IOException {
     return baseSize;
   }
 
-  private static long getModificationTime(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
-    return dir.getFiles(fs, Ref.from(false)).stream()
-      .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
-      .mapToLong(FileStatus::getModificationTime)
-      .max()
-      .orElse(new Date().getTime());
-  }
 
   private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {

Review comment:
       Do you think that collecting metrics about small deltas is worth how much it might slow down the Initiator/Worker/Cleaner? (A sketch of the cost is below.)
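
   For context, a sketch of where the cost comes from, mirroring what the surrounding diff suggests getDirSize does (treat this as an approximation, not the exact implementation):

       // Each small-delta check lists every file in every delta directory:
       // roughly one namenode listing per directory plus one status object
       // per file, just to sum the lengths.
       long dirSize = dir.getFiles(fs, Ref.from(false)).stream()
           .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
           .mapToLong(FileStatus::getLen)
           .sum();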






[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r788944777



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -310,81 +143,6 @@ private static String getDeltaCountKey(String dbName, String tableName, String p
     return key.toString();
   }
 
-  private static void logDeltaDirMetrics(AcidDirectory dir, Configuration conf, int numObsoleteDeltas, int numDeltas,
-      int numSmallDeltas) {
-    long loggerFrequency = HiveConf
-        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY, TimeUnit.MILLISECONDS);
-    if (loggerFrequency <= 0) {
-      return;
-    }
-    long currentTime = System.currentTimeMillis();
-    if (lastSuccessfulLoggingTime == 0 || currentTime >= lastSuccessfulLoggingTime + loggerFrequency) {
-      lastSuccessfulLoggingTime = currentTime;
-      if (numDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACTIVE_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " active delta directories. This can " +
-            "cause performance degradation.");
-      }
-
-      if (numObsoleteDeltas >=
-          HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_OBSOLETE_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " obsolete delta directories. This can " +
-            "indicate compaction cleaner issues.");
-      }
-
-      if (numSmallDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_SMALL_DELTA_DIR_THRESHOLD)) {
-        LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " small delta directories. This can " +
-            "indicate performance degradation and there might be a problem with your streaming setup.");
-      }
-    }
-  }
-
-  private static int getNumObsoleteDeltas(AcidDirectory dir, long checkThresholdInSec) throws IOException {
-    int numObsoleteDeltas = 0;
-    for (Path obsolete : dir.getObsolete()) {
-      FileStatus stat = dir.getFs().getFileStatus(obsolete);
-      if (System.currentTimeMillis() - stat.getModificationTime() >= checkThresholdInSec * 1000) {
-        numObsoleteDeltas++;
-      }
-    }
-    return numObsoleteDeltas;
-  }
-
-  public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) {
-    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-      MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-
-      Arrays.stream(DeltaFilesMetricType.values())
-        .filter(type -> jobConf.get(type.name()) != null)
-        .forEach(type ->
-            Splitter.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).split(jobConf.get(type.name())).forEach(
-              (path, cnt) -> tezCounters.findCounter(type.value, path).setValue(Long.parseLong(cnt))
-            )
-        );
-    }
-  }
-
-  public static void addAcidMetricsToConfObj(EnumMap<DeltaFilesMetricType,
-      Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) {
-    try {
-      deltaFilesStats.forEach((type, value) -> conf
-          .set(type.name(), Joiner.on(ENTRY_SEPARATOR).withKeyValueSeparator(KEY_VALUE_SEPARATOR).join(value)));
-
-    } catch (Exception e) {
-      LOG.warn("Couldn't add Delta metrics to conf object", e);
-    }
-  }
-
-  public static void backPropagateAcidMetrics(JobConf jobConf, Configuration conf) {
-    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) &&
-      MetastoreConf.getBoolVar(jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-      try {
-        Arrays.stream(DeltaFilesMetricType.values()).filter(type -> conf.get(type.name()) != null)
-            .forEach(type -> jobConf.set(type.name(), conf.get(type.name())));
-      } catch (Exception e) {
-        LOG.warn("Couldn't back propagate Delta metrics to jobConf object", e);
-      }
-    }
-  }
 
   private static long getBaseSize(AcidDirectory dir) throws IOException {
     long baseSize = 0;

Review comment:
       Removed.






[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787537866



##########
File path: standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
##########
@@ -661,6 +661,16 @@ CREATE TABLE COMPLETED_COMPACTIONS (
 
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
 
+-- HIVE-25842
+CREATE TABLE COMPACTION_METRICS_CACHE (
+  CMC_DATABASE varchar(128) NOT NULL,
+  CMC_TABLE varchar(128) NOT NULL,
+  CMC_PARTITION varchar(767),
+  CMC_METRIC_TYPE varchar(128) NOT NULL,
+  CMC_METRIC_VALUE integer NOT NULL,

Review comment:
       If, for some reason, we later want to allow NULL values in this column (I can't think of any reason right now why we would), the NOT NULL -> NULL conversion is a backward-compatible change.






[GitHub] [hive] lcspinter commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r787885695



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/metrics/DeltaFilesMetricReporter.java
##########
@@ -512,7 +284,177 @@ private void shutdown() {
     }
   }
 
-  public static class DeltaFilesMetadata implements Serializable {
-    public String dbName, tableName, partitionName;
+  public static void updateMetricsFromInitiator(AcidDirectory dir, String dbName, String tableName, String partitionName,
+      Configuration conf, TxnStore txnHandler) {
+    LOG.debug("Updating delta file metrics from initiator");
+    double deltaPctThreshold = MetastoreConf.getDoubleVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_PCT_THRESHOLD);
+    int deltasThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_DELTA_NUM_THRESHOLD);
+    int obsoleteDeltasThreshold = MetastoreConf.getIntVar(conf,
+        MetastoreConf.ConfVars.METASTORE_DELTAMETRICS_OBSOLETE_DELTA_NUM_THRESHOLD);
+    try {
+      // We have an AcidDir from the initiator, therefore we can use that to calculate active,small, obsolete delta
+      // count
+      long baseSize = getBaseSize(dir);
+
+      int numDeltas = dir.getCurrentDirectories().size();
+      int numSmallDeltas = 0;
+
+      for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
+        long deltaSize = getDirSize(delta, dir.getFs());
+        if (baseSize != 0 && deltaSize / (float) baseSize < deltaPctThreshold) {
+          numSmallDeltas++;
+        }
+      }
+
+      int numObsoleteDeltas = dir.getObsolete().size();

Review comment:
       Included aborted directories and filtered out original and base directories from the obsolete delta count. 






[GitHub] [hive] deniskuzZ commented on a change in pull request #2916: HIVE-25842: Reimplement delta file metric collection

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #2916:
URL: https://github.com/apache/hive/pull/2916#discussion_r786503651



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
##########
@@ -331,6 +339,12 @@ private boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions,
     }
     return false;
   }
+  
+  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName) {

Review comment:
       I think this method could be extracted to a common place so it can be used by both the Cleaner and the Initiator, e.g. MetaStoreCompactorThread (sketched below).
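
   Sketched as a protected helper on the shared base class; the signature and body here are illustrative only:

       // Sketch: hoist the duplicated logic into the common parent so the
       // Initiator and Cleaner share one implementation.
       public abstract class MetaStoreCompactorThread extends CompactorThread {

         protected void updateDeltaFilesMetrics(AcidDirectory directory,
             String dbName, String tableName, String partName) {
           // shared metric-update logic, previously duplicated in the
           // Initiator and Cleaner subclasses
         }
       }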



